s3torchconnector.s3reader.dcp_optimized
=======================================

.. py:module:: s3torchconnector.s3reader.dcp_optimized

.. autoapi-nested-parse::

   DCP-Optimized S3 Reader provides these 3 optimizations:

   1. Selective data fetching with range coalescing to only fetch required byte ranges
   2. Per-item buffer management to reduce buffer allocation costs
   3. Eliminating buffer copy by storing S3 chunks as memoryview references

   Data Flow Overview::

      DCP.load(model_state_dict, storage_reader=s3_storage_reader)
        -> read_metadata()                     # reads .metadata file
        -> set_up_storage_reader(metadata)     # populates storage_data
        -> prepare_local_plan(plan)            # (patched) sorts items, injects ranges to constructor
          -> DCPOptimizedConstructor.set_item_ranges_by_file()
        -> read_data(plan)                     # per-file loop below
          -> DCPOptimizedS3Reader __init__
            -> _validate_and_coalesce_ranges() # validates and groups ItemRanges into RangeGroups
          -> DCPOptimizedS3Reader read()/readinto()
            -> _find_item_for_range()          # returns item for given read request
            -> [if new item] _get_item_buffer()  # fetches item byte data
              -> [if new RangeGroup] _get_stream_for_item()  # creates new stream before fetching byte data
              -> 1: Handle leftover bytes from prev. chunk
              -> 2: Skip gap data from coalescing
              -> 3: Fetch remaining data from S3
            -> _ItemViewBuffer read()/readinto()  # returns data from buffer


Attributes
----------

.. autoapisummary::

   s3torchconnector.s3reader.dcp_optimized.log
   s3torchconnector.s3reader.dcp_optimized.DEFAULT_MAX_GAP_SIZE
   s3torchconnector.s3reader.dcp_optimized.FIND_ITEM_ERROR_PREFIX
   s3torchconnector.s3reader.dcp_optimized.FALLBACK_GUIDANCE


Classes
-------

.. autoapisummary::

   s3torchconnector.s3reader.dcp_optimized.ItemRange
   s3torchconnector.s3reader.dcp_optimized.RangeGroup
   s3torchconnector.s3reader.dcp_optimized.DCPOptimizedS3Reader


Module Contents
---------------

.. py:data:: log

.. py:data:: DEFAULT_MAX_GAP_SIZE
   :value: 33554432

.. py:data:: FIND_ITEM_ERROR_PREFIX
   :value: 'DCPOptimizedS3Reader only supports sequentially accessing provided ranges: '

.. py:data:: FALLBACK_GUIDANCE
   :value: Multiline-String

   .. code-block:: python

      """If this error is encountered with the default DCP reader (S3ReaderConstructor.dcp_optimized()) added in s3torchconnector v1.5.0, please refer to the troubleshooting doc (https://github.com/awslabs/s3-connector-for-pytorch/blob/main/docs/TROUBLESHOOTING.md#dcpoptimizeds3reader-errors).
      For unsupported or non-DCP access patterns, use the generic reader: S3StorageReader(region, path, reader_constructor=S3ReaderConstructor.default())"""

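
The three per-item fetch steps named in the data flow overview (leftover bytes, gap skipping, fetching the remainder) can be illustrated with a small, self-contained sketch. It does not reproduce the library's ``_get_item_buffer``; ``GroupStreamSketch`` and ``fetch_item`` are hypothetical names, and a plain iterator of ``bytes`` chunks stands in for the S3 stream of one ``RangeGroup``.

.. code-block:: python

   from typing import Iterator, List


   class GroupStreamSketch:
       """Hypothetical helper: serves ascending items from one chunked stream."""

       def __init__(self, chunks: Iterator[bytes], start: int) -> None:
           self._chunks = chunks
           self._pos = start                  # absolute offset of the next unread byte
           self._leftover = memoryview(b"")   # unread tail of the last chunk

       def fetch_item(self, start: int, end: int) -> List[memoryview]:
           """Return zero-copy views covering bytes [start, end)."""
           if start < self._pos:
               raise ValueError("items must be requested in ascending order")
           views: List[memoryview] = []
           while self._pos < end:
               # Step 1: leftover bytes from the previous chunk are used first.
               # Step 3: once they run out, the next chunk is pulled from the stream.
               if len(self._leftover) == 0:
                   chunk = next(self._chunks, None)
                   if chunk is None:
                       raise EOFError("stream ended before the item was complete")
                   self._leftover = memoryview(chunk)
               if self._pos < start:
                   # Step 2: bytes in the coalescing gap are skipped, not stored.
                   n = min(start - self._pos, len(self._leftover))
               else:
                   n = min(end - self._pos, len(self._leftover))
                   if n:
                       views.append(self._leftover[:n])
               self._leftover = self._leftover[n:]
               self._pos += n
           return views


   payload = bytes(range(20))
   chunk_iter = iter([payload[0:8], payload[8:16], payload[16:20]])
   stream = GroupStreamSketch(chunk_iter, start=0)
   first_item = b"".join(stream.fetch_item(2, 6))
   second_item = b"".join(stream.fetch_item(10, 18))

In this sketch the second item spans two chunks and is returned as two views, so no bytes are copied until the caller chooses to join them.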
.. py:class:: ItemRange

   Byte range for a single DCP ReadItem (tensor). Inclusive start, exclusive end.

   .. py:attribute:: start
      :type: int

   .. py:attribute:: end
      :type: int


.. py:class:: RangeGroup

   Group of nearby ItemRanges that share a single S3 range request.

   Created by coalescing ItemRanges with gaps <= ``max_gap_size`` in ``_validate_and_coalesce_ranges``.
   One S3 stream serves all items in the RangeGroup sequentially.

   .. py:attribute:: start
      :type: int

   .. py:attribute:: end
      :type: int

   .. py:attribute:: item_ranges
      :type: List[ItemRange]


.. py:class:: DCPOptimizedS3Reader(bucket: str, key: str, item_ranges: List[ItemRange], get_object_info: Callable[[], Union[s3torchconnectorclient._mountpoint_s3_client.ObjectInfo, s3torchconnectorclient._mountpoint_s3_client.HeadObjectResult]], get_stream: Callable[[Optional[int], Optional[int]], s3torchconnectorclient._mountpoint_s3_client.GetObjectStream], max_gap_size: Union[int, float] = DEFAULT_MAX_GAP_SIZE)

   Bases: :py:obj:`s3torchconnector.s3reader.s3reader.S3Reader`

   S3 reader implementation optimized for PyTorch Distributed Checkpoint (DCP) loading.

   Provides up to 2x performance improvement over the default sequential reader through:

   1. **Selective data fetching with range coalescing**:
      Uses byte range information from PyTorch's ``LoadPlan`` to fetch only the required data.
      Groups nearby ranges within ``max_gap_size`` into single S3 streams to minimize
      first-byte latency while avoiding unnecessary data transfer.

   2. **Per-item buffer management**:
      Buffers per item (per tensor) instead of per file. Each buffer stores only the
      required item's byte ranges and is discarded after PyTorch reads the item, which
      removes the overhead of resizing large buffers and repeatedly re-copying data.

   3. **Eliminate buffer copy**:
      The custom ``_ItemViewBuffer`` stores S3 chunks as memoryview references instead of
      copying them into BytesIO, avoiding allocation and copy overhead.

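
   The coalescing described in item 1 can be illustrated with a minimal sketch. This is not the library's ``_validate_and_coalesce_ranges`` implementation: ``coalesce_ranges`` is a hypothetical helper, and the two dataclasses are local stand-ins that only mirror the fields documented above. It assumes the input is already sorted and non-overlapping, and rejects anything else.

   .. code-block:: python

      from dataclasses import dataclass, field
      from typing import List, Union


      @dataclass
      class ItemRange:
          """Local stand-in: inclusive start, exclusive end."""
          start: int
          end: int


      @dataclass
      class RangeGroup:
          """Local stand-in: one group maps to one S3 range request."""
          start: int
          end: int
          item_ranges: List[ItemRange] = field(default_factory=list)


      def coalesce_ranges(
          item_ranges: List[ItemRange], max_gap_size: Union[int, float]
      ) -> List[RangeGroup]:
          """Hypothetical helper: merge sorted ranges whose gap is <= max_gap_size."""
          groups: List[RangeGroup] = []
          for item in item_ranges:
              if item.start < 0 or item.end <= item.start:
                  raise ValueError(f"invalid range: {item}")
              if groups and item.start < groups[-1].end:
                  raise ValueError("ranges must be sorted and non-overlapping")
              if groups and item.start - groups[-1].end <= max_gap_size:
                  # Small gap: extend the current group; the gap bytes are
                  # downloaded and then skipped by the reader.
                  groups[-1].end = item.end
                  groups[-1].item_ranges.append(item)
              else:
                  # Large gap (or first item): start a new group and stream.
                  groups.append(RangeGroup(item.start, item.end, [item]))
          return groups


      groups = coalesce_ranges(
          [ItemRange(0, 100), ItemRange(150, 300), ItemRange(10_000, 10_500)],
          max_gap_size=64,
      )

   With ``max_gap_size=64``, the 50-byte gap between the first two items is absorbed into one group, while the third item starts a separate group. A larger ``max_gap_size`` trades extra transferred bytes for fewer streams.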
   **Requirements**:

   - DCP loading: the reader is designed only for use via the ``dcp_optimized`` reader_constructor with ``dcp.load()``.
   - Pre-sorted list of ``item_ranges``, injected automatically in ``prepare_local_plan``.
   - Sequential access over the exact ``item_ranges`` provided, also applied automatically by ``prepare_local_plan``.

   **Usage**:
   Created automatically by ``DCPOptimizedConstructor`` when used with ``S3StorageReader`` and
   ``S3ReaderConstructor.dcp_optimized()``:

   .. code-block:: python

      import torch.distributed.checkpoint as DCP
      from s3torchconnector import S3ReaderConstructor
      from s3torchconnector.dcp import S3StorageReader

      # region, path and state_dict are supplied by the caller.
      reader_constructor = S3ReaderConstructor.dcp_optimized(max_gap_size=32*1024*1024)
      storage_reader = S3StorageReader(region, path, reader_constructor=reader_constructor)
      DCP.load(state_dict, storage_reader=storage_reader)

   **Error Handling**:
   Non-sequential access attempts raise ValueError.

   .. py:property:: bucket
      :type: str

   .. py:property:: key
      :type: str

   .. py:property:: closed
      :type: bool

      :returns: Whether the object is closed.
      :rtype: bool

   .. py:method:: read(size: Optional[int] = None) -> bytes

      Read up to size bytes from the current position.

      Supports backward seeking within the current item buffer, but forward-only access
      across DCP items (sequential item access required).

      :param size: how many bytes to read.
      :type size: int
      :returns: Bytes read from specified range.
      :rtype: bytes
      :raises TypeError: If size is not an integer.
      :raises ValueError: If position is outside valid DCP ranges, or if size is None or negative (full file reads not supported).
      :raises S3Exception: An error occurred accessing S3.

   .. py:method:: readinto(buf) -> int

      Read up to len(buf) bytes into a pre-allocated, writable bytes-like object buf.
      Return the number of bytes read. If no bytes are available, zero is returned.

      :param buf: writable bytes-like object
      :returns: number of bytes read, or zero if no bytes are available
      :rtype: int
      :raises ValueError: If position is outside valid DCP ranges.
      :raises TypeError: If buf is not writable.
      :raises S3Exception: An error occurred accessing S3.

   .. py:method:: seek(offset: int, whence: int = SEEK_SET, /) -> int

      Change position within DCP ranges, interpreted relative to whence.

      Supports arbitrary seeking within the current item buffer, but only forward sequential
      access across DCP items (cannot seek back to previous items).

      :param offset: How many bytes to seek relative to whence.
      :type offset: int
      :param whence: Either SEEK_SET or SEEK_CUR. SEEK_END is not supported. Default: SEEK_SET.
      :type whence: int
      :returns: Current position of the stream
      :rtype: int
      :raises TypeError: If whence is not SEEK_SET or SEEK_CUR.
      :raises ValueError: If seeking to a negative position or accessing previous items.

   .. py:method:: tell() -> int

      :returns: Current absolute position in the object.
      :rtype: int

   .. py:method:: close() -> None

      Close the stream and release resources.
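
   The interplay of ``read()``, ``readinto()``, ``seek()`` and ``tell()`` within a single item buffer can be sketched with a small zero-copy buffer. This is an illustration under stated assumptions, not the library's ``_ItemViewBuffer``: ``ChunkViewBuffer`` and its ``append`` method are hypothetical, positions are relative to the buffer rather than the S3 object, and the ``TypeError`` for unsupported ``whence`` values simply mirrors the behaviour documented for ``seek()`` above.

   .. code-block:: python

      import bisect
      from io import SEEK_CUR, SEEK_SET
      from typing import List


      class ChunkViewBuffer:
          """Hypothetical buffer that keeps chunks as memoryview references."""

          def __init__(self) -> None:
              self._chunks: List[memoryview] = []
              self._starts: List[int] = []   # buffer offset where each chunk begins
              self._size = 0
              self._pos = 0

          def append(self, chunk: bytes) -> None:
              view = memoryview(chunk).cast("B")   # references the chunk, no copy
              if len(view) == 0:
                  return
              self._chunks.append(view)
              self._starts.append(self._size)
              self._size += len(view)

          def tell(self) -> int:
              return self._pos

          def seek(self, offset: int, whence: int = SEEK_SET) -> int:
              if whence == SEEK_SET:
                  new_pos = offset
              elif whence == SEEK_CUR:
                  new_pos = self._pos + offset
              else:
                  raise TypeError("whence must be SEEK_SET or SEEK_CUR")
              if new_pos < 0:
                  raise ValueError("cannot seek to a negative position")
              self._pos = new_pos
              return self._pos

          def readinto(self, buf) -> int:
              dest = memoryview(buf).cast("B")
              if dest.readonly:
                  raise TypeError("buf must be writable")
              if self._pos >= self._size:
                  return 0
              # Locate the chunk containing the current position.
              idx = bisect.bisect_right(self._starts, self._pos) - 1
              written = 0
              while written < len(dest) and idx < len(self._chunks):
                  chunk = self._chunks[idx]
                  offset = self._pos - self._starts[idx]
                  n = min(len(chunk) - offset, len(dest) - written)
                  dest[written:written + n] = chunk[offset:offset + n]
                  written += n
                  self._pos += n
                  idx += 1
              return written

          def read(self, size: int) -> bytes:
              if not isinstance(size, int):
                  raise TypeError("size must be an integer")
              if size < 0:
                  raise ValueError("size must be non-negative")
              out = bytearray(max(0, min(size, self._size - self._pos)))
              n = self.readinto(out)
              return bytes(out[:n])


      buffer = ChunkViewBuffer()
      buffer.append(b"hello ")
      buffer.append(b"world")
      first = buffer.read(8)
      buffer.seek(-2, SEEK_CUR)   # backward seek inside the same buffer
      rest = buffer.read(100)

   The only copy in this sketch happens when data is handed to the caller; the chunks themselves are never concatenated into one large buffer.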