s3torchconnector.dcp ==================== .. py:module:: s3torchconnector.dcp Submodules ---------- .. toctree:: :maxdepth: 1 /autoapi/s3torchconnector/dcp/s3_file_system/index Classes ------- .. autoapisummary:: s3torchconnector.dcp.S3FileSystem s3torchconnector.dcp.S3StorageReader s3torchconnector.dcp.S3StorageWriter Package Contents ---------------- .. py:class:: S3FileSystem(region: str, s3_client: Optional[s3torchconnector._s3client.S3Client] = None) Bases: :py:obj:`torch.distributed.checkpoint.filesystem.FileSystemBase` Helper class that provides a standard way to create an ABC using inheritance. .. py:method:: create_stream(path: Union[str, os.PathLike], mode: str) -> Generator[io.IOBase, None, None] Create a stream for reading or writing to S3. :param path: The S3 path to read or write. :type path: Union[str, os.PathLike] :param mode: The mode for the stream. Supports 'rb' for read mode and 'wb' for write mode. :type mode: str :Yields: *io.BufferedIOBase* -- A stream for reading or writing to S3. :raises ValueError: If the mode is not 'rb' or 'wb'. .. py:method:: concat_path(path: Union[str, os.PathLike], suffix: str) -> str Concatenate a suffix to the given path. :param path: The base path. :type path: Union[str, os.PathLike] :param suffix: The suffix to concatenate. :type suffix: str :returns: The concatenated path. :rtype: str .. py:method:: init_path(path: Union[str, os.PathLike]) -> Union[str, os.PathLike] Initialize the path for the filesystem. :param path: The path to initialize. :type path: Union[str, os.PathLike] :returns: The initialized path. :rtype: Union[str, os.PathLike] .. py:method:: rename(old_path: Union[str, os.PathLike], new_path: Union[str, os.PathLike]) -> None Rename an object in S3. This is emulated by copying it to a new path and deleting the old path. The deletion part is retried (see also :func:`S3FileSystem._delete_with_retry`). :param old_path: The current path of the object. :type old_path: Union[str, os.PathLike] :param new_path: The new path for the object. :type new_path: Union[str, os.PathLike] :raises ValueError: If the old and new paths point to different buckets. :raises S3Exception: If there is an error with the S3 client. .. py:method:: mkdir(path: Union[str, os.PathLike]) -> None No-op method for creating directories in S3 (not needed). .. py:method:: exists(path: Union[str, os.PathLike]) -> bool .. py:method:: rm_file(path: Union[str, os.PathLike]) -> None .. py:method:: validate_checkpoint_id(checkpoint_id: Union[str, os.PathLike]) -> bool :classmethod: .. py:class:: S3StorageReader(region: str, path: Union[str, os.PathLike]) Bases: :py:obj:`torch.distributed.checkpoint.filesystem.FileSystemReader` Interface used by ``load_state_dict`` to read from storage. One StorageReader instance acts as both the coordinator and the follower in a distributed checkpoint. As part of initialization, each instance is told its role. A subclass should expected the following sequence of calls by ``load_state_dict``: 0) (all ranks) set checkpoint_id if users pass a valid checkpoint_id. 1) (all ranks) read_metadata() 2) (all ranks) set_up_storage_reader() 3) (all ranks) prepare_local_plan() 4) (coordinator) prepare_global_plan() 5) (all ranks) read_data() .. py:attribute:: fs .. py:attribute:: path :value: '' .. py:attribute:: sync_files :value: False .. py:method:: validate_checkpoint_id(checkpoint_id: Union[str, os.PathLike]) -> bool :classmethod: Check if the given checkpoint_id is supported by the stroage. This allow us to enable automatic storage selection. .. py:class:: S3StorageWriter(region: str, path: str, **kwargs) Bases: :py:obj:`torch.distributed.checkpoint.filesystem.FileSystemWriter` Basic implementation of StorageWriter using file IO. This implementation makes the following assumptions and simplifications: * The checkpoint path is an empty or non-existing directory. * File creation is atomic The checkpoint consist of one file per write request plus a `.metadata` file with the serialized metadata. .. py:attribute:: fs .. py:attribute:: path :value: '' .. py:method:: validate_checkpoint_id(checkpoint_id: Union[str, os.PathLike]) -> bool :classmethod: Check if the given checkpoint_id is supported by the stroage. This allow us to enable automatic storage selection.