s3torchconnector.dcp
Submodules
Classes
- S3FileSystem – S3-based implementation of PyTorch's FileSystemBase for distributed checkpointing.
- S3StorageReader – S3 implementation of PyTorch's FileSystemReader with configurable reader strategies.
- S3StorageWriter – S3 implementation of PyTorch's FileSystemWriter for distributed checkpoints.
- S3PrefixStrategyBase – Base class for S3 prefix generation strategies.
- DefaultPrefixStrategy – Default strategy for generating S3 prefixes.
- NumericPrefixStrategy – Base class for numeric prefix generation strategies.
- BinaryPrefixStrategy – Binary (Base2) prefix generation strategy using only 0 and 1.
- HexPrefixStrategy – Hexadecimal-based prefix generation strategy.
Package Contents
- class s3torchconnector.dcp.S3FileSystem(region: str, s3_client: s3torchconnector._s3client.S3Client | None = None, s3client_config: s3torchconnector.S3ClientConfig | None = None, reader_constructor: s3torchconnector.s3reader.S3ReaderConstructorProtocol | None = None)[source]
Bases: torch.distributed.checkpoint.filesystem.FileSystemBase
S3-based implementation of PyTorch’s FileSystemBase for distributed checkpointing.
- create_stream(path: str | os.PathLike, mode: str) Generator[io.IOBase, None, None][source]
Create a stream for reading or writing to S3.
- Parameters:
path (Union[str, os.PathLike]) – The S3 path to read or write.
mode (str) – The mode for the stream. Supports ‘rb’ for read mode and ‘wb’ for write mode.
- Yields:
io.BufferedIOBase – A stream for reading or writing to S3.
- Raises:
ValueError – If the mode is not ‘rb’ or ‘wb’.
- concat_path(path: str | os.PathLike, suffix: str) str[source]
Concatenate a suffix to the given path.
- Parameters:
path (Union[str, os.PathLike]) – The base path.
suffix (str) – The suffix to concatenate.
- Returns:
The concatenated path.
- Return type:
str
- init_path(path: str | os.PathLike) str | os.PathLike[source]
Initialize the path for the filesystem.
- Parameters:
path (Union[str, os.PathLike]) – The path to initialize.
- Returns:
The initialized path.
- Return type:
Union[str, os.PathLike]
- rename(old_path: str | os.PathLike, new_path: str | os.PathLike) None[source]
Rename an object in S3.
This is emulated by copying the object to the new path and then deleting the old path; the deletion step is retried (see also S3FileSystem._delete_with_retry()).
- Parameters:
old_path (Union[str, os.PathLike]) – The current path of the object.
new_path (Union[str, os.PathLike]) – The new path for the object.
- Raises:
ValueError – If the old and new paths point to different buckets.
S3Exception – If there is an error with the S3 client.
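The copy-then-delete emulation described above can be sketched in plain Python. The FakeS3Client and helper function below are hypothetical stand-ins for illustration only, not the library's actual client or retry logic:

```python
# Illustrative sketch of how a rename is emulated on S3, which has no native
# rename: copy to the new key, then delete the old key, retrying the delete.
# FakeS3Client is a hypothetical in-memory stand-in for the real S3 client.
import time


class FakeS3Client:
    """Minimal in-memory stand-in for an S3 client (hypothetical)."""

    def __init__(self):
        self.objects = {}

    def copy(self, src, dst):
        self.objects[dst] = self.objects[src]

    def delete(self, key):
        del self.objects[key]


def rename_via_copy_delete(client, old_key, new_key, retries=3, delay=0.0):
    """Copy old_key to new_key, then delete old_key, retrying the delete."""
    client.copy(old_key, new_key)
    for attempt in range(retries):
        try:
            client.delete(old_key)
            return
        except Exception:
            if attempt == retries - 1:
                raise  # give up after the final attempt
            time.sleep(delay)


client = FakeS3Client()
client.objects["ckpt/.tmp0"] = b"shard-bytes"
rename_via_copy_delete(client, "ckpt/.tmp0", "ckpt/__0_0.distcp")
```

Retrying only the delete step matters because a failed delete leaves a harmless duplicate, whereas a failed copy would lose data.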
- class s3torchconnector.dcp.S3StorageReader(region: str, path: str | os.PathLike, s3client_config: s3torchconnector.S3ClientConfig | None = None, reader_constructor: s3torchconnector.s3reader.S3ReaderConstructorProtocol | s3torchconnector.s3reader.DCPS3ReaderConstructorProtocol | None = None)[source]
Bases: torch.distributed.checkpoint.filesystem.FileSystemReader
S3 implementation of PyTorch’s FileSystemReader with configurable reader strategies.
By default, uses DCPOptimizedS3Reader for improved checkpoint loading performance. For unsupported or non-DCP access patterns, please use the generic reader:
storage_reader = S3StorageReader(
    region, path, reader_constructor=S3ReaderConstructor.default()
)
- fs: S3FileSystem
- path = ''
- sync_files = False
- classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) bool[source]
Check whether the given checkpoint_id is supported by this storage backend. This allows us to enable automatic storage selection.
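A plausible mental model for this check (hypothetical; the real implementation may differ) is that a checkpoint_id is supported when it parses as an s3:// URI with a bucket:

```python
# Hypothetical sketch of the kind of check validate_checkpoint_id performs:
# accept checkpoint IDs that parse as s3:// URIs with a bucket component.
# This is an illustration, not the library's actual implementation.
from urllib.parse import urlparse


def looks_like_s3_checkpoint(checkpoint_id: str) -> bool:
    parsed = urlparse(str(checkpoint_id))
    return parsed.scheme == "s3" and bool(parsed.netloc)


looks_like_s3_checkpoint("s3://my-bucket/checkpoints/epoch-0/")  # True
looks_like_s3_checkpoint("/local/checkpoints/epoch-0/")          # False
```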
- prepare_local_plan(plan: torch.distributed.checkpoint.planner.LoadPlan) torch.distributed.checkpoint.planner.LoadPlan[source]
Performs two key optimizations:
1. Load Ordering: Sorts load items by storage offset to enable sequential access.
2. Range Injection: Provides byte-range metadata to DCP reader constructors, enabling DCPOptimizedS3Reader to use range-based streams and range coalescing.
- Parameters:
plan (LoadPlan) – The load plan from PyTorch DCP.
- Returns:
The same plan with items sorted by storage offset.
- Return type:
LoadPlan
Note
Both optimizations are required for DCPOptimizedS3Reader.
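The load-ordering optimization can be illustrated in plain Python. LoadItem below is a simplified stand-in for DCP's read items, not the real planner type:

```python
# Sketch of the "load ordering" step in prepare_local_plan: sorting a plan's
# read items by their storage offset so the checkpoint object is read
# sequentially, never seeking backwards between range requests.
from dataclasses import dataclass


@dataclass
class LoadItem:
    """Simplified stand-in for a DCP read item (hypothetical type)."""
    name: str
    offset: int  # byte offset of the tensor's data within the checkpoint object
    length: int  # number of bytes to read


items = [
    LoadItem("layer2.bias", offset=4096, length=256),
    LoadItem("layer1.weight", offset=0, length=4096),
    LoadItem("layer3.weight", offset=4352, length=8192),
]

# After sorting, each item starts at or after the previous item's end, so the
# reads can be served by a single forward pass over the object.
ordered = sorted(items, key=lambda item: item.offset)
```

Adjacent ranges in the sorted order (e.g. 0–4096 followed by 4096–4352 here) are also what makes range coalescing possible in the second optimization.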
- class s3torchconnector.dcp.S3StorageWriter(region: str, path: str, s3client_config: s3torchconnector.S3ClientConfig | None = None, prefix_strategy: s3torchconnector.dcp.s3_prefix_strategy.S3PrefixStrategyBase | None = None, thread_count: int = 1, **kwargs)[source]
Bases: torch.distributed.checkpoint.filesystem.FileSystemWriter
S3 implementation of PyTorch’s FileSystemWriter for distributed checkpoints.
- fs
- path = ''
- prefix_strategy
- prepare_global_plan(plans: List[torch.distributed.checkpoint.planner.SavePlan]) List[torch.distributed.checkpoint.planner.SavePlan][source]
Prepare save plans with S3-specific storage metadata.
- Parameters:
plans – List of save plans to be processed.
- Returns:
Modified save plans with S3 storage metadata.
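A minimal end-to-end sketch of saving a checkpoint with S3StorageWriter, assuming a placeholder region and bucket URI. Imports are deferred into the function so the sketch only requires torch and s3torchconnector when actually called:

```python
# Hedged usage sketch: saving a model's state dict to S3 via S3StorageWriter
# and torch.distributed.checkpoint. "us-east-1" and the s3:// URI are
# placeholders; substitute your own region and bucket.
def save_to_s3(model, region="us-east-1", uri="s3://my-bucket/checkpoints/epoch-0/"):
    import torch.distributed.checkpoint as dcp
    from s3torchconnector.dcp import S3StorageWriter

    writer = S3StorageWriter(region=region, path=uri)
    dcp.save(state_dict={"model": model.state_dict()}, storage_writer=writer)
```

The matching load path would use S3StorageReader as the storage_reader argument to dcp.load.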
- class s3torchconnector.dcp.S3PrefixStrategyBase[source]
Bases: abc.ABC
Base class for S3 prefix generation strategies.
- class s3torchconnector.dcp.DefaultPrefixStrategy[source]
Bases: S3PrefixStrategyBase
Default strategy for generating S3 prefixes.
- class s3torchconnector.dcp.NumericPrefixStrategy(base: int, epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)[source]
Bases: S3PrefixStrategyBase
Base class for numeric prefix generation strategies.
- base
- epoch_num = None
- min_prefix_len = 10
- prefix_count = 1
- prefix_map
- class s3torchconnector.dcp.BinaryPrefixStrategy(epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)[source]
Bases: NumericPrefixStrategy
Binary (Base2) prefix generation strategy using only 0 and 1.
- class s3torchconnector.dcp.HexPrefixStrategy(epoch_num: int | None = None, min_prefix_length: int = 10, prefix_count: int | None = None)[source]
Bases: NumericPrefixStrategy
Hexadecimal-based prefix generation strategy.
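The numeric strategies spread checkpoint objects across many distinct key prefixes, a common pattern for increasing aggregate S3 request throughput since S3 scales per key prefix. The idea can be sketched as rendering a rank in the strategy's base at a fixed width; this is a hypothetical re-implementation for illustration, not the library's code:

```python
# Illustrative sketch (hypothetical, not the library's implementation) of the
# idea behind NumericPrefixStrategy: render a rank as a fixed-width string in
# the strategy's base, so writers fan out across distinct key prefixes.
def numeric_prefix(rank: int, base: int, width: int = 10) -> str:
    digits = "0123456789abcdef"[:base]
    out = []
    n = rank
    while n:
        out.append(digits[n % base])
        n //= base
    s = "".join(reversed(out)) or digits[0]
    return s.rjust(width, digits[0])


numeric_prefix(5, base=2)     # '0000000101' (binary strategy, digits 0 and 1)
numeric_prefix(255, base=16)  # '00000000ff' (hexadecimal strategy)
```

The width parameter mirrors the min_prefix_length constructor argument above; a larger base yields more distinct prefixes for the same width.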