s3torchconnector.s3reader.dcp_optimized

DCP-Optimized S3 Reader provides three optimizations:

1. Selective data fetching with range coalescing, fetching only the required byte ranges
2. Per-item buffer management to reduce buffer allocation costs
3. Buffer copy elimination by storing S3 chunks as memoryview references

Data Flow Overview:

DCP.load(model_state_dict, storage_reader=s3_storage_reader)
  -> read_metadata()                    # reads .metadata file
  -> set_up_storage_reader(metadata)    # populates storage_data
  -> prepare_local_plan(plan)           # (patched) sorts items, injects ranges to constructor
     -> DCPOptimizedConstructor.set_item_ranges_by_file()
  -> read_data(plan)                    # per-file loop below
     -> DCPOptimizedS3Reader __init__
        -> _validate_and_coalesce_ranges()    # validates and groups ItemRanges into RangeGroups
     -> DCPOptimizedS3Reader read()/readinto()
        -> _find_item_for_range()             # returns item for given read request
        -> [if new item] _get_item_buffer()   # fetches item byte data
           -> [if new RangeGroup] _get_stream_for_item()  # creates new stream before fetching byte data
           -> 1: Handle leftover bytes from prev. chunk
           -> 2: Skip gap data from coalescing
           -> 3: Fetch remaining data from S3
        -> _ItemViewBuffer read()/readinto()  # returns data from buffer
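The three stream-handling steps above (consume leftover chunk bytes, skip coalescing gaps, read the item's remaining bytes) can be sketched as a simplified generator. This is illustrative only; the function name and structure are assumptions, not the library's actual code:

```python
from typing import Iterable, Iterator, List, Tuple


def slice_items_from_stream(
    chunks: Iterable[bytes],
    group_start: int,
    item_ranges: List[Tuple[int, int]],  # sorted (start, end) pairs within one RangeGroup
) -> Iterator[bytes]:
    """Yield the bytes of each item range from one coalesced stream.

    Assumes the chunk iterable covers [group_start, last item end);
    well-formed input is assumed for this sketch.
    """
    pos = group_start  # absolute offset of the next unread stream byte
    buf = b""          # leftover tail of the previous chunk
    chunk_iter = iter(chunks)
    for start, end in item_ranges:
        out = bytearray()
        while pos < end:
            if not buf:
                buf = next(chunk_iter)  # 3: fetch the next chunk from the stream
            if pos < start:
                # 2: skip gap bytes introduced by range coalescing
                skip = min(start - pos, len(buf))
                buf = buf[skip:]
                pos += skip
                continue
            # 1: copy bytes belonging to the current item; the remainder of
            # the chunk carries over as leftover for the next item
            take = min(end - pos, len(buf))
            out += buf[:take]
            buf = buf[take:]
            pos += take
        yield bytes(out)
```

For example, with chunks `[b"AAAAA", b"BBBBB"]` and item ranges `(0, 3)` and `(5, 10)`, the two leftover bytes at offsets 3–4 are discarded as gap data and each item receives exactly its own bytes.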

Attributes

log

DEFAULT_MAX_GAP_SIZE

FIND_ITEM_ERROR_PREFIX

FALLBACK_GUIDANCE

Classes

ItemRange

Byte range for a single DCP ReadItem (tensor). Inclusive start, exclusive end.

RangeGroup

Group of nearby ItemRanges that will share a single S3 range request.

DCPOptimizedS3Reader

S3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading.

Module Contents

s3torchconnector.s3reader.dcp_optimized.log[source]
s3torchconnector.s3reader.dcp_optimized.DEFAULT_MAX_GAP_SIZE = 33554432[source]
s3torchconnector.s3reader.dcp_optimized.FIND_ITEM_ERROR_PREFIX = 'DCPOptimizedS3Reader only supports sequentially accessing provided ranges: '[source]
s3torchconnector.s3reader.dcp_optimized.FALLBACK_GUIDANCE = Multiline-String[source]
"""If this error is encountered with the default DCP reader (S3ReaderConstructor.dcp_optimized()) added in s3torchconnector v1.5.0, please refer to the troubleshooting doc (https://github.com/awslabs/s3-connector-for-pytorch/blob/main/docs/TROUBLESHOOTING.md#dcpoptimizeds3reader-errors).
For unsupported or non-DCP access patterns, use the generic reader: S3StorageReader(region, path, reader_constructor=S3ReaderConstructor.default())"""
class s3torchconnector.s3reader.dcp_optimized.ItemRange[source]

Byte range for a single DCP ReadItem (tensor). Inclusive start, exclusive end.

start: int[source]
end: int[source]
class s3torchconnector.s3reader.dcp_optimized.RangeGroup[source]

Group of nearby ItemRanges that will share a single S3 range request.

Created by coalescing ItemRanges with gaps <= max_gap_size in _validate_and_coalesce_ranges. One S3 stream will serve all items in the RangeGroup sequentially.
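The coalescing step can be illustrated with a simplified, self-contained sketch. The dataclasses mirror the documented fields; the `coalesce_ranges` helper and its validation details are assumptions, not the library's implementation:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ItemRange:
    start: int  # inclusive
    end: int    # exclusive


@dataclass
class RangeGroup:
    start: int
    end: int
    item_ranges: List[ItemRange] = field(default_factory=list)


def coalesce_ranges(items: List[ItemRange], max_gap_size: int) -> List[RangeGroup]:
    """Group sorted, non-overlapping ItemRanges whose gaps are <= max_gap_size.

    Each resulting RangeGroup corresponds to one S3 range request that
    serves all of its items sequentially.
    """
    groups: List[RangeGroup] = []
    for item in items:
        if groups and item.start - groups[-1].end <= max_gap_size:
            # Gap is small enough: extend the current group.
            groups[-1].end = item.end
            groups[-1].item_ranges.append(item)
        else:
            # Gap too large (or first item): start a new group / S3 request.
            groups.append(RangeGroup(item.start, item.end, [item]))
    return groups
```

A larger `max_gap_size` trades extra bytes transferred (the gaps) for fewer S3 requests and less first-byte latency.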

start: int[source]
end: int[source]
item_ranges: List[ItemRange][source]
class s3torchconnector.s3reader.dcp_optimized.DCPOptimizedS3Reader(bucket: str, key: str, item_ranges: List[ItemRange], get_object_info: Callable[[], s3torchconnectorclient._mountpoint_s3_client.ObjectInfo | s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult], get_stream: Callable[[int | None, int | None], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream], max_gap_size: int | float = DEFAULT_MAX_GAP_SIZE)[source]

Bases: s3torchconnector.s3reader.s3reader.S3Reader

S3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading.

Provides up to 2x performance improvement over the default sequential reader through:

1. Selective data fetching with range coalescing: Uses byte range information from PyTorch’s LoadPlan to only fetch required data. Groups nearby ranges within max_gap_size into single S3 streams to minimize first-byte latency while avoiding unnecessary data transfer.

2. Per-item buffer management: Buffers per-item (per-tensor) instead of per-file. Each buffer stores only the required item’s byte ranges and is discarded after PyTorch reads the item, which removes overhead of resizing large buffers and re-copying data repeatedly.

3. Buffer copy elimination: A custom _ItemViewBuffer stores S3 chunks as memoryview references instead of copying them into a BytesIO, avoiding allocation and copy overhead.
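The memoryview idea can be sketched with a minimal buffer class. This is a hypothetical, simplified illustration (the real _ItemViewBuffer's interface and internals may differ):

```python
from typing import List


class ItemViewBuffer:
    """Illustrative zero-copy buffer: stores chunks as memoryview references
    instead of concatenating them into one contiguous buffer."""

    def __init__(self) -> None:
        self._views: List[memoryview] = []
        self._size = 0
        self._pos = 0

    def append_chunk(self, chunk: bytes) -> None:
        # Keep a memoryview reference; no byte copy happens here.
        self._views.append(memoryview(chunk))
        self._size += len(chunk)

    def readinto(self, buf) -> int:
        """Copy bytes into the caller's writable buffer only at read time."""
        out = memoryview(buf)
        written, skip = 0, self._pos
        for view in self._views:
            if skip >= len(view):
                skip -= len(view)  # this chunk was fully consumed earlier
                continue
            part = view[skip : skip + (len(out) - written)]
            out[written : written + len(part)] = part
            written += len(part)
            skip = 0
            if written == len(out):
                break
        self._pos += written
        return written
```

The single copy happens in `readinto`, directly into the destination PyTorch hands over, rather than once into an intermediate buffer and again into the destination.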

Requirements:

  • DCP loading - the reader is designed solely for use via the dcp_optimized reader_constructor with dcp.load()

  • Pre-sorted list of item_ranges, injected automatically by prepare_local_plan

  • Sequential access over the exact item_ranges provided, also enforced automatically by prepare_local_plan

Usage: Created automatically by DCPOptimizedConstructor when used with S3StorageReader and S3ReaderConstructor.dcp_optimized():

reader_constructor = S3ReaderConstructor.dcp_optimized(max_gap_size=32*1024*1024)
storage_reader = S3StorageReader(region, path, reader_constructor=reader_constructor)
DCP.load(state_dict, storage_reader=storage_reader)

Error Handling:

Non-sequential access attempts raise ValueError.

property bucket: str[source]
property key: str[source]
property closed: bool[source]

Returns:

Whether the object is closed.

Return type:

bool

read(size: int | None = None) bytes[source]

Read up to size bytes from the current position.

Supports backward seeking within the current item buffer, but forward-only access across DCP items (sequential item access required).

Parameters:

size (int) – how many bytes to read.

Returns:

Bytes read from specified range.

Return type:

bytes

Raises:
  • TypeError – If size is not an integer.

  • ValueError – If position is outside valid DCP ranges, or if size is None or negative (full-file reads are not supported).

  • S3Exception – An error occurred accessing S3.

readinto(buf) int[source]

Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf. Return the number of bytes read. If no bytes are available, zero is returned.

Parameters:

buf – writable bytes-like object

Returns:

Number of bytes read, or zero if no bytes are available.

Return type:

int

Raises:
  • ValueError – If position is outside valid DCP ranges.

  • TypeError – If buf is not writable.

  • S3Exception – An error occurred accessing S3.

seek(offset: int, whence: int = SEEK_SET, /) int[source]

Change position within DCP ranges, interpreted relative to whence.

Supports arbitrary seeking within current item buffer, but only forward sequential access across DCP items (cannot seek back to previous items).

Parameters:
  • offset (int) – How many bytes to seek relative to whence.

  • whence (int) – One of SEEK_SET or SEEK_CUR. SEEK_END is not supported. Default: SEEK_SET.

Returns:

Current position of the stream

Return type:

int

Raises:
  • TypeError – If whence is not SEEK_SET or SEEK_CUR.

  • ValueError – If seeking to negative position or accessing previous items.
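The documented whence handling can be sketched as a standalone position calculation. This is an illustrative assumption about the rules stated above (SEEK_SET and SEEK_CUR accepted, SEEK_END and negative positions rejected), not the library's code:

```python
from io import SEEK_CUR, SEEK_END, SEEK_SET


def resolve_seek(offset: int, whence: int, current_pos: int) -> int:
    """Compute the new absolute position per the documented seek() rules."""
    if whence == SEEK_SET:
        new_pos = offset
    elif whence == SEEK_CUR:
        new_pos = current_pos + offset
    else:
        # Covers SEEK_END and any other value: only SEEK_SET/SEEK_CUR allowed.
        raise TypeError(f"unsupported whence: {whence}")
    if new_pos < 0:
        raise ValueError("negative seek position")
    return new_pos
```

Note the asymmetry: an unsupported whence raises TypeError, while an out-of-range target position raises ValueError, matching the Raises list above.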

tell() int[source]
Returns:

Current absolute position in the object.

Return type:

int

close() None[source]

Close the stream and release resources.