s3torchconnector.s3reader.dcp_optimized
DCP-Optimized S3 Reader provides these 3 optimizations:
1. Selective data fetching with range coalescing to only fetch required byte ranges
2. Per-item buffer management to reduce buffer allocation costs
3. Eliminating buffer copy by storing S3 chunks as memoryview references
Data Flow Overview:
    DCP.load(model_state_dict, storage_reader=s3_storage_reader)
      -> read_metadata()  # reads .metadata file
      -> set_up_storage_reader(metadata)  # populates storage_data
      -> prepare_local_plan(plan)  # (patched) sorts items, injects ranges to constructor
         -> DCPOptimizedConstructor.set_item_ranges_by_file()
      -> read_data(plan)  # per-file loop below
         -> DCPOptimizedS3Reader __init__
            -> _validate_and_coalesce_ranges()  # validates and groups ItemRanges into RangeGroups
         -> DCPOptimizedS3Reader read()/readinto()
            -> _find_item_for_range()  # returns item for given read request
            -> [if new item] _get_item_buffer()  # fetches item byte data
               -> [if new RangeGroup] _get_stream_for_item()  # creates new stream before fetching byte data
               -> 1: Handle leftover bytes from prev. chunk
               -> 2: Skip gap data from coalescing
               -> 3: Fetch remaining data from S3
            -> _ItemViewBuffer read()/readinto()  # returns data from buffer
Attributes
- FIND_ITEM_ERROR_PREFIX
- FALLBACK_GUIDANCE
Classes
- ItemRange: Byte range for a single DCP ReadItem (tensor). Inclusive start, exclusive end.
- RangeGroup: Group of nearby ItemRanges that will share a single S3 range request.
- DCPOptimizedS3Reader: S3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading.
Module Contents
- s3torchconnector.s3reader.dcp_optimized.FIND_ITEM_ERROR_PREFIX = 'DCPOptimizedS3Reader only supports sequentially accessing provided ranges: '
- s3torchconnector.s3reader.dcp_optimized.FALLBACK_GUIDANCE =
"""If this error is encountered with the default DCP reader (S3ReaderConstructor.dcp_optimized()) added in s3torchconnector v1.5.0, please refer to the troubleshooting doc (https://github.com/awslabs/s3-connector-for-pytorch/blob/main/docs/TROUBLESHOOTING.md#dcpoptimizeds3reader-errors). For unsupported or non-DCP access patterns, use the generic reader: S3StorageReader(region, path, reader_constructor=S3ReaderConstructor.default())"""
- class s3torchconnector.s3reader.dcp_optimized.ItemRange
Byte range for a single DCP ReadItem (tensor). Inclusive start, exclusive end.
- class s3torchconnector.s3reader.dcp_optimized.RangeGroup
Group of nearby ItemRanges that will share a single S3 range request.
Created by coalescing ItemRanges with gaps <= max_gap_size in _validate_and_coalesce_ranges. One S3 stream will serve all items in the RangeGroup sequentially.
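The coalescing rule described above can be illustrated with a minimal sketch. This is not the library's `_validate_and_coalesce_ranges` implementation: the class names `SketchItemRange` and `SketchRangeGroup`, the function `coalesce`, and the specific validation checks (rejecting empty, unsorted, or overlapping ranges) are assumptions made for illustration. Only the grouping rule (gap <= max_gap_size) and the end-exclusive range convention come from the documentation.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class SketchItemRange:
    """Hypothetical stand-in for ItemRange: inclusive start, exclusive end."""
    start: int
    end: int


@dataclass
class SketchRangeGroup:
    """Hypothetical stand-in for RangeGroup: one span served by one range request."""
    start: int
    end: int
    items: List[SketchItemRange] = field(default_factory=list)


def coalesce(ranges: List[SketchItemRange], max_gap_size: int) -> List[SketchRangeGroup]:
    """Group pre-sorted ranges whose gap to the previous range is <= max_gap_size."""
    groups: List[SketchRangeGroup] = []
    for r in ranges:
        # Validation rules here are assumptions, not the library's exact checks.
        if r.start < 0 or r.end <= r.start:
            raise ValueError(f"invalid range: {r}")
        if groups and r.start < groups[-1].end:
            raise ValueError(f"ranges must be sorted and non-overlapping: {r}")
        if groups and r.start - groups[-1].end <= max_gap_size:
            # Small gap: extend the current group instead of opening a new stream.
            groups[-1].items.append(r)
            groups[-1].end = r.end
        else:
            groups.append(SketchRangeGroup(r.start, r.end, [r]))
    return groups


example = [SketchItemRange(0, 100), SketchItemRange(120, 200), SketchItemRange(5000, 5100)]
groups = coalesce(example, max_gap_size=50)
print([(g.start, g.end, len(g.items)) for g in groups])  # [(0, 200, 2), (5000, 5100, 1)]
```

The 20-byte gap between the first two items is fetched and discarded so that both share one request, while the 4800-byte gap before the third item exceeds the threshold and starts a new group.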
- class s3torchconnector.s3reader.dcp_optimized.DCPOptimizedS3Reader(bucket: str, key: str, item_ranges: List[ItemRange], get_object_info: Callable[[], s3torchconnectorclient._mountpoint_s3_client.ObjectInfo | s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult], get_stream: Callable[[int | None, int | None], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream], max_gap_size: int | float = DEFAULT_MAX_GAP_SIZE)
Bases: s3torchconnector.s3reader.s3reader.S3Reader
S3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading.
Provides up to 2x performance improvement over the default sequential reader through:
1. Selective data fetching with range coalescing: Uses byte range information from PyTorch’s LoadPlan to only fetch required data. Groups nearby ranges within max_gap_size into single S3 streams to minimize first-byte latency while avoiding unnecessary data transfer.
2. Per-item buffer management: Buffers per-item (per-tensor) instead of per-file. Each buffer stores only the required item’s byte ranges and is discarded after PyTorch reads the item, which removes the overhead of resizing large buffers and re-copying data repeatedly.
3. Eliminate buffer copy: Custom _ItemViewBuffer stores S3 chunks as memoryview references instead of copying into BytesIO, avoiding allocation and copy overhead.
Requirements:
- DCP Loading: the reader is only designed for use via the dcp_optimized reader_constructor for dcp.load().
- Pre-sorted list of item_ranges, injected automatically in prepare_local_plan.
- Sequential access over the exact item_ranges provided, also applied automatically by prepare_local_plan.
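Usage: Created automatically by DCPOptimizedConstructor when used with S3StorageReader and S3ReaderConstructor.dcp_optimized():
    import torch.distributed.checkpoint as DCP
    from s3torchconnector import S3ReaderConstructor
    from s3torchconnector.dcp import S3StorageReader

    # Coalesce item ranges separated by gaps of up to 32 MiB into one S3 stream.
    reader_constructor = S3ReaderConstructor.dcp_optimized(max_gap_size=32*1024*1024)
    storage_reader = S3StorageReader(region, path, reader_constructor=reader_constructor)
    DCP.load(state_dict, storage_reader=storage_reader)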
- Error Handling:
Non-sequential access attempts raise ValueError.
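The third optimization (keeping S3 chunks as memoryview references rather than copying them into a BytesIO) can be sketched as follows. This is an illustrative approximation, not the private `_ItemViewBuffer` class: the name `ChunkViewBuffer` and its methods are hypothetical, and the real buffer may track positions differently. The point shown is that appending a chunk stores only a view, and bytes are copied once, when the caller reads.

```python
from bisect import bisect_right
from typing import List


class ChunkViewBuffer:
    """Hypothetical read-only buffer that keeps chunks as memoryview references."""

    def __init__(self) -> None:
        self._views: List[memoryview] = []
        self._offsets: List[int] = []  # start offset of each chunk within the buffer
        self._size = 0
        self._pos = 0

    def append(self, chunk: bytes) -> None:
        # Store a view of the chunk; the payload itself is not copied here.
        if not chunk:
            return
        self._views.append(memoryview(chunk))
        self._offsets.append(self._size)
        self._size += len(chunk)

    def readinto(self, buf) -> int:
        dest = memoryview(buf)
        want = min(len(dest), self._size - self._pos)
        if want <= 0:
            return 0
        written = 0
        # Locate the chunk that contains the current position.
        idx = bisect_right(self._offsets, self._pos) - 1
        while written < want:
            view = self._views[idx]
            start = self._pos - self._offsets[idx]
            n = min(len(view) - start, want - written)
            dest[written:written + n] = view[start:start + n]
            written += n
            self._pos += n
            idx += 1
        return written

    def read(self, size: int) -> bytes:
        n = max(0, min(size, self._size - self._pos))
        if n == 0:
            return b""
        out = bytearray(n)
        self.readinto(out)
        return bytes(out)


buf = ChunkViewBuffer()
for chunk in (b"abc", b"defg", b"hi"):
    buf.append(chunk)
first = buf.read(5)   # spans the first two chunks
rest = buf.read(10)   # returns only what remains
print(first, rest)    # b'abcde' b'fghi'
```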
- read(size: int | None = None) -> bytes
Read up to size bytes from the current position.
Supports backward seeking within the current item buffer, but forward-only access across DCP items (sequential item access required).
- Parameters:
size (int) – how many bytes to read.
- Returns:
Bytes read from specified range.
- Return type:
bytes
- Raises:
TypeError – If size is not an integer.
ValueError – If position is outside valid DCP ranges, or if size is None or negative (full file reads not supported).
S3Exception – An error occurred accessing S3.
- readinto(buf) -> int
Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf. Return the number of bytes read. If no bytes are available, zero is returned.
- Parameters:
buf – writable bytes-like object
- Returns:
number of bytes read or zero, if no bytes available
- Return type:
int
- Raises:
ValueError – If position is outside valid DCP ranges.
TypeError – If buf is not writable.
S3Exception – An error occurred accessing S3.
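Because readinto() may return fewer bytes than len(buf), callers typically loop until the buffer is full or zero is returned. The sketch below shows that pattern; `read_exactly` is a hypothetical helper, and `io.BytesIO` stands in for the S3 reader so the example runs without S3 access. It assumes only the readinto() contract stated above.

```python
import io


def read_exactly(reader, nbytes: int) -> bytearray:
    """Fill a pre-allocated buffer by calling readinto() until full or no bytes remain."""
    out = bytearray(nbytes)
    view = memoryview(out)
    filled = 0
    while filled < nbytes:
        n = reader.readinto(view[filled:])
        if n == 0:  # no bytes available
            break
        filled += n
    return out[:filled]


stand_in = io.BytesIO(b"0123456789")  # stand-in for a reader positioned on an item
first = read_exactly(stand_in, 4)
rest = read_exactly(stand_in, 100)
print(bytes(first), bytes(rest))  # b'0123' b'456789'
```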
- seek(offset: int, whence: int = SEEK_SET, /) -> int
Change position within DCP ranges, interpreted relative to whence.
Supports arbitrary seeking within current item buffer, but only forward sequential access across DCP items (cannot seek back to previous items).
- Parameters:
offset (int) – How many bytes to seek relative to whence.
whence (int) – Either SEEK_SET or SEEK_CUR. SEEK_END is not supported. Default: SEEK_SET.
- Returns:
Current position of the stream
- Return type:
int
- Raises:
TypeError – If whence is not SEEK_SET or SEEK_CUR.
ValueError – If seeking to negative position or accessing previous items.
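The seeking rules above (arbitrary movement within the current item, forward-only movement across items) can be modelled with a small position tracker. This is a simplified sketch, not the reader's implementation: `SequentialRangeCursor` is a hypothetical class, items are plain (start, end) tuples with exclusive ends, and positions that fall in the gaps between items are not rejected here.

```python
from io import SEEK_CUR, SEEK_SET
from typing import List, Tuple


class SequentialRangeCursor:
    """Hypothetical cursor enforcing forward-only access across sorted items."""

    def __init__(self, item_ranges: List[Tuple[int, int]]) -> None:
        if not item_ranges:
            raise ValueError("at least one item range is required")
        self._ranges = item_ranges  # sorted, end-exclusive
        self._current = 0           # index of the item currently being read
        self._pos = item_ranges[0][0]

    def seek(self, offset: int, whence: int = SEEK_SET) -> int:
        if whence == SEEK_SET:
            target = offset
        elif whence == SEEK_CUR:
            target = self._pos + offset
        else:
            raise TypeError("whence must be SEEK_SET or SEEK_CUR")
        if target < 0:
            raise ValueError("negative seek position")
        if target < self._ranges[self._current][0]:
            raise ValueError("cannot seek back to a previous item")
        # Moving forward past the current item makes a later item current.
        while (self._current + 1 < len(self._ranges)
               and target >= self._ranges[self._current + 1][0]):
            self._current += 1
        self._pos = target
        return self._pos


cursor = SequentialRangeCursor([(0, 100), (120, 200)])
p1 = cursor.seek(50)             # inside the first item
p2 = cursor.seek(-40, SEEK_CUR)  # backward within the current item is allowed
p3 = cursor.seek(150)            # forward into the second item
try:
    cursor.seek(10)              # the first item is no longer reachable
    backward_rejected = False
except ValueError:
    backward_rejected = True
print(p1, p2, p3, backward_rejected)  # 50 10 150 True
```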