s3torchconnector.s3reader
Submodules
Classes
An abstract base class for read-only, file-like representation of a single object stored in S3. |
|
Constructor for creating |
|
Sequential S3 reader implementation |
|
Range-based S3 reader implementation with adaptive buffering. |
|
S3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading. |
Package Contents
- class s3torchconnector.s3reader.S3Reader[source]
Bases:
abc.ABC,io.BufferedIOBaseAn abstract base class for read-only, file-like representation of a single object stored in S3.
This class defines the interface for S3 readers. Concrete implementations (SequentialS3Reader or RangedS3Reader extend this class. S3ReaderConstructor creates partial functions of these implementations, which are then completed by S3Client with the remaining required parameters.
- property bucket: str
- Abstractmethod:
- property key: str
- Abstractmethod:
- abstract read(size: int | None = None) bytes[source]
Read and return up to n bytes.
If the argument is omitted, None, or negative, reads and returns all data until EOF.
If the argument is positive, and the underlying raw stream is not ‘interactive’, multiple raw reads may be issued to satisfy the byte count (unless EOF is reached first). But for interactive raw streams (as well as sockets and pipes), at most one raw read will be issued, and a short result does not imply that EOF is imminent.
Returns an empty bytes object on EOF.
Returns None if the underlying raw stream was open in non-blocking mode and no data is available at the moment.
- abstract seek(offset: int, whence: int = SEEK_SET, /) int[source]
Change stream position.
Change the stream position to the given byte offset. The offset is interpreted relative to the position indicated by whence. Values for whence are:
0 – start of stream (the default); offset should be zero or positive
1 – current stream position; offset may be negative
2 – end of stream; offset is usually negative
Return the new absolute position.
- class s3torchconnector.s3reader.S3ReaderConstructor[source]
Constructor for creating
partial(S3Reader)instances.Creates partial
S3Readerinstances that will be completed byS3Clientwith the remaining required parameters (e.g.bucket,key,get_object_info,get_stream).The constructor provides factory methods for different reader types:
sequential(): Creates a constructor for sequential readers that buffer the entire object. Best for full reads and repeated access.range_based(): Creates a constructor for range-based readers that fetch specific byte ranges. Suitable for sparse partial reads for large objects.
- static sequential() s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol[source]
Creates a constructor for sequential (generic) readers.
This reader is the generic reader that supports all access patterns.
- Returns:
Partial constructor for SequentialS3Reader
- Return type:
Example:
reader_constructor = S3ReaderConstructor.sequential()
- static range_based(buffer_size: int | None = None) s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol[source]
Creates a constructor for range-based readers
- Parameters:
buffer_size – Internal buffer size in bytes. If None, uses default 8MB. Set to 0 to disable buffering.
- Returns:
Partial constructor for RangedS3Reader
- Return type:
Range-based reader performs byte-range requests for each read/readinto call to read specific portions of S3 objects without downloading the entire file.
Buffer size affects read performance:
Small reads (<
buffer_size): Loadsbuffer_sizebytes to buffer to reduce S3 API calls for small, sequential readsLarge reads (≥
buffer_size): bypass the buffer for direct transfer from S3Forward overlap reads: Reuses buffered data when reading ranges that extend beyond current buffer, and processes remaining
data according to size with logic above.
Configuration Guide:
Use larger buffer sizes for workloads with many small, sequential reads of nearby bytes
Use smaller buffer sizes or disable buffering for sparse partial reads
Buffer can be disabled by setting
buffer_sizeto 0If
buffer_sizeis None, uses default 8MB buffer
Examples:
# Range-based reader with default 8MB buffer reader_constructor = S3ReaderConstructor.range_based() # Range-based reader with custom buffer size reader_constructor = S3ReaderConstructor.range_based(buffer_size=16*1024*1024) # Range-based reader with buffering disabled reader_constructor = S3ReaderConstructor.range_based(buffer_size=0)
- static dcp_optimized(max_gap_size: int | float = DEFAULT_MAX_GAP_SIZE) s3torchconnector.s3reader.protocol.DCPS3ReaderConstructorProtocol[source]
Creates a constructor for DCP-optimized readers for faster checkpoint loading.
The DCP-optimized reader provides performance improvements for DCP reading through:
Selective data fetching with range coalescing to only fetch required byte ranges
Per-item buffer management to reduce buffer allocation costs
Eliminating buffer copy by storing S3 chunks as memoryview references
- Parameters:
max_gap_size –
Maximum gap size in bytes between ranges to coalesce into the same S3 read stream. Most users should use the default value.
Default: 32MB (
32 * 1024 * 1024)Use
float("inf")to coalesce all ranges regardless of gapsUse 0 to disable coalescing, which creates a new range-based stream for each gap
- Returns:
Constructor that creates DCPOptimizedS3Reader when ranges are available, falling back to SequentialS3Reader otherwise.
- Return type:
DCPOptimizedConstructorProtocol
- Requirements:
Should be used with S3StorageReader, in which
prepare_local_plan()automatically handles:Load ordering: Sorts items by storage offset for sequential access
Range injection: Provides byte ranges from DCP load plan to the reader
Advanced users implementing custom readers must include these optimizations in their
prepare_local_plan()/read_data()implementation to use the DCP-optimized reader.
Example:
reader_constructor = S3ReaderConstructor.dcp_optimized() storage_reader = S3StorageReader(region, path, reader_constructor=reader_constructor) DCP.load(state_dict, storage_reader=storage_reader)
- static default() s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol[source]
Creates the default generic reader constructor.
This creates a sequential (generic) reader that supports all access patterns.
- Returns:
Partial constructor for SequentialS3Reader
- Return type:
- static get_reader_type_string(constructor: s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol | None) str[source]
Returns the reader type string for the given constructor.
- class s3torchconnector.s3reader.SequentialS3Reader(bucket: str, key: str, get_object_info: Callable[[], s3torchconnectorclient._mountpoint_s3_client.ObjectInfo | s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult], get_stream: Callable[[], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream])[source]
Bases:
s3torchconnector.s3reader.s3reader.S3ReaderSequential S3 reader implementation
Maintains an internal buffer for efficient sequential reads and repeated access. Optimal for most use cases, including full object reads.
- property bucket: str
- property key: str
- prefetch() None[source]
Start fetching data from S3.
- Raises:
S3Exception – An error occurred accessing S3.
- readinto(buf) int[source]
Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf. Return the number of bytes read. If no bytes are available, zero is returned.
- Parameters:
buf – writable bytes-like object
- Returns:
numer of bytes read or zero, if no bytes available
- Return type:
int
- read(size: int | None = None) bytes[source]
Read up to size bytes from the object and return them.
If size is zero or positive, read that many bytes from S3, or until the end of the object. If size is None or negative, read the entire file.
- Parameters:
size (int | None) – how many bytes to read.
- Returns:
Bytes read from S3 Object
- Return type:
bytes
- Raises:
S3Exception – An error occurred accessing S3.
- seek(offset: int, whence: int = SEEK_SET, /) int[source]
Change the stream position to the given byte offset, interpreted relative to whence.
When seeking beyond the end of the file, always stay at EOF. Seeking before the start of the file results in a ValueError.
- Parameters:
offset (int) – How many bytes to seek relative to whence.
whence (int) – One of SEEK_SET, SEEK_CUR, and SEEK_END. Default: SEEK_SET
- Returns:
Current position of the stream
- Return type:
int
- Raises:
S3Exception – An error occurred accessing S3.
- class s3torchconnector.s3reader.RangedS3Reader(bucket: str, key: str, get_object_info: Callable[[], s3torchconnectorclient._mountpoint_s3_client.ObjectInfo | s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult], get_stream: Callable[[int | None, int | None], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream], buffer_size: int | None = None)[source]
Bases:
s3torchconnector.s3reader.s3reader.S3ReaderRange-based S3 reader implementation with adaptive buffering.
Performs byte-range requests to read specific portions of S3 objects without downloading the entire file. Includes optional adaptive buffer to reduce S3 API calls for small, sequential reads while bypassing buffering for large reads. Optimal for sparse partial reads of large objects.
Buffering behavior:
Small reads (<
buffer_size): Loadsbuffer_sizebytes to buffer, copies to userLarge reads (>=
buffer_size): Direct S3 access, bypass bufferForward overlapping reads: Reuses existing buffer data if possible when read range extends beyond current buffer
Buffer can be disabled by setting
buffer_sizeto 0If
buffer_sizeis None, uses default 8MB buffer
- Parameters:
bucket – S3 bucket name
key – S3 object key
get_object_info – Callable that returns object metadata
get_stream – Callable that returns stream for byte range requests
buffer_size – Internal buffer size in bytes, defaults to 8MB
- property bucket: str
- property key: str
- readinto(buf) int[source]
Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf. Return the number of bytes read. If no bytes are available, zero is returned.
- Parameters:
buf – writable bytes-like object
- Returns:
numer of bytes read or zero, if no bytes available
- Return type:
int
- read(size: int | None = None) bytes[source]
Read up to size bytes from the current position.
If size is zero or positive, read that many bytes from S3, or until the end of the object. If size is None or negative, read until the end of the object.
- Parameters:
size (int | None) – how many bytes to read.
- Returns:
Bytes read from specified range.
- Return type:
bytes
- Raises:
S3Exception – An error occurred accessing S3.
- seek(offset: int, whence: int = SEEK_SET, /) int[source]
Change the stream position to the given byte offset, interpreted relative to whence.
When seeking beyond the end of the file, always stay at EOF. Seeking before the start of the file results in a ValueError.
- Parameters:
offset (int) – How many bytes to seek relative to whence.
whence (int) – One of SEEK_SET, SEEK_CUR, and SEEK_END. Default: SEEK_SET
- Returns:
Current position of the stream
- Return type:
int
- Raises:
S3Exception – An error occurred accessing S3.
- class s3torchconnector.s3reader.DCPOptimizedS3Reader(bucket: str, key: str, item_ranges: List[ItemRange], get_object_info: Callable[[], s3torchconnectorclient._mountpoint_s3_client.ObjectInfo | s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult], get_stream: Callable[[int | None, int | None], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream], max_gap_size: int | float = DEFAULT_MAX_GAP_SIZE)[source]
Bases:
s3torchconnector.s3reader.s3reader.S3ReaderS3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading.
Provides up to 2x performance improvement over default sequential reader through:
1. Selective data fetching with range coalescing: Uses byte range information from PyTorch’s
LoadPlanto only fetch required data. Groups nearby ranges withinmax_gap_sizeinto single S3 streams to minimize first-byte latency while avoiding unnecessary data transfer.2. Per-item buffer management: Buffers per-item (per-tensor) instead of per-file. Each buffer stores only the required item’s byte ranges and is discarded after PyTorch reads the item, which removes overhead of resizing large buffers and re-copying data repeatedly.
3. Eliminate buffer copy: Custom
_ItemViewBufferstores S3 chunks as memoryview references instead of copying into BytesIO, avoiding allocation and copy overhead.Requirements:
DCP Loading - reader is only designed for usage via dcp_optimized reader_constructor for
dcp.load()Pre-sorted list of item_ranges, injected automatically in
prepare_local_plan.Sequential Access over exact item_ranges provided, also applied automatically by
prepare_local_plan
Usage: Created automatically by
DCPOptimizedConstructorwhen used withS3StorageReaderandS3ReaderConstructor.dcp_optimized():reader_constructor = S3ReaderConstructor.dcp_optimized(max_gap_size=32*1024*1024) storage_reader = S3StorageReader(region, path, reader_constructor=reader_constructor) DCP.load(state_dict, storage_reader=storage_reader)
- Error Handling:
Non-sequential access attempts raise ValueError.
- property bucket: str
- property key: str
- property closed: bool
Returns: bool: Return whether the object is closed.
- read(size: int | None = None) bytes[source]
Read up to size bytes from the current position.
Supports backward seeking within the current item buffer, but forward-only access across DCP items (sequential item access required).
- Parameters:
size (int) – how many bytes to read.
- Returns:
Bytes read from specified range.
- Return type:
bytes
- Raises:
TypeError – If size is not an integer.
ValueError – If position is outside valid DCP ranges, and if size is None or negative (full file reads not supported).
S3Exception – An error occurred accessing S3.
- readinto(buf) int[source]
Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf. Return the number of bytes read. If no bytes are available, zero is returned.
- Parameters:
buf – writable bytes-like object
- Returns:
number of bytes read or zero, if no bytes available
- Return type:
int
- Raises:
ValueError – If position is outside valid DCP ranges.
TypeError – If buf is not writable.
S3Exception – An error occurred accessing S3.
- seek(offset: int, whence: int = SEEK_SET, /) int[source]
Change position within DCP ranges, interpreted relative to whence.
Supports arbitrary seeking within current item buffer, but only forward sequential access across DCP items (cannot seek back to previous items).
- Parameters:
offset (int) – How many bytes to seek relative to whence.
whence (int) – One of SEEK_SET, and SEEK_CUR. SEEK_END not supported. Default: SEEK_SET.
- Returns:
Current position of the stream
- Return type:
int
- Raises:
TypeError – If whence is not SEEK_SET or SEEK_CUR.
ValueError – If seeking to negative position or accessing previous items.