s3torchconnector.s3reader

Classes

S3Reader

An abstract base class for read-only, file-like representation of a single object stored in S3.

S3ReaderConstructor

Constructor for creating partial(S3Reader) instances.

SequentialS3Reader

Sequential S3 reader implementation

RangedS3Reader

Range-based S3 reader implementation with adaptive buffering.

DCPOptimizedS3Reader

S3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading.

Package Contents

class s3torchconnector.s3reader.S3Reader

Bases: abc.ABC, io.BufferedIOBase

An abstract base class for read-only, file-like representation of a single object stored in S3.

This class defines the interface for S3 readers. Concrete implementations (SequentialS3Reader, RangedS3Reader, and DCPOptimizedS3Reader) extend this class. S3ReaderConstructor creates partial functions of these implementations, which S3Client then completes with the remaining required parameters.

abstract property bucket: str

S3 bucket name.

abstract property key: str

S3 object key.

abstract read(size: int | None = None) -> bytes

Read and return up to size bytes.

If the argument is omitted, None, or negative, reads and returns all data until EOF.

If the argument is positive, and the underlying raw stream is not ‘interactive’, multiple raw reads may be issued to satisfy the byte count (unless EOF is reached first). But for interactive raw streams (as well as sockets and pipes), at most one raw read will be issued, and a short result does not imply that EOF is imminent.

Returns an empty bytes object on EOF.

Returns None if the underlying raw stream was opened in non-blocking mode and no data is available at the moment.

abstract seek(offset: int, whence: int = SEEK_SET, /) -> int

Change stream position.

Change the stream position to the given byte offset. The offset is interpreted relative to the position indicated by whence. Values for whence are:

  • 0 – start of stream (the default); offset should be zero or positive

  • 1 – current stream position; offset may be negative

  • 2 – end of stream; offset is usually negative

Return the new absolute position.
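Since S3Reader derives from io.BufferedIOBase, the read and seek contract above is the standard io one. The sketch below exercises it with io.BytesIO as a local stand-in for an S3 object. It illustrates only the generic contract; concrete S3 readers can differ in edge cases (SequentialS3Reader and RangedS3Reader, for example, keep the position at EOF when seeking past the end).

```python
import io
from io import SEEK_CUR, SEEK_END, SEEK_SET

# io.BytesIO stands in for an S3 object; nothing here touches S3.
stream = io.BytesIO(b"0123456789")

pos_set = stream.seek(2, SEEK_SET)   # absolute: position 2
first = stream.read(3)               # reads b"234"; position is now 5
pos_cur = stream.seek(-1, SEEK_CUR)  # relative to current position: 4
pos_end = stream.seek(-2, SEEK_END)  # relative to end of stream: 8
rest = stream.read()                 # size omitted: read until EOF
at_eof = stream.read(5)              # at EOF: empty bytes object
```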

abstract tell() -> int

Return current stream position.

abstract readinto(buf) -> int

Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf and return the number of bytes read.

seekable() -> bool
Returns:

Whether the object supports seek operations.

Return type:

bool

readable() -> bool
Returns:

Whether the object was opened for reading.

Return type:

bool

writable() -> bool
Returns:

Whether the object was opened for writing.

Return type:

bool

class s3torchconnector.s3reader.S3ReaderConstructor

Constructor for creating partial(S3Reader) instances.

Creates partial S3Reader instances that will be completed by S3Client with the remaining required parameters (e.g. bucket, key, get_object_info, get_stream).
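The partial-construction pattern can be sketched with functools.partial. ToyRangedReader, make_range_based and open_reader are hypothetical names invented for this illustration, and get_object_info here returns a plain size rather than the library's ObjectInfo. The sketch only shows the idea of binding reader settings first and letting a client-like caller supply bucket, key, get_object_info and get_stream later; it is not the library's implementation.

```python
from functools import partial
from typing import Callable, Optional


class ToyRangedReader:
    """Hypothetical stand-in for a reader class; not part of s3torchconnector."""

    def __init__(
        self,
        bucket: str,
        key: str,
        get_object_info: Callable[[], int],
        get_stream: Callable[[Optional[int], Optional[int]], bytes],
        buffer_size: Optional[int] = None,
    ) -> None:
        self.bucket = bucket
        self.key = key
        self.get_object_info = get_object_info
        self.get_stream = get_stream
        # None means "use the default"; 8 MiB mirrors the documented default.
        self.buffer_size = 8 * 1024 * 1024 if buffer_size is None else buffer_size


def make_range_based(buffer_size: Optional[int] = None):
    # Step 1: bind only the reader-specific settings.
    return partial(ToyRangedReader, buffer_size=buffer_size)


def open_reader(constructor, bucket: str, key: str, data: bytes):
    # Step 2: a client-like caller completes the partial with the
    # object-specific arguments (backed here by an in-memory bytes object).
    return constructor(
        bucket=bucket,
        key=key,
        get_object_info=lambda: len(data),
        get_stream=lambda start, end: data[start:end],
    )


reader = open_reader(make_range_based(buffer_size=0), "my-bucket", "data/obj.bin", b"abc")
```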

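The constructor provides factory methods for different reader types:

  • sequential(): Creates a constructor for sequential readers that buffer the entire object. Best for full reads and repeated access.

  • range_based(): Creates a constructor for range-based readers that fetch specific byte ranges. Suitable for sparse partial reads of large objects.

  • dcp_optimized(): Creates a constructor for readers optimized for PyTorch Distributed Checkpoint (DCP) loading.

  • default(): Creates the default constructor, which uses the sequential reader.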

static sequential() -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol

Creates a constructor for sequential (generic) readers.

This reader is the generic reader that supports all access patterns.

Returns:

Partial constructor for SequentialS3Reader

Return type:

S3ReaderConstructorProtocol

Example:

from s3torchconnector.s3reader import S3ReaderConstructor

reader_constructor = S3ReaderConstructor.sequential()

static range_based(buffer_size: int | None = None) -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol

Creates a constructor for range-based readers.

Parameters:

buffer_size – Internal buffer size in bytes. If None, uses default 8MB. Set to 0 to disable buffering.

Returns:

Partial constructor for RangedS3Reader

Return type:

S3ReaderConstructorProtocol

The range-based reader issues byte-range requests to read only the needed portions of an S3 object instead of downloading the entire file.

Buffer size affects read performance:

  • Small reads (< buffer_size): Load buffer_size bytes into the buffer to reduce S3 API calls for small, sequential reads.

  • Large reads (≥ buffer_size): Bypass the buffer and transfer directly from S3.

  • Forward overlap reads: Reuse buffered data when the requested range extends beyond the current buffer, then process the remaining data according to its size using the rules above.
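These buffering rules can be modelled with a small in-memory simulator that records every simulated range request. BufferedRangeReaderSketch and read_all are hypothetical names. The sketch assumes that a small read refills the buffer starting at the current position, and it omits seeking, error handling and real S3 streams, so treat it as an illustration of the policy rather than RangedS3Reader's code.

```python
from typing import List, Tuple


class BufferedRangeReaderSketch:
    """Hypothetical in-memory model of the range-based buffering rules.

    S3 range requests are simulated by slicing `data`; each simulated
    request is recorded in `self.requests` as a (start, end) pair.
    """

    def __init__(self, data: bytes, buffer_size: int) -> None:
        self._data = data
        self._buffer_size = buffer_size  # 0 disables buffering
        self._buf = b""
        self._buf_start = 0
        self._pos = 0
        self.requests: List[Tuple[int, int]] = []

    def _fetch(self, start: int, end: int) -> bytes:
        self.requests.append((start, end))
        return self._data[start:end]

    def read(self, size: int) -> bytes:
        end = min(self._pos + size, len(self._data))
        if self._pos >= end:
            return b""
        out = bytearray()
        buf_end = self._buf_start + len(self._buf)
        # Forward overlap: serve whatever part of the range is already buffered.
        if self._buf_start <= self._pos < buf_end:
            take_until = min(end, buf_end)
            out += self._buf[self._pos - self._buf_start : take_until - self._buf_start]
            self._pos = take_until
        remaining = end - self._pos
        if remaining > 0:
            if self._buffer_size == 0 or remaining >= self._buffer_size:
                # Large read (or buffering disabled): bypass the buffer.
                out += self._fetch(self._pos, end)
            else:
                # Small read: refill the buffer with buffer_size bytes from here.
                self._buf_start = self._pos
                self._buf = self._fetch(
                    self._pos, min(self._pos + self._buffer_size, len(self._data))
                )
                out += self._buf[:remaining]
            self._pos = end
        return bytes(out)


def read_all(reader: BufferedRangeReaderSketch, chunk: int) -> bytes:
    """Read sequentially in fixed-size chunks until EOF."""
    parts = []
    while True:
        piece = reader.read(chunk)
        if not piece:
            break
        parts.append(piece)
    return b"".join(parts)


data = bytes(range(256)) * 4  # 1 KiB of sample data
buffered = BufferedRangeReaderSketch(data, buffer_size=256)
unbuffered = BufferedRangeReaderSketch(data, buffer_size=0)
assert read_all(buffered, 64) == data
assert read_all(unbuffered, 64) == data
```

In this model, sixteen sequential 64-byte reads issue 4 simulated requests with a 256-byte buffer and 16 with buffering disabled, which is why larger buffers help workloads made of many small, nearby reads.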

Configuration Guide:

  • Use larger buffer sizes for workloads with many small, sequential reads of nearby bytes

  • Use smaller buffer sizes or disable buffering for sparse partial reads

  • Buffer can be disabled by setting buffer_size to 0

  • If buffer_size is None, uses default 8MB buffer

Examples:

from s3torchconnector.s3reader import S3ReaderConstructor

# Range-based reader with default 8MB buffer
reader_constructor = S3ReaderConstructor.range_based()

# Range-based reader with custom buffer size (16MB)
reader_constructor = S3ReaderConstructor.range_based(buffer_size=16*1024*1024)

# Range-based reader with buffering disabled
reader_constructor = S3ReaderConstructor.range_based(buffer_size=0)

static dcp_optimized(max_gap_size: int | float = DEFAULT_MAX_GAP_SIZE) -> s3torchconnector.s3reader.protocol.DCPS3ReaderConstructorProtocol

Creates a constructor for DCP-optimized readers for faster checkpoint loading.

The DCP-optimized reader provides performance improvements for DCP reading through:

  • Selective data fetching with range coalescing to only fetch required byte ranges

  • Per-item buffer management to reduce buffer allocation costs

  • Eliminating buffer copies by storing S3 chunks as memoryview references

Parameters:

max_gap_size

Maximum gap size in bytes between ranges to coalesce into the same S3 read stream. Most users should use the default value.

  • Default: 32MB (32 * 1024 * 1024)

  • Use float("inf") to coalesce all ranges regardless of gaps

  • Use 0 to disable coalescing, which creates a new range-based stream for each gap
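Gap-based coalescing can be sketched as a single pass over sorted ranges. coalesce_ranges is a hypothetical helper. It assumes (start, end) pairs with exclusive ends and merges a range into the current stream when the gap is at most max_gap_size, so contiguous ranges stay together even with max_gap_size=0; the library's exact boundary handling may differ.

```python
from typing import List, Tuple

Range = Tuple[int, int]  # (start, end) byte offsets, end exclusive


def coalesce_ranges(ranges: List[Range], max_gap_size: float) -> List[Range]:
    """Hypothetical helper: merge ranges whose gap is at most max_gap_size.

    Each returned (start, end) span stands for one S3 range stream; gap
    bytes inside a span are fetched even though nothing reads them.
    """
    streams: List[List[int]] = []
    for start, end in sorted(ranges):
        if streams and start - streams[-1][1] <= max_gap_size:
            # Close enough to the previous stream: extend it.
            streams[-1][1] = max(streams[-1][1], end)
        else:
            # First range, or the gap is too large: start a new stream.
            streams.append([start, end])
    return [(start, end) for start, end in streams]


MiB = 1024 * 1024
ranges = [(0, 10 * MiB), (12 * MiB, 20 * MiB), (60 * MiB, 70 * MiB)]
default_streams = coalesce_ranges(ranges, 32 * MiB)  # 2 streams
all_in_one = coalesce_ranges(ranges, float("inf"))   # 1 stream
no_coalescing = coalesce_ranges(ranges, 0)           # 3 streams
```

Merging trades extra transferred bytes (the 2 MiB gap in the default case) for one fewer stream, and therefore one fewer first-byte latency.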

Returns:

Constructor that creates DCPOptimizedS3Reader when ranges are available, falling back to SequentialS3Reader otherwise.

Return type:

DCPS3ReaderConstructorProtocol

Requirements:

Should be used with S3StorageReader, in which prepare_local_plan() automatically handles:

  • Load ordering: Sorts items by storage offset for sequential access

  • Range injection: Provides byte ranges from DCP load plan to the reader

Advanced users implementing custom storage readers must include these optimizations in their own prepare_local_plan()/read_data() implementations to use the DCP-optimized reader.
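The load-ordering and range-injection steps amount to collecting the requested byte spans per checkpoint file and sorting them by offset. This is a hypothetical sketch: ReadRequest and plan_item_ranges are illustrative names, not PyTorch's ReadItem type or the actual prepare_local_plan() code in S3StorageReader.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Tuple


class ReadRequest(NamedTuple):
    """Hypothetical stand-in for one planned read: a file plus a byte span."""
    relative_path: str
    offset: int
    length: int


def plan_item_ranges(requests: List[ReadRequest]) -> Dict[str, List[Tuple[int, int]]]:
    """Group requested byte spans per file and sort them by storage offset."""
    per_file: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
    for req in requests:
        per_file[req.relative_path].append((req.offset, req.offset + req.length))
    # Sorting lets the reader for each file move strictly forward.
    return {path: sorted(spans) for path, spans in per_file.items()}


plan = plan_item_ranges([
    ReadRequest("shard_a", 500, 100),
    ReadRequest("shard_b", 0, 50),
    ReadRequest("shard_a", 0, 200),
])
```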

Example:

import torch.distributed.checkpoint as DCP
from s3torchconnector.dcp import S3StorageReader
from s3torchconnector.s3reader import S3ReaderConstructor

reader_constructor = S3ReaderConstructor.dcp_optimized()
storage_reader = S3StorageReader(region, path, reader_constructor=reader_constructor)
DCP.load(state_dict, storage_reader=storage_reader)

static default() -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol

Creates the default generic reader constructor.

This creates a sequential (generic) reader that supports all access patterns.

Returns:

Partial constructor for SequentialS3Reader

Return type:

S3ReaderConstructorProtocol

static get_reader_type_string(constructor: s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol | None) -> str

Returns the reader type string for the given constructor.

class s3torchconnector.s3reader.SequentialS3Reader(bucket: str, key: str, get_object_info: Callable[[], s3torchconnectorclient._mountpoint_s3_client.ObjectInfo | s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult], get_stream: Callable[[], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream])

Bases: s3torchconnector.s3reader.s3reader.S3Reader

Sequential S3 reader implementation

Maintains an internal buffer for efficient sequential reads and repeated access. Optimal for most use cases, including full object reads.

property bucket: str
property key: str
prefetch() -> None

Start fetching data from S3.

Raises:

S3Exception – An error occurred accessing S3.

readinto(buf) -> int

Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf. Return the number of bytes read. If no bytes are available, zero is returned.

Parameters:

buf – writable bytes-like object

Returns:

Number of bytes read, or zero if no bytes are available

Return type:

int
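Because readinto() writes into a caller-owned buffer, one pre-allocated bytearray can be reused across calls until no more bytes are returned. The sketch uses io.BytesIO as a stand-in reader; read_in_chunks is a hypothetical helper, and the same loop should apply to any object with this readinto() signature.

```python
import io


def read_in_chunks(reader: io.BufferedIOBase, chunk_size: int = 4) -> bytes:
    """Drain a file-like reader with readinto(), reusing one pre-allocated buffer."""
    buf = bytearray(chunk_size)
    view = memoryview(buf)
    out = bytearray()
    while True:
        n = reader.readinto(buf)
        if not n:  # 0 (no bytes available) or None ends the loop
            break
        out += view[:n]  # use only the n bytes that were actually filled
    return bytes(out)


# io.BytesIO stands in for an S3 reader in this sketch.
payload = read_in_chunks(io.BytesIO(b"hello, s3 reader"), chunk_size=4)
```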

read(size: int | None = None) -> bytes

Read up to size bytes from the object and return them.

If size is zero or positive, read that many bytes from S3, or until the end of the object. If size is None or negative, read until the end of the object.

Parameters:

size (int | None) – how many bytes to read.

Returns:

Bytes read from S3 Object

Return type:

bytes

Raises:

S3Exception – An error occurred accessing S3.

seek(offset: int, whence: int = SEEK_SET, /) -> int

Change the stream position to the given byte offset, interpreted relative to whence.

Seeking beyond the end of the object leaves the position at EOF. Seeking before the start of the object raises a ValueError.

Parameters:
  • offset (int) – How many bytes to seek relative to whence.

  • whence (int) – One of SEEK_SET, SEEK_CUR, and SEEK_END. Default: SEEK_SET

Returns:

Current position of the stream

Return type:

int

Raises:
  • ValueError – If seeking before the start of the object.

  • S3Exception – An error occurred accessing S3.
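The seek rules above can be summarised as a pure function of the current position and the object size. resolve_seek is a hypothetical helper written for illustration, not a library function, and the way it rejects an unknown whence value is an assumption.

```python
from io import SEEK_CUR, SEEK_END, SEEK_SET


def resolve_seek(position: int, offset: int, whence: int, object_size: int) -> int:
    """Hypothetical helper mirroring the documented seek rules.

    Targets past the end are clamped to EOF; negative targets raise ValueError.
    """
    if whence == SEEK_SET:
        target = offset
    elif whence == SEEK_CUR:
        target = position + offset
    elif whence == SEEK_END:
        target = object_size + offset
    else:
        # Assumption: the real readers' handling of invalid whence is not documented here.
        raise ValueError(f"unsupported whence value: {whence}")
    if target < 0:
        raise ValueError("cannot seek before the start of the object")
    return min(target, object_size)  # seeking beyond the end stays at EOF
```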

tell() -> int
Returns:

Current stream position.

Return type:

int

class s3torchconnector.s3reader.RangedS3Reader(bucket: str, key: str, get_object_info: Callable[[], s3torchconnectorclient._mountpoint_s3_client.ObjectInfo | s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult], get_stream: Callable[[int | None, int | None], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream], buffer_size: int | None = None)

Bases: s3torchconnector.s3reader.s3reader.S3Reader

Range-based S3 reader implementation with adaptive buffering.

Performs byte-range requests to read specific portions of S3 objects without downloading the entire file. Includes an optional adaptive buffer that reduces S3 API calls for small, sequential reads while bypassing the buffer for large reads. Best suited to sparse partial reads of large objects.

Buffering behavior:

  • Small reads (< buffer_size): Loads buffer_size bytes into the buffer, then copies the requested bytes to the caller

  • Large reads (>= buffer_size): Direct S3 access, bypass buffer

  • Forward overlapping reads: Reuses existing buffer data if possible when read range extends beyond current buffer

  • Buffer can be disabled by setting buffer_size to 0

  • If buffer_size is None, uses default 8MB buffer

Parameters:
  • bucket – S3 bucket name

  • key – S3 object key

  • get_object_info – Callable that returns object metadata

  • get_stream – Callable that returns stream for byte range requests

  • buffer_size – Internal buffer size in bytes, defaults to 8MB

property bucket: str
property key: str
readinto(buf) -> int

Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf. Return the number of bytes read. If no bytes are available, zero is returned.

Parameters:

buf – writable bytes-like object

Returns:

Number of bytes read, or zero if no bytes are available

Return type:

int

read(size: int | None = None) -> bytes

Read up to size bytes from the current position.

If size is zero or positive, read that many bytes from S3, or until the end of the object. If size is None or negative, read until the end of the object.

Parameters:

size (int | None) – how many bytes to read.

Returns:

Bytes read from specified range.

Return type:

bytes

Raises:

S3Exception – An error occurred accessing S3.

seek(offset: int, whence: int = SEEK_SET, /) -> int

Change the stream position to the given byte offset, interpreted relative to whence.

Seeking beyond the end of the object leaves the position at EOF. Seeking before the start of the object raises a ValueError.

Parameters:
  • offset (int) – How many bytes to seek relative to whence.

  • whence (int) – One of SEEK_SET, SEEK_CUR, and SEEK_END. Default: SEEK_SET

Returns:

Current position of the stream

Return type:

int

Raises:
  • ValueError – If seeking before the start of the object.

  • S3Exception – An error occurred accessing S3.

tell() -> int
Returns:

Current stream position.

Return type:

int

class s3torchconnector.s3reader.DCPOptimizedS3Reader(bucket: str, key: str, item_ranges: List[ItemRange], get_object_info: Callable[[], s3torchconnectorclient._mountpoint_s3_client.ObjectInfo | s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult], get_stream: Callable[[int | None, int | None], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream], max_gap_size: int | float = DEFAULT_MAX_GAP_SIZE)

Bases: s3torchconnector.s3reader.s3reader.S3Reader

S3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading.

Provides up to 2x performance improvement over the default sequential reader through:

1. Selective data fetching with range coalescing: Uses byte range information from PyTorch’s LoadPlan to only fetch required data. Groups nearby ranges within max_gap_size into single S3 streams to minimize first-byte latency while avoiding unnecessary data transfer.

2. Per-item buffer management: Buffers per item (per tensor) instead of per file. Each buffer stores only the required item's byte ranges and is discarded after PyTorch reads the item, which removes the overhead of resizing large buffers and repeatedly re-copying data.

3. Eliminate buffer copy: A custom _ItemViewBuffer stores S3 chunks as memoryview references instead of copying them into a BytesIO, avoiding allocation and copy overhead.
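Point 3 can be illustrated with a buffer that keeps each received chunk as a memoryview rather than appending it to a BytesIO, so chunk data is copied only when a caller reads a range. ChunkViewBufferSketch is a hypothetical class, not the library's private _ItemViewBuffer, whose internals may differ.

```python
from typing import List


class ChunkViewBufferSketch:
    """Hypothetical sketch: keep received chunks as memoryviews, copy only on read."""

    def __init__(self) -> None:
        self._chunks: List[memoryview] = []
        self._size = 0

    def append(self, chunk: bytes) -> None:
        self._chunks.append(memoryview(chunk))  # references the chunk, no copy
        self._size += len(chunk)

    def __len__(self) -> int:
        return self._size

    def read_range(self, start: int, end: int) -> bytes:
        """Return bytes [start, end), which may span several chunks."""
        out = bytearray()
        chunk_start = 0
        for chunk in self._chunks:
            chunk_end = chunk_start + len(chunk)
            lo, hi = max(start, chunk_start), min(end, chunk_end)
            if lo < hi:
                out += chunk[lo - chunk_start : hi - chunk_start]
            if chunk_end >= end:
                break
            chunk_start = chunk_end
        return bytes(out)


view_buffer = ChunkViewBufferSketch()
for piece in (b"abc", b"defg", b"hi"):
    view_buffer.append(piece)
middle = view_buffer.read_range(2, 6)  # spans the first two chunks
```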

Requirements:

  • DCP loading: The reader is designed only for use through the dcp_optimized() reader constructor with dcp.load().

  • A pre-sorted list of item_ranges, injected automatically by prepare_local_plan().

  • Sequential access over exactly the provided item_ranges, also handled automatically by prepare_local_plan().

Usage: Created automatically by DCPOptimizedConstructor when used with S3StorageReader and S3ReaderConstructor.dcp_optimized():

reader_constructor = S3ReaderConstructor.dcp_optimized(max_gap_size=32*1024*1024)
storage_reader = S3StorageReader(region, path, reader_constructor=reader_constructor)
DCP.load(state_dict, storage_reader=storage_reader)

Error Handling:

Non-sequential access attempts raise ValueError.
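The access rule can be modelled as a cursor over the sorted item ranges: any position inside the current item is allowed, later items may be entered, and earlier items or gaps between items are rejected with ValueError. ForwardOnlyItemCursor is a hypothetical sketch of that rule only; it performs no I/O and is not DCPOptimizedS3Reader's implementation.

```python
from bisect import bisect_right
from typing import List, Tuple


class ForwardOnlyItemCursor:
    """Hypothetical model of the DCP access rule (not the library's code).

    Positions may move freely inside the current item's range, but only
    forward across items; positions in gaps between items are rejected.
    """

    def __init__(self, item_ranges: List[Tuple[int, int]]) -> None:
        self._ranges = item_ranges  # sorted, non-overlapping (start, end) pairs
        self._starts = [start for start, _ in item_ranges]
        self._current = 0  # index of the item currently being read

    def seek(self, position: int) -> int:
        if position < 0:
            raise ValueError("cannot seek to a negative position")
        index = bisect_right(self._starts, position) - 1
        # position == end is allowed so a fully read item can sit at its end.
        if index < 0 or position > self._ranges[index][1]:
            raise ValueError(f"position {position} is outside the planned item ranges")
        if index < self._current:
            raise ValueError("cannot seek back to a previous item")
        self._current = index
        return position


cursor = ForwardOnlyItemCursor([(0, 100), (150, 200), (300, 400)])
cursor.seek(10)   # inside item 0
cursor.seek(5)    # backward, but still inside item 0: allowed
cursor.seek(160)  # forward into item 1
```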

property bucket: str
property key: str
property closed: bool

Returns:

Whether the object is closed.

Return type:

bool

read(size: int | None = None) -> bytes

Read up to size bytes from the current position.

Supports backward seeking within the current item buffer, but forward-only access across DCP items (sequential item access required).

Parameters:

size (int) – how many bytes to read.

Returns:

Bytes read from specified range.

Return type:

bytes

Raises:
  • TypeError – If size is not an integer.

  • ValueError – If the position is outside valid DCP ranges, or if size is None or negative (full-file reads are not supported).

  • S3Exception – An error occurred accessing S3.

readinto(buf) -> int

Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf. Return the number of bytes read. If no bytes are available, zero is returned.

Parameters:

buf – writable bytes-like object

Returns:

number of bytes read or zero, if no bytes available

Return type:

int

Raises:
  • ValueError – If position is outside valid DCP ranges.

  • TypeError – If buf is not writable.

  • S3Exception – An error occurred accessing S3.

seek(offset: int, whence: int = SEEK_SET, /) -> int

Change position within DCP ranges, interpreted relative to whence.

Supports arbitrary seeking within current item buffer, but only forward sequential access across DCP items (cannot seek back to previous items).

Parameters:
  • offset (int) – How many bytes to seek relative to whence.

  • whence (int) – Either SEEK_SET or SEEK_CUR; SEEK_END is not supported. Default: SEEK_SET.

Returns:

Current position of the stream

Return type:

int

Raises:
  • TypeError – If whence is not SEEK_SET or SEEK_CUR.

  • ValueError – If seeking to negative position or accessing previous items.

tell() -> int
Returns:

Current absolute position in the object.

Return type:

int

close() -> None

Close the stream and release resources.