s3torchconnector.dcp

Classes

S3FileSystem

S3-based implementation of PyTorch's FileSystemBase for distributed checkpointing.

S3StorageReader

S3 implementation of PyTorch's FileSystemReader with configurable reader strategies.

S3StorageWriter

S3 implementation of PyTorch's FileSystemWriter for distributed checkpoints.

S3PrefixStrategyBase

Base class for S3 prefix generation strategies.

DefaultPrefixStrategy

Default strategy for generating S3 prefixes.

NumericPrefixStrategy

Base class for numeric prefix generation strategies.

BinaryPrefixStrategy

Binary (Base2) prefix generation strategy using only 0 and 1.

HexPrefixStrategy

Hexadecimal-based prefix generation strategy.

Package Contents

class s3torchconnector.dcp.S3FileSystem(region: str, s3_client: s3torchconnector._s3client.S3Client | None = None, s3client_config: s3torchconnector.S3ClientConfig | None = None, reader_constructor: s3torchconnector.s3reader.S3ReaderConstructorProtocol | None = None)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemBase

S3-based implementation of PyTorch’s FileSystemBase for distributed checkpointing.

create_stream(path: str | os.PathLike, mode: str) Generator[io.IOBase, None, None][source]

Create a stream for reading or writing to S3.

Parameters:
  • path (Union[str, os.PathLike]) – The S3 path to read or write.

  • mode (str) – The mode for the stream. Supports ‘rb’ for read mode and ‘wb’ for write mode.

Yields:

io.BufferedIOBase – A stream for reading or writing to S3.

Raises:

ValueError – If the mode is not ‘rb’ or ‘wb’.
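The stream contract above (yield a readable stream for 'rb', a writable one for 'wb', reject anything else) can be sketched with an in-memory buffer standing in for S3. This is an illustrative sketch only, not the library's implementation; the `_store` dict is a hypothetical stand-in for the S3 client.

```python
import io
from contextlib import contextmanager
from typing import Dict, Generator

# Hypothetical in-memory stand-in for an S3 bucket (the real class
# streams to/from S3 via its S3 client instead).
_store: Dict[str, bytes] = {}

@contextmanager
def create_stream(path: str, mode: str) -> Generator[io.IOBase, None, None]:
    if mode == "rb":
        # Read mode: yield a stream over the stored bytes.
        yield io.BytesIO(_store[path])
    elif mode == "wb":
        # Write mode: let the caller fill a buffer, persist on context exit.
        buffer = io.BytesIO()
        yield buffer
        _store[path] = buffer.getvalue()
    else:
        raise ValueError(f"Unsupported mode: {mode!r} (expected 'rb' or 'wb')")

with create_stream("s3://bucket/ckpt/shard-0", "wb") as f:
    f.write(b"checkpoint shard")
with create_stream("s3://bucket/ckpt/shard-0", "rb") as f:
    assert f.read() == b"checkpoint shard"
```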

concat_path(path: str | os.PathLike, suffix: str) str[source]

Concatenate a suffix to the given path.

Parameters:
  • path (Union[str, os.PathLike]) – The base path.

  • suffix (str) – The suffix to concatenate.

Returns:

The concatenated path.

Return type:

str

init_path(path: str | os.PathLike) str | os.PathLike[source]

Initialize the path for the filesystem.

Parameters:

path (Union[str, os.PathLike]) – The path to initialize.

Returns:

The initialized path.

Return type:

Union[str, os.PathLike]

rename(old_path: str | os.PathLike, new_path: str | os.PathLike) None[source]

Rename an object in S3.

S3 has no native rename, so this is emulated by copying the object to the new path and then deleting the old path. The deletion is retried (see also S3FileSystem._delete_with_retry()).

Parameters:
  • old_path (Union[str, os.PathLike]) – The current path of the object.

  • new_path (Union[str, os.PathLike]) – The new path for the object.

Raises:
  • ValueError – If the old and new paths point to different buckets.

  • S3Exception – If there is an error with the S3 client.
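The copy-then-delete pattern with a retried delete can be sketched against an in-memory store. This is a sketch of the pattern only; the dict-based store, the retry count, and the path format "bucket/key" are assumptions, not the library's code.

```python
import time
from typing import Dict

def rename(store: Dict[str, bytes], old_path: str, new_path: str) -> None:
    # Paths are assumed to look like "bucket/key" for this sketch.
    old_bucket = old_path.split("/", 1)[0]
    new_bucket = new_path.split("/", 1)[0]
    if old_bucket != new_bucket:
        raise ValueError("Old and new paths must point to the same bucket")
    # S3 has no native rename: copy to the new path first...
    store[new_path] = store[old_path]
    # ...then delete the old path, retrying a few times on failure.
    for attempt in range(3):
        try:
            del store[old_path]
            return
        except KeyError:
            if attempt == 2:
                raise
            time.sleep(0.01)
```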

mkdir(path: str | os.PathLike) None[source]

No-op method for creating directories in S3 (not needed).

exists(path: str | os.PathLike) bool[source]
rm_file(path: str | os.PathLike) None[source]
classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) bool[source]
class s3torchconnector.dcp.S3StorageReader(region: str, path: str | os.PathLike, s3client_config: s3torchconnector.S3ClientConfig | None = None, reader_constructor: s3torchconnector.s3reader.S3ReaderConstructorProtocol | s3torchconnector.s3reader.DCPS3ReaderConstructorProtocol | None = None)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemReader

S3 implementation of PyTorch’s FileSystemReader with configurable reader strategies.

By default, uses DCPOptimizedS3Reader for improved checkpoint loading performance. For unsupported or non-DCP access patterns, please use the generic reader:

storage_reader = S3StorageReader(
    region, path, reader_constructor=S3ReaderConstructor.default()
)

fs: S3FileSystem
path = ''
sync_files = False
classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) bool[source]

Check if the given checkpoint_id is supported by the storage. This allows automatic storage selection.

prepare_local_plan(plan: torch.distributed.checkpoint.planner.LoadPlan) torch.distributed.checkpoint.planner.LoadPlan[source]

Performs two key optimizations:

  1. Load Ordering: Sorts load items by storage offset to enable sequential access.

  2. Range Injection: Provides byte range metadata to DCP reader constructors, enabling DCPOptimizedS3Reader to use range-based streams and range coalescing.

Parameters:

plan (LoadPlan) – The load plan from PyTorch DCP.

Returns:

The same plan with items sorted by storage offset.

Return type:

LoadPlan

Note

Both optimizations are required for DCPOptimizedS3Reader.
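The two optimizations can be illustrated with plain tuples in place of the planner's load items. This is a conceptual sketch under assumed types (each item is a `(storage_offset, length)` pair); `prepare_local_plan` and `coalesce_ranges` here are illustrative helpers, not the library's implementation.

```python
from typing import List, Tuple

Item = Tuple[int, int]  # (storage_offset, length) — assumed shape for this sketch

def prepare_local_plan(items: List[Item]) -> List[Item]:
    # 1) Load ordering: sort by storage offset for sequential access.
    return sorted(items, key=lambda item: item[0])

def coalesce_ranges(items: List[Item], gap: int = 0) -> List[Item]:
    # 2) Range coalescing: merge adjacent or overlapping byte ranges so a
    #    single ranged GET can serve several load items.
    merged: List[List[int]] = []
    for start, length in sorted(items):
        end = start + length
        if merged and start <= merged[-1][1] + gap:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return [(s, e - s) for s, e in merged]
```

For example, items at offsets 0 and 4 (4 bytes each) coalesce into one 8-byte range, while an item at offset 16 stays separate.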

class s3torchconnector.dcp.S3StorageWriter(region: str, path: str, s3client_config: s3torchconnector.S3ClientConfig | None = None, prefix_strategy: s3torchconnector.dcp.s3_prefix_strategy.S3PrefixStrategyBase | None = None, thread_count: int = 1, **kwargs)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemWriter

S3 implementation of PyTorch’s FileSystemWriter for distributed checkpoints.

fs
path = ''
prefix_strategy
prepare_global_plan(plans: List[torch.distributed.checkpoint.planner.SavePlan]) List[torch.distributed.checkpoint.planner.SavePlan][source]

Prepare save plans with S3-specific storage metadata.

Parameters:

plans – List of save plans to be processed.

Returns:

Modified save plans with S3 storage metadata.

classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) bool[source]

Check if the given checkpoint_id is supported by the storage. This allows automatic storage selection.

class s3torchconnector.dcp.S3PrefixStrategyBase[source]

Bases: abc.ABC

Base class for S3 prefix generation strategies.

abstract generate_prefix(rank: int) str[source]

Generate storage prefix for the given rank.

class s3torchconnector.dcp.DefaultPrefixStrategy[source]

Bases: S3PrefixStrategyBase

Default strategy for generating S3 prefixes.

generate_prefix(rank: int) str[source]

Generate simple rank-based name without prefix.

class s3torchconnector.dcp.NumericPrefixStrategy(base: int, epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)[source]

Bases: S3PrefixStrategyBase

Base class for numeric prefix generation strategies.

base
epoch_num = None
min_prefix_len = 10
prefix_count = 1
prefix_map
generate_prefix(rank: int) str[source]

Generate numeric-based prefix with optional epoch number.

Parameters:

rank – Process rank in the distributed environment.

Returns:

Prefix string in the format <pattern>/epoch_<num>/__<rank>_, or <pattern>/__<rank>_ if no epoch number is provided.

Return type:

str
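A hypothetical sketch of how such a prefix might be formed: spread ranks over a pool of numeric patterns (least-significant digit first, padded to a minimum length) and append the documented rank suffix. The digit ordering, modulo mapping, and parameter defaults here are assumptions for illustration; only the output format follows the docstring above.

```python
from typing import Optional

def numeric_prefix(rank: int, base: int = 16, min_len: int = 10,
                   prefix_count: int = 64,
                   epoch_num: Optional[int] = None) -> str:
    # Assumption: ranks map onto prefix_count patterns via modulo, and the
    # pattern is the base-N digits of that index, least-significant first,
    # right-padded with the zero digit to min_len characters.
    digits = "0123456789abcdef"[:base]
    n = rank % prefix_count
    pattern = ""
    while n:
        pattern += digits[n % base]
        n //= base
    pattern = pattern.ljust(min_len, digits[0])
    # Output format per the docstring: <pattern>/epoch_<num>/__<rank>_
    if epoch_num is not None:
        return f"{pattern}/epoch_{epoch_num}/__{rank}_"
    return f"{pattern}/__{rank}_"
```

Varying the low-order digits first keeps keys spread across distinct S3 prefixes, which is the usual motivation for this kind of strategy.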

class s3torchconnector.dcp.BinaryPrefixStrategy(epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)[source]

Bases: NumericPrefixStrategy

Binary (Base2) prefix generation strategy using only 0 and 1.

class s3torchconnector.dcp.HexPrefixStrategy(epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)[source]

Bases: NumericPrefixStrategy

Hexadecimal-based prefix generation strategy.