s3torchconnector.dcp.s3_file_system
Classes
- S3FileSystem: S3-based implementation of PyTorch's FileSystemBase for distributed checkpointing.
- Metadata for S3 storage prefix.
- S3StorageWriter: S3 implementation of PyTorch's FileSystemWriter for distributed checkpoints.
- S3StorageReader: S3 implementation of PyTorch's FileSystemReader with configurable reader strategies.
Module Contents
- class s3torchconnector.dcp.s3_file_system.S3FileSystem(region: str, s3_client: s3torchconnector._s3client.S3Client | None = None, s3client_config: s3torchconnector.S3ClientConfig | None = None, reader_constructor: s3torchconnector.s3reader.S3ReaderConstructorProtocol | None = None)[source]
Bases: torch.distributed.checkpoint.filesystem.FileSystemBase
S3-based implementation of PyTorch’s FileSystemBase for distributed checkpointing.
- create_stream(path: str | os.PathLike, mode: str) Generator[io.IOBase, None, None][source]
Create a stream for reading or writing to S3.
- Parameters:
path (Union[str, os.PathLike]) – The S3 path to read or write.
mode (str) – The mode for the stream. Supports ‘rb’ for read mode and ‘wb’ for write mode.
- Yields:
io.BufferedIOBase – A stream for reading or writing to S3.
- Raises:
ValueError – If the mode is not ‘rb’ or ‘wb’.
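As in PyTorch's FileSystemBase, the stream is consumed as a context manager. A minimal local sketch of the rb/wb contract, using `io.BytesIO` and an in-memory dict as hypothetical stand-ins for the real S3 client:

```python
import io
from contextlib import contextmanager

# Hypothetical stand-in: an in-memory "bucket" replaces the real S3 client.
_fake_bucket: dict = {}

@contextmanager
def create_stream(path: str, mode: str):
    """Mimics the rb/wb contract of S3FileSystem.create_stream."""
    if mode == "rb":
        # Read mode: yield a readable stream over the stored bytes.
        yield io.BytesIO(_fake_bucket[path])
    elif mode == "wb":
        # Write mode: buffer locally, "upload" on clean exit.
        buf = io.BytesIO()
        yield buf
        _fake_bucket[path] = buf.getvalue()
    else:
        raise ValueError(f"Unsupported mode: {mode}")

with create_stream("ckpt/__0_0.distcp", "wb") as stream:
    stream.write(b"tensor-bytes")
with create_stream("ckpt/__0_0.distcp", "rb") as stream:
    assert stream.read() == b"tensor-bytes"
```

The real implementation streams to and from S3 rather than a dict, but the generator-based context-manager shape is the same.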
- concat_path(path: str | os.PathLike, suffix: str) str[source]
Concatenate a suffix to the given path.
- Parameters:
path (Union[str, os.PathLike]) – The base path.
suffix (str) – The suffix to concatenate.
- Returns:
The concatenated path.
- Return type:
str
- init_path(path: str | os.PathLike) str | os.PathLike[source]
Initialize the path for the filesystem.
- Parameters:
path (Union[str, os.PathLike]) – The path to initialize.
- Returns:
The initialized path.
- Return type:
Union[str, os.PathLike]
- rename(old_path: str | os.PathLike, new_path: str | os.PathLike) None[source]
Rename an object in S3.
S3 has no native rename, so this is emulated by copying the object to the new path and then deleting the old path. The deletion step is retried (see also S3FileSystem._delete_with_retry()).
- Parameters:
old_path (Union[str, os.PathLike]) – The current path of the object.
new_path (Union[str, os.PathLike]) – The new path for the object.
- Raises:
ValueError – If the old and new paths point to different buckets.
S3Exception – If there is an error with the S3 client.
- class s3torchconnector.dcp.s3_file_system.S3StorageWriter(region: str, path: str, s3client_config: s3torchconnector.S3ClientConfig | None = None, prefix_strategy: s3torchconnector.dcp.s3_prefix_strategy.S3PrefixStrategyBase | None = None, thread_count: int = 1, **kwargs)[source]
Bases: torch.distributed.checkpoint.filesystem.FileSystemWriter
S3 implementation of PyTorch’s FileSystemWriter for distributed checkpoints.
- prepare_global_plan(plans: List[torch.distributed.checkpoint.planner.SavePlan]) List[torch.distributed.checkpoint.planner.SavePlan][source]
Prepare save plans with S3-specific storage metadata.
- Parameters:
plans – List of save plans to be processed.
- Returns:
Modified save plans with S3 storage metadata.
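A simplified sketch of how each save plan could receive storage metadata. SavePlan here is a hypothetical stand-in for PyTorch's planner type, and the per-rank prefix scheme is illustrative only, not the actual output of S3PrefixStrategyBase:

```python
import dataclasses
from typing import Any, List

# Simplified stand-in for PyTorch's SavePlan; field names are illustrative.
@dataclasses.dataclass(frozen=True)
class SavePlan:
    items: list
    storage_data: Any = None

def prepare_global_plan(plans: List[SavePlan]) -> List[SavePlan]:
    # Attach per-rank prefix metadata so each rank writes under its own key,
    # the kind of metadata a prefix strategy could use to spread load.
    return [
        dataclasses.replace(plan, storage_data={"prefix": f"rank_{i}/"})
        for i, plan in enumerate(plans)
    ]

plans = prepare_global_plan([SavePlan(items=[]), SavePlan(items=[])])
```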
- class s3torchconnector.dcp.s3_file_system.S3StorageReader(region: str, path: str | os.PathLike, s3client_config: s3torchconnector.S3ClientConfig | None = None, reader_constructor: s3torchconnector.s3reader.S3ReaderConstructorProtocol | s3torchconnector.s3reader.DCPS3ReaderConstructorProtocol | None = None)[source]
Bases: torch.distributed.checkpoint.filesystem.FileSystemReader
S3 implementation of PyTorch’s FileSystemReader with configurable reader strategies.
By default, uses DCPOptimizedS3Reader for improved checkpoint loading performance. For unsupported or non-DCP access patterns, use the generic reader:

storage_reader = S3StorageReader(
    region, path, reader_constructor=S3ReaderConstructor.default()
)
- classmethod validate_checkpoint_id(checkpoint_id: str | os.PathLike) bool[source]
Check whether the given checkpoint_id is supported by this storage backend. This allows PyTorch DCP to select the appropriate storage automatically.
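A plausible sketch of such a check, assuming an S3 checkpoint id is an s3:// URI with a non-empty bucket name; the library's actual validation may be stricter:

```python
from urllib.parse import urlparse

def validate_checkpoint_id(checkpoint_id) -> bool:
    # Assumed heuristic: accept only s3:// URIs that name a bucket.
    parsed = urlparse(str(checkpoint_id))
    return parsed.scheme == "s3" and bool(parsed.netloc)

print(validate_checkpoint_id("s3://my-bucket/ckpt"))  # True: s3 URI
print(validate_checkpoint_id("/tmp/ckpt"))            # False: local path
```

With a check like this, dcp.load can route s3:// checkpoint ids to S3StorageReader while local paths fall through to the filesystem reader.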
- prepare_local_plan(plan: torch.distributed.checkpoint.planner.LoadPlan) torch.distributed.checkpoint.planner.LoadPlan[source]
Performs two key optimizations:
1. Load Ordering: sorts load items by storage offset to enable sequential access.
2. Range Injection: provides byte-range metadata to DCP reader constructors, enabling DCPOptimizedS3Reader to use range-based streams and range coalescing.
- Parameters:
plan (LoadPlan) – The load plan from PyTorch DCP.
- Returns:
The same plan with items sorted by storage offset.
- Return type:
LoadPlan
Note
Both optimizations are required for DCPOptimizedS3Reader.