s3torchconnector.dcp.s3_file_system

Attributes

logger

Classes

S3FileSystem

S3-based implementation of PyTorch's FileSystemBase for distributed checkpointing.

StorageMetadata

Metadata for S3 storage prefix.

S3StorageWriter

S3 implementation of PyTorch's FileSystemWriter for distributed checkpoints.

S3StorageReader

S3 implementation of PyTorch's FileSystemReader with configurable reader strategies.

Module Contents

s3torchconnector.dcp.s3_file_system.logger[source]
class s3torchconnector.dcp.s3_file_system.S3FileSystem(region: str, s3_client: s3torchconnector._s3client.S3Client | None = None, s3client_config: s3torchconnector.S3ClientConfig | None = None, reader_constructor: s3torchconnector.s3reader.S3ReaderConstructorProtocol | None = None)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemBase

S3-based implementation of PyTorch’s FileSystemBase for distributed checkpointing.

create_stream(path: str | os.PathLike, mode: str) → Generator[io.IOBase, None, None][source]

Create a stream for reading or writing to S3.

Parameters:
  • path (Union[str, os.PathLike]) – The S3 path to read or write.

  • mode (str) – The mode for the stream. Supports ‘rb’ for read mode and ‘wb’ for write mode.

Yields:

io.BufferedIOBase – A stream for reading or writing to S3.

Raises:

ValueError – If the mode is not ‘rb’ or ‘wb’.
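The create_stream contract can be sketched without touching AWS by swapping S3 for an in-memory dict. The InMemoryFileSystem class below is hypothetical (not part of the library); it mimics the same context-manager behavior that S3FileSystem.create_stream provides against real buckets:

```python
import contextlib
import io

# Hypothetical stand-in for S3FileSystem, storing objects in a dict
# instead of S3. It follows the same create_stream contract: a context
# manager yielding a binary stream, 'wb' to write and 'rb' to read.
class InMemoryFileSystem:
    def __init__(self):
        self._objects = {}

    @contextlib.contextmanager
    def create_stream(self, path, mode):
        if mode == "wb":
            stream = io.BytesIO()
            yield stream
            # Persist the written bytes once the caller's block exits.
            self._objects[str(path)] = stream.getvalue()
        elif mode == "rb":
            yield io.BytesIO(self._objects[str(path)])
        else:
            raise ValueError("mode must be 'rb' or 'wb'")

fs = InMemoryFileSystem()
with fs.create_stream("s3://bucket/checkpoint.bin", "wb") as s:
    s.write(b"checkpoint bytes")
with fs.create_stream("s3://bucket/checkpoint.bin", "rb") as s:
    data = s.read()
```

With the real S3FileSystem, the same two with-blocks would upload to and download from the given S3 URI instead of the dict.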

concat_path(path: str | os.PathLike, suffix: str) → str[source]

Concatenate a suffix to the given path.

Parameters:
  • path (Union[str, os.PathLike]) – The base path.

  • suffix (str) – The suffix to concatenate.

Returns:

The concatenated path.

Return type:

str

init_path(path: str | os.PathLike) → str | os.PathLike[source]

Initialize the path for the filesystem.

Parameters:

path (Union[str, os.PathLike]) – The path to initialize.

Returns:

The initialized path.

Return type:

Union[str, os.PathLike]

rename(old_path: str | os.PathLike, new_path: str | os.PathLike) → None[source]

Rename an object in S3.

This is emulated by copying it to a new path and deleting the old path. The deletion part is retried (see also S3FileSystem._delete_with_retry()).

Parameters:
  • old_path (Union[str, os.PathLike]) – The current path of the object.

  • new_path (Union[str, os.PathLike]) – The new path for the object.

Raises:
  • ValueError – If the old and new paths point to different buckets.

  • S3Exception – If there is an error with the S3 client.

mkdir(path: str | os.PathLike) → None[source]

No-op method for creating directories in S3 (not needed).

exists(path: str | os.PathLike) → bool[source]
rm_file(path: str | os.PathLike) → None[source]
classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool[source]
class s3torchconnector.dcp.s3_file_system.StorageMetadata[source]

Metadata for S3 storage prefix.

prefix: str[source]
class s3torchconnector.dcp.s3_file_system.S3StorageWriter(region: str, path: str, s3client_config: s3torchconnector.S3ClientConfig | None = None, prefix_strategy: s3torchconnector.dcp.s3_prefix_strategy.S3PrefixStrategyBase | None = None, thread_count: int = 1, **kwargs)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemWriter

S3 implementation of PyTorch’s FileSystemWriter for distributed checkpoints.

fs[source]
path = ''[source]
prefix_strategy[source]
prepare_global_plan(plans: List[torch.distributed.checkpoint.planner.SavePlan]) → List[torch.distributed.checkpoint.planner.SavePlan][source]

Prepare save plans with S3-specific storage metadata.

Parameters:

plans – List of save plans to be processed.

Returns:

Modified save plans with S3 storage metadata.

classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool[source]

Check if the given checkpoint_id is supported by the storage. This allows us to enable automatic storage selection.

class s3torchconnector.dcp.s3_file_system.S3StorageReader(region: str, path: str | os.PathLike, s3client_config: s3torchconnector.S3ClientConfig | None = None, reader_constructor: s3torchconnector.s3reader.S3ReaderConstructorProtocol | s3torchconnector.s3reader.DCPS3ReaderConstructorProtocol | None = None)[source]

Bases: torch.distributed.checkpoint.filesystem.FileSystemReader

S3 implementation of PyTorch’s FileSystemReader with configurable reader strategies.

By default, uses DCPOptimizedS3Reader for improved checkpoint loading performance. For unsupported or non-DCP access patterns, please use the generic reader:

storage_reader = S3StorageReader(
    region, path, reader_constructor=S3ReaderConstructor.default()
)

fs: S3FileSystem[source]
path = ''[source]
sync_files = False[source]
classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) → bool[source]

Check if the given checkpoint_id is supported by the storage. This allows us to enable automatic storage selection.

prepare_local_plan(plan: torch.distributed.checkpoint.planner.LoadPlan) → torch.distributed.checkpoint.planner.LoadPlan[source]

Performs two key optimizations:

  1. Load Ordering: Sorts load items by storage offset to enable sequential access.

  2. Range Injection: Provides byte-range metadata to DCP reader constructors, enabling DCPOptimizedS3Reader to use range-based streams and range coalescing.

Parameters:

plan (LoadPlan) – The load plan from PyTorch DCP.

Returns:

The same plan with items sorted by storage offset.

Return type:

LoadPlan

Note

Both optimizations are required for DCPOptimizedS3Reader.