s3torchconnector.dcp

Submodules

Classes

S3FileSystem

FileSystemBase implementation backed by Amazon S3.

S3StorageReader

Interface used by load_state_dict to read from storage.

S3StorageWriter

StorageWriter implementation that writes checkpoints to Amazon S3.

S3PrefixStrategyBase

Base class for S3 prefix generation strategies.

DefaultPrefixStrategy

Default strategy for generating S3 prefixes.

NumericPrefixStrategy

Base class for numeric prefix generation strategies.

BinaryPrefixStrategy

Binary (Base2) prefix generation strategy using only 0 and 1.

HexPrefixStrategy

Hexadecimal-based prefix generation strategy.

Package Contents

class s3torchconnector.dcp.S3FileSystem(region: str, s3_client: s3torchconnector._s3client.S3Client | None = None, s3client_config: s3torchconnector.S3ClientConfig | None = None)

Bases: torch.distributed.checkpoint.filesystem.FileSystemBase

FileSystemBase implementation that reads and writes checkpoint files as objects in Amazon S3.

create_stream(path: str | os.PathLike, mode: str) → Generator[io.IOBase, None, None]

Create a stream for reading or writing to S3.

Parameters:
  • path (Union[str, os.PathLike]) – The S3 path to read or write.

  • mode (str) – The mode for the stream. Supports ‘rb’ for read mode and ‘wb’ for write mode.

Yields:

io.BufferedIOBase – A stream for reading or writing to S3.

Raises:

ValueError – If the mode is not ‘rb’ or ‘wb’.
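The contract above (two accepted modes, a ValueError otherwise, and a stream used as a context manager) can be sketched with the standard library alone. This is not the connector's implementation: `_FAKE_BUCKET` is a hypothetical in-memory dictionary standing in for S3, and the function only mirrors the documented behaviour of `create_stream`.

```python
import io
from contextlib import contextmanager
from typing import Dict, Iterator

# Hypothetical in-memory stand-in for S3: maps "s3://bucket/key" URIs to object bytes.
_FAKE_BUCKET: Dict[str, bytes] = {}


@contextmanager
def create_stream(path: str, mode: str) -> Iterator[io.BufferedIOBase]:
    """Mirror the documented contract: 'rb' reads, 'wb' writes, other modes fail."""
    if mode == "rb":
        # Read mode: expose the stored object as a binary stream.
        yield io.BytesIO(_FAKE_BUCKET[path])
    elif mode == "wb":
        # Write mode: buffer in memory, then "upload" once the with-block exits cleanly.
        buffer = io.BytesIO()
        yield buffer
        _FAKE_BUCKET[path] = buffer.getvalue()
    else:
        raise ValueError(f"Unsupported mode {mode!r}; expected 'rb' or 'wb'")


# Usage: write an object, then read it back.
with create_stream("s3://my-bucket/ckpt/.metadata", "wb") as stream:
    stream.write(b"serialized metadata")

with create_stream("s3://my-bucket/ckpt/.metadata", "rb") as stream:
    print(stream.read())  # b'serialized metadata'
```

Because the sketch stores the buffer only after the `with` block finishes without an exception, a failed write leaves no partial object in the fake bucket.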

concat_path(path: str | os.PathLike, suffix: str) → str

Concatenate a suffix to the given path.

Parameters:
  • path (Union[str, os.PathLike]) – The base path.

  • suffix (str) – The suffix to concatenate.

Returns:

The concatenated path.

Return type:

str
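A minimal sketch of what concatenation can look like for S3 URIs, assuming the base path and suffix are joined with exactly one `/`. The function below is hypothetical and may not match the connector's exact handling of trailing slashes; the file name in the usage line is illustrative.

```python
import os
from typing import Union


def concat_path(path: Union[str, os.PathLike], suffix: str) -> str:
    """Hypothetical sketch: join a base S3 URI and a suffix with exactly one '/'."""
    base = os.fspath(path).rstrip("/")
    return f"{base}/{suffix.lstrip('/')}"


print(concat_path("s3://my-bucket/checkpoints/step_100/", "__0_0.distcp"))
# s3://my-bucket/checkpoints/step_100/__0_0.distcp
```

The sketch avoids `os.path.join` because S3 keys always use `/`, whereas `os.path.join` would insert a backslash on Windows.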

init_path(path: str | os.PathLike) → str | os.PathLike

Initialize the path for the filesystem.

Parameters:

path (Union[str, os.PathLike]) – The path to initialize.

Returns:

The initialized path.

Return type:

Union[str, os.PathLike]

rename(old_path: str | os.PathLike, new_path: str | os.PathLike) → None

Rename an object in S3.

This is emulated by copying it to a new path and deleting the old path. The deletion part is retried (see also S3FileSystem._delete_with_retry()).

Parameters:
  • old_path (Union[str, os.PathLike]) – The current path of the object.

  • new_path (Union[str, os.PathLike]) – The new path for the object.

Raises:
  • ValueError – If the old and new paths point to different buckets.

  • S3Exception – If there is an error with the S3 client.
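The copy-then-delete emulation can be sketched against an in-memory store. Everything here is hypothetical: `split_s3_uri`, `delete_with_retry`, the store layout, and the retry count and backoff are illustrative assumptions, not the connector's `_delete_with_retry()`. A failure between the two steps leaves both keys in place, which is one reason retrying the delete is useful.

```python
import time
from typing import Callable, Dict, Tuple


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into ('bucket', 'key')."""
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an S3 URI: {uri!r}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key


def delete_with_retry(delete: Callable[[str], object], key: str,
                      max_attempts: int = 3, backoff_s: float = 0.1) -> None:
    """Call delete(key), retrying failures up to max_attempts times in total."""
    for attempt in range(1, max_attempts + 1):
        try:
            delete(key)
            return
        except Exception:
            if attempt == max_attempts:
                raise  # give up: surface the last error to the caller
            time.sleep(backoff_s * attempt)  # linear backoff before the next try


def rename(store: Dict[str, Dict[str, bytes]], old_path: str, new_path: str) -> None:
    """Emulate a rename as copy-then-delete inside one bucket of an in-memory store."""
    old_bucket, old_key = split_s3_uri(old_path)
    new_bucket, new_key = split_s3_uri(new_path)
    if old_bucket != new_bucket:
        raise ValueError("old_path and new_path must be in the same bucket")
    objects = store[old_bucket]
    objects[new_key] = objects[old_key]        # step 1: copy to the new key
    delete_with_retry(objects.pop, old_key)    # step 2: delete the old key, with retries


store = {"my-bucket": {"ckpt/.metadata.tmp": b"metadata"}}
rename(store, "s3://my-bucket/ckpt/.metadata.tmp", "s3://my-bucket/ckpt/.metadata")
print(store)  # {'my-bucket': {'ckpt/.metadata': b'metadata'}}
```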

mkdir(path: str | os.PathLike) → None

No-op. S3 uses a flat key namespace, so there are no directories to create.

exists(path: str | os.PathLike) → bool

rm_file(path: str | os.PathLike) → None

classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool

class s3torchconnector.dcp.S3StorageReader(region: str, path: str | os.PathLike, s3client_config: s3torchconnector.S3ClientConfig | None = None)

Bases: torch.distributed.checkpoint.filesystem.FileSystemReader

Interface used by load_state_dict to read from storage.

One StorageReader instance acts as both the coordinator and the follower in a distributed checkpoint. As part of initialization, each instance is told its role.

A subclass should expect the following sequence of calls from load_state_dict:

  1. (all ranks) set checkpoint_id if users pass a valid checkpoint_id.

  2. (all ranks) read_metadata()

  3. (all ranks) set_up_storage_reader()

  4. (all ranks) prepare_local_plan()

  5. (coordinator) prepare_global_plan()

  6. (all ranks) read_data()
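The order of these calls, and which ones run only on the coordinator, can be replayed with a recording stand-in. `RecordingReader` and `simulate_load` are hypothetical; the real hooks take arguments (metadata, plans) and the real load path exchanges plans between ranks, both of which this sketch omits.

```python
from typing import List, Optional


class RecordingReader:
    """Hypothetical StorageReader stand-in that records which hooks are called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def set_checkpoint_id(self, checkpoint_id: str) -> None:
        self.calls.append("set_checkpoint_id")

    def read_metadata(self) -> None:
        self.calls.append("read_metadata")

    def set_up_storage_reader(self, is_coordinator: bool) -> None:
        self.calls.append("set_up_storage_reader")

    def prepare_local_plan(self) -> None:
        self.calls.append("prepare_local_plan")

    def prepare_global_plan(self) -> None:
        self.calls.append("prepare_global_plan")

    def read_data(self) -> None:
        self.calls.append("read_data")


def simulate_load(reader: RecordingReader, checkpoint_id: Optional[str],
                  is_coordinator: bool) -> None:
    """Replay the documented call order for one rank, without any communication."""
    if checkpoint_id is not None:                  # 1. only if the user passed one
        reader.set_checkpoint_id(checkpoint_id)
    reader.read_metadata()                         # 2. all ranks
    reader.set_up_storage_reader(is_coordinator)   # 3. all ranks
    reader.prepare_local_plan()                    # 4. all ranks
    if is_coordinator:                             # 5. coordinator only
        reader.prepare_global_plan()
    reader.read_data()                             # 6. all ranks


follower = RecordingReader()
simulate_load(follower, checkpoint_id=None, is_coordinator=False)
print(follower.calls)
# ['read_metadata', 'set_up_storage_reader', 'prepare_local_plan', 'read_data']
```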

fs
path = ''
sync_files = False
classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool

Check whether the given checkpoint_id is supported by the storage. This allows automatic storage selection.
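Automatic storage selection works by asking a storage class whether it recognises a checkpoint_id. A minimal sketch of such a check, assuming that "supported" means a well-formed `s3://<bucket>[/<key>]` URI; the connector's actual rule may be stricter or looser.

```python
import os
from typing import Union
from urllib.parse import urlparse


def validate_checkpoint_id(checkpoint_id: Union[str, os.PathLike]) -> bool:
    """Hypothetical rule: accept only URIs of the form s3://<bucket>[/<key>]."""
    value = os.fspath(checkpoint_id)
    if not isinstance(value, str):
        return False  # bytes paths are not treated as S3 URIs here
    parsed = urlparse(value)
    return parsed.scheme == "s3" and bool(parsed.netloc)


for candidate in ("s3://my-bucket/checkpoints/epoch_1", "/tmp/checkpoints", "s3://"):
    print(candidate, validate_checkpoint_id(candidate))
```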

class s3torchconnector.dcp.S3StorageWriter(region: str, path: str, s3client_config: s3torchconnector.S3ClientConfig | None = None, prefix_strategy: s3torchconnector.dcp.s3_prefix_strategy.S3PrefixStrategyBase | None = None, **kwargs)

Bases: torch.distributed.checkpoint.filesystem.FileSystemWriter

StorageWriter implementation that writes checkpoints to Amazon S3.

This implementation makes the following assumptions and simplifications:

  • The checkpoint path is an empty or non-existent directory.

  • File creation is atomic.

The checkpoint consists of one file per write request plus a .metadata file with the serialized metadata.

fs
path = ''
prefix_strategy
prepare_global_plan(plans: List[torch.distributed.checkpoint.planner.SavePlan]) → List[torch.distributed.checkpoint.planner.SavePlan]

Prepare save plans with S3-specific storage metadata.

Parameters:

plans – List of save plans to be processed.

Returns:

Modified save plans with S3 storage metadata.
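In upstream PyTorch, FileSystemWriter's global planning step attaches a per-rank prefix such as `__0_` to each plan's `storage_data` using `dataclasses.replace`. The sketch below assumes S3StorageWriter follows the same pattern with the prefix taken from its prefix strategy. `SavePlan` here is a simplified local stand-in, and `rank_prefix` is a hypothetical substitute for `prefix_strategy.generate_prefix`.

```python
import dataclasses
from typing import Any, Callable, List, Optional


@dataclasses.dataclass(frozen=True)
class SavePlan:
    """Simplified stand-in for torch.distributed.checkpoint.planner.SavePlan."""
    items: List[Any]
    storage_data: Optional[Any] = None


def rank_prefix(rank: int) -> str:
    """Hypothetical prefix function playing the role of prefix_strategy.generate_prefix."""
    return f"__{rank}_"


def prepare_global_plan(plans: List[SavePlan],
                        generate_prefix: Callable[[int], str] = rank_prefix) -> List[SavePlan]:
    """Return copies of the per-rank plans, each carrying its own storage prefix."""
    # Assumption: plans arrive on the coordinator in rank order, so index == rank.
    return [
        dataclasses.replace(plan, storage_data=generate_prefix(rank))
        for rank, plan in enumerate(plans)
    ]


plans = [SavePlan(items=["layer1.weight"]), SavePlan(items=["layer2.weight"])]
print([p.storage_data for p in prepare_global_plan(plans)])  # ['__0_', '__1_']
```

Using `dataclasses.replace` on a frozen dataclass returns new plan objects and leaves the inputs untouched.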

classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool

Check whether the given checkpoint_id is supported by the storage. This allows automatic storage selection.

class s3torchconnector.dcp.S3PrefixStrategyBase

Bases: abc.ABC

Base class for S3 prefix generation strategies.

abstract generate_prefix(rank: int) → str

Generate storage prefix for the given rank.
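A custom strategy only needs to implement `generate_prefix`. So that the sketch runs without the connector installed, it defines a local stand-in base class with the same abstract method. With the connector available, the class would subclass `S3PrefixStrategyBase` instead and be passed through the `prefix_strategy` parameter of S3StorageWriter. `NodePrefixStrategy` and `ranks_per_node` are hypothetical. The `__<rank>_` tail copies the documented numeric format, on the assumption that file names are appended directly after the prefix.

```python
from abc import ABC, abstractmethod


class PrefixStrategyBase(ABC):
    """Local stand-in with the same abstract method as S3PrefixStrategyBase."""

    @abstractmethod
    def generate_prefix(self, rank: int) -> str:
        """Return the storage prefix for the given rank."""


class NodePrefixStrategy(PrefixStrategyBase):
    """Hypothetical strategy: put all ranks of one node under a shared prefix."""

    def __init__(self, ranks_per_node: int = 8) -> None:
        if ranks_per_node <= 0:
            raise ValueError("ranks_per_node must be positive")
        self.ranks_per_node = ranks_per_node

    def generate_prefix(self, rank: int) -> str:
        node = rank // self.ranks_per_node
        return f"node_{node}/__{rank}_"


strategy = NodePrefixStrategy(ranks_per_node=8)
print(strategy.generate_prefix(10))  # node_1/__10_
```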

class s3torchconnector.dcp.DefaultPrefixStrategy

Bases: S3PrefixStrategyBase

Default strategy for generating S3 prefixes.

generate_prefix(rank: int) → str

Generate simple rank-based name without prefix.
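Without a leading pattern, every rank's objects sit directly under the checkpoint path and are told apart only by the rank marker. A minimal sketch, assuming the default name is the bare `__<rank>_` tail of the format documented for NumericPrefixStrategy. The function name is hypothetical.

```python
def default_generate_prefix(rank: int) -> str:
    """Hypothetical default: a rank-based name with no leading pattern directory."""
    return f"__{rank}_"


print([default_generate_prefix(rank) for rank in range(4)])
# ['__0_', '__1_', '__2_', '__3_']
```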

class s3torchconnector.dcp.NumericPrefixStrategy(base: int, epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)

Bases: S3PrefixStrategyBase

Base class for numeric prefix generation strategies.

base
epoch_num = None
min_prefix_len = 10
prefix_count = 1
prefix_map
generate_prefix(rank: int) → str

Generate numeric-based prefix with optional epoch number.

Parameters:

rank – Process rank in the distributed environment.

Returns:

Prefix string in format <pattern>/epoch_<num>/__<rank>_, or <pattern>/__<rank>_ if no epoch number is provided.

Return type:

str
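The documented output format can be reproduced with a small sketch. The assumptions are substantial: each prefix slot's pattern is taken to be the slot index written in `base` and zero-padded to `min_prefix_length` digits, and ranks are mapped to slots with `rank % prefix_count`. The connector's real `prefix_map` may be built differently, for example with a different digit order. `NumericPrefixSketch` and `to_base` are hypothetical names. Spreading keys across several leading patterns is a common way to stay within Amazon S3's per-prefix request-rate limits.

```python
from typing import Dict, Optional

_DIGITS = "0123456789abcdef"


def to_base(n: int, base: int, width: int) -> str:
    """Write a non-negative n in `base` (2-16), left-padded with zeros to `width` digits."""
    if not 2 <= base <= 16:
        raise ValueError("base must be between 2 and 16")
    if n < 0:
        raise ValueError("n must be non-negative")
    digits = ""
    while n > 0:
        n, rem = divmod(n, base)
        digits = _DIGITS[rem] + digits
    return digits.rjust(width, "0")


class NumericPrefixSketch:
    """Hypothetical re-creation of the documented NumericPrefixStrategy output format."""

    def __init__(self, base: int, epoch_num: Optional[int] = None,
                 min_prefix_length: int = 10, prefix_count: int = 1) -> None:
        self.base = base
        self.epoch_num = epoch_num
        # Assumption: one pattern per slot, the slot index written in `base`.
        self.prefix_map: Dict[int, str] = {
            slot: to_base(slot, base, min_prefix_length) for slot in range(prefix_count)
        }

    def generate_prefix(self, rank: int) -> str:
        # Assumption: ranks share slots round-robin when there are fewer slots than ranks.
        pattern = self.prefix_map[rank % len(self.prefix_map)]
        if self.epoch_num is not None:
            return f"{pattern}/epoch_{self.epoch_num}/__{rank}_"
        return f"{pattern}/__{rank}_"


print(NumericPrefixSketch(base=2, prefix_count=4).generate_prefix(5))
# 0000000001/__5_
print(NumericPrefixSketch(base=16, epoch_num=3, prefix_count=16).generate_prefix(11))
# 000000000b/epoch_3/__11_
```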

class s3torchconnector.dcp.BinaryPrefixStrategy(epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)

Bases: NumericPrefixStrategy

Binary (Base2) prefix generation strategy using only 0 and 1.

class s3torchconnector.dcp.HexPrefixStrategy(epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)

Bases: NumericPrefixStrategy

Hexadecimal-based prefix generation strategy.
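The two subclasses appear to differ only in their digit alphabet, presumably by fixing `base` to 2 and 16 respectively. The sketch does not call the connector. It uses Python's built-in `format` specs `b` and `x` to show how the same numbers render at the default width of 10 digits, and how many distinct patterns each alphabet allows at that width. `padded` is a hypothetical helper.

```python
def padded(n: int, spec: str, width: int = 10) -> str:
    """Format n with a format spec ('b' binary, 'x' hex) and zero-pad it to width digits."""
    return format(n, spec).zfill(width)


for n in (0, 1, 5, 15):
    print(f"{n:>2}  binary={padded(n, 'b')}  hex={padded(n, 'x')}")

# At 10 digits, binary can distinguish 2**10 = 1024 patterns, hex 16**10 of them.
print(2 ** 10, 16 ** 10)
```

Either strategy would then be handed to S3StorageWriter through its prefix_strategy parameter.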