s3torchconnector.dcp.s3_file_system

Attributes

logger

Classes

S3FileSystem

Helper class that provides a standard way to create an ABC using inheritance.

S3StorageWriter

Basic implementation of StorageWriter using file IO.

S3StorageReader

Interface used by load_state_dict to read from storage.

Module Contents

s3torchconnector.dcp.s3_file_system.logger[source]
class s3torchconnector.dcp.s3_file_system.S3FileSystem(region: str, s3_client: s3torchconnector._s3client.S3Client | None = None)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemBase

Helper class that provides a standard way to create an ABC using inheritance.

create_stream(path: str | os.PathLike, mode: str) → Generator[io.IOBase, None, None][source]

Create a stream for reading or writing to S3.

Parameters:
  • path (Union[str, os.PathLike]) – The S3 path to read or write.

  • mode (str) – The mode for the stream. Supports ‘rb’ for read mode and ‘wb’ for write mode.

Yields:

io.BufferedIOBase – A stream for reading or writing to S3.

Raises:

ValueError – If the mode is not ‘rb’ or ‘wb’.
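Example (a minimal sketch; the region, bucket, and key are placeholders, and it assumes the returned generator is used as a context manager, consistent with the signature above):

  from s3torchconnector.dcp.s3_file_system import S3FileSystem

  fs = S3FileSystem(region="us-east-1")

  # Write a payload to S3, then read it back.
  with fs.create_stream("s3://my-bucket/ckpt/data.bin", "wb") as stream:
      stream.write(b"checkpoint bytes")

  with fs.create_stream("s3://my-bucket/ckpt/data.bin", "rb") as stream:
      payload = stream.read()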

concat_path(path: str | os.PathLike, suffix: str) → str[source]

Concatenate a suffix to the given path.

Parameters:
  • path (Union[str, os.PathLike]) – The base path.

  • suffix (str) – The suffix to concatenate.

Returns:

The concatenated path.

Return type:

str
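Example (hypothetical call; the exact separator handling is an assumption):

  fs = S3FileSystem(region="us-east-1")
  full_path = fs.concat_path("s3://my-bucket/ckpt", "__0_0.distcp")
  # Assumed result: "s3://my-bucket/ckpt/__0_0.distcp"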

init_path(path: str | os.PathLike) → str | os.PathLike[source]

Initialize the path for the filesystem.

Parameters:

path (Union[str, os.PathLike]) – The path to initialize.

Returns:

The initialized path.

Return type:

Union[str, os.PathLike]

rename(old_path: str | os.PathLike, new_path: str | os.PathLike) → None[source]

Rename an object in S3.

Because S3 has no native rename, this is emulated by copying the object to the new path and then deleting the old one. The deletion step is retried (see also S3FileSystem._delete_with_retry()).

Parameters:
  • old_path (Union[str, os.PathLike]) – The current path of the object.

  • new_path (Union[str, os.PathLike]) – The new path for the object.

Raises:
  • ValueError – If the old and new paths point to different buckets.

  • S3Exception – If there is an error with the S3 client.
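Example (sketch; both paths must be in the same bucket, and all names are placeholders):

  fs = S3FileSystem(region="us-east-1")
  # Copies the object to the new key, then deletes the old key (deletion is retried).
  fs.rename("s3://my-bucket/ckpt/tmp_0.distcp", "s3://my-bucket/ckpt/__0_0.distcp")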

mkdir(path: str | os.PathLike) → None[source]

No-op: S3 has no directories, so there is nothing to create.

exists(path: str | os.PathLike) → bool[source]
rm_file(path: str | os.PathLike) → None[source]
classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool[source]
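Example (illustrative only; the assumed behavior is that S3 URIs validate and non-S3 paths do not):

  S3FileSystem.validate_checkpoint_id("s3://my-bucket/checkpoints")  # True (assumed)
  S3FileSystem.validate_checkpoint_id("/tmp/checkpoints")            # False (assumed)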
class s3torchconnector.dcp.s3_file_system.S3StorageWriter(region: str, path: str, **kwargs)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemWriter

Basic implementation of StorageWriter using file IO.

This implementation makes the following assumptions and simplifications:

  • The checkpoint path is an empty or non-existing directory.

  • File creation is atomic.

The checkpoint consists of one file per write request, plus a .metadata file with the serialized metadata.

fs[source]
path = ''[source]
classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool[source]

Check if the given checkpoint_id is supported by the storage. This allows us to enable automatic storage selection.
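Example (a minimal end-to-end sketch of saving a checkpoint; assumes a recent PyTorch where torch.distributed.checkpoint.save is available, and the region, bucket, and prefix are placeholders):

  import torch
  import torch.distributed.checkpoint as dcp
  from s3torchconnector.dcp.s3_file_system import S3StorageWriter

  state_dict = {"weights": torch.rand(4, 4)}
  writer = S3StorageWriter(region="us-east-1", path="s3://my-bucket/checkpoints/epoch_1")
  dcp.save(state_dict, storage_writer=writer)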

class s3torchconnector.dcp.s3_file_system.S3StorageReader(region: str, path: str | os.PathLike)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemReader

Interface used by load_state_dict to read from storage.

One StorageReader instance acts as both the coordinator and the follower in a distributed checkpoint. As part of initialization, each instance is told its role.

A subclass should expect the following sequence of calls by load_state_dict:

  1. (all ranks) set checkpoint_id if users pass a valid checkpoint_id.

  2. (all ranks) read_metadata()

  3. (all ranks) set_up_storage_reader()

  4. (all ranks) prepare_local_plan()

  5. (coordinator) prepare_global_plan()

  6. (all ranks) read_data()

fs[source]
path = ''[source]
sync_files = False[source]
classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool[source]

Check if the given checkpoint_id is supported by the storage. This allows us to enable automatic storage selection.
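Example (the counterpart load sketch to the S3StorageWriter example above; tensor shapes must match what was saved, and all names are placeholders):

  import torch
  import torch.distributed.checkpoint as dcp
  from s3torchconnector.dcp.s3_file_system import S3StorageReader

  # dcp.load reads into a pre-allocated state_dict in place.
  state_dict = {"weights": torch.empty(4, 4)}
  reader = S3StorageReader(region="us-east-1", path="s3://my-bucket/checkpoints/epoch_1")
  dcp.load(state_dict, storage_reader=reader)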