s3torchconnector ================ .. py:module:: s3torchconnector Submodules ---------- .. toctree:: :maxdepth: 1 /autoapi/s3torchconnector/dcp/index /autoapi/s3torchconnector/lightning/index /autoapi/s3torchconnector/s3checkpoint/index /autoapi/s3torchconnector/s3iterable_dataset/index /autoapi/s3torchconnector/s3map_dataset/index /autoapi/s3torchconnector/s3reader/index /autoapi/s3torchconnector/s3writer/index Classes ------- .. autoapisummary:: s3torchconnector.S3Reader s3torchconnector.S3ReaderConstructor s3torchconnector.S3Writer s3torchconnector.S3IterableDataset s3torchconnector.S3MapDataset Package Contents ---------------- .. py:class:: S3Reader Bases: :py:obj:`abc.ABC`, :py:obj:`io.BufferedIOBase` An abstract base class for read-only, file-like representation of a single object stored in S3. This class defines the interface for S3 readers. Concrete implementations (SequentialS3Reader or RangedS3Reader extend this class. S3ReaderConstructor creates partial functions of these implementations, which are then completed by S3Client with the remaining required parameters. .. py:property:: bucket :type: str :abstractmethod: .. py:property:: key :type: str :abstractmethod: .. py:method:: read(size: Optional[int] = None) -> bytes :abstractmethod: Read and return up to n bytes. If the argument is omitted, None, or negative, reads and returns all data until EOF. If the argument is positive, and the underlying raw stream is not 'interactive', multiple raw reads may be issued to satisfy the byte count (unless EOF is reached first). But for interactive raw streams (as well as sockets and pipes), at most one raw read will be issued, and a short result does not imply that EOF is imminent. Returns an empty bytes object on EOF. Returns None if the underlying raw stream was open in non-blocking mode and no data is available at the moment. .. py:method:: seek(offset: int, whence: int = SEEK_SET, /) -> int :abstractmethod: Change stream position. Change the stream position to the given byte offset. The offset is interpreted relative to the position indicated by whence. Values for whence are: * 0 -- start of stream (the default); offset should be zero or positive * 1 -- current stream position; offset may be negative * 2 -- end of stream; offset is usually negative Return the new absolute position. .. py:method:: tell() -> int :abstractmethod: Return current stream position. .. py:method:: readinto(buf) -> int :abstractmethod: .. py:method:: seekable() -> bool :returns: Return whether object supports seek operations. :rtype: bool .. py:method:: readable() -> bool :returns: Return whether object was opened for reading. :rtype: bool .. py:method:: writable() -> bool :returns: Return whether object was opened for writing. :rtype: bool .. py:class:: S3ReaderConstructor Constructor for creating ``partial(S3Reader)`` instances. Creates partial ``S3Reader`` instances that will be completed by ``S3Client`` with the remaining required parameters (e.g. ``bucket``, ``key``, ``get_object_info``, ``get_stream``). The constructor provides factory methods for different reader types: - ``sequential()``: Creates a constructor for sequential readers that buffer the entire object. Best for full reads and repeated access. - ``range_based()``: Creates a constructor for range-based readers that fetch specific byte ranges. Suitable for sparse partial reads for large objects. .. py:method:: sequential() -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol :staticmethod: Creates a constructor for sequential (generic) readers. This reader is the generic reader that supports all access patterns. :returns: Partial constructor for SequentialS3Reader :rtype: S3ReaderConstructorProtocol Example:: reader_constructor = S3ReaderConstructor.sequential() .. py:method:: range_based(buffer_size: Optional[int] = None) -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol :staticmethod: Creates a constructor for range-based readers :param buffer_size: Internal buffer size in bytes. If None, uses default 8MB. Set to 0 to disable buffering. :returns: Partial constructor for RangedS3Reader :rtype: S3ReaderConstructorProtocol Range-based reader performs byte-range requests for each read/readinto call to read specific portions of S3 objects without downloading the entire file. Buffer size affects read performance: * Small reads (< ``buffer_size``): Loads ``buffer_size`` bytes to buffer to reduce S3 API calls for small, sequential reads * Large reads (≥ ``buffer_size``): bypass the buffer for direct transfer from S3 * Forward overlap reads: Reuses buffered data when reading ranges that extend beyond current buffer, and processes remaining data according to size with logic above. Configuration Guide: * Use larger buffer sizes for workloads with many small, sequential reads of nearby bytes * Use smaller buffer sizes or disable buffering for sparse partial reads * Buffer can be disabled by setting ``buffer_size`` to 0 * If ``buffer_size`` is None, uses default 8MB buffer Examples:: # Range-based reader with default 8MB buffer reader_constructor = S3ReaderConstructor.range_based() # Range-based reader with custom buffer size reader_constructor = S3ReaderConstructor.range_based(buffer_size=16*1024*1024) # Range-based reader with buffering disabled reader_constructor = S3ReaderConstructor.range_based(buffer_size=0) .. py:method:: dcp_optimized(max_gap_size: Union[int, float] = DEFAULT_MAX_GAP_SIZE) -> s3torchconnector.s3reader.protocol.DCPS3ReaderConstructorProtocol :staticmethod: Creates a constructor for DCP-optimized readers for faster checkpoint loading. The DCP-optimized reader provides performance improvements for DCP reading through: - Selective data fetching with range coalescing to only fetch required byte ranges - Per-item buffer management to reduce buffer allocation costs - Eliminating buffer copy by storing S3 chunks as memoryview references :param max_gap_size: Maximum gap size in bytes between ranges to coalesce into the same S3 read stream. Most users should use the default value. - Default: 32MB (``32 * 1024 * 1024``) - Use ``float("inf")`` to coalesce all ranges regardless of gaps - Use 0 to disable coalescing, which creates a new range-based stream for each gap :returns: Constructor that creates DCPOptimizedS3Reader when ranges are available, falling back to SequentialS3Reader otherwise. :rtype: DCPOptimizedConstructorProtocol Requirements: Should be used with S3StorageReader, in which ``prepare_local_plan()`` automatically handles: - Load ordering: Sorts items by storage offset for sequential access - Range injection: Provides byte ranges from DCP load plan to the reader Advanced users implementing custom readers must include these optimizations in their ``prepare_local_plan()``/``read_data()`` implementation to use the DCP-optimized reader. Example:: reader_constructor = S3ReaderConstructor.dcp_optimized() storage_reader = S3StorageReader(region, path, reader_constructor=reader_constructor) DCP.load(state_dict, storage_reader=storage_reader) .. py:method:: default() -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol :staticmethod: Creates the default generic reader constructor. This creates a sequential (generic) reader that supports all access patterns. :returns: Partial constructor for SequentialS3Reader :rtype: S3ReaderConstructorProtocol .. py:method:: get_reader_type_string(constructor: Optional[s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol]) -> str :staticmethod: Returns the reader type string for the given constructor. .. py:class:: S3Writer(stream: s3torchconnectorclient._mountpoint_s3_client.PutObjectStream) Bases: :py:obj:`io.BufferedIOBase` A write-only, file like representation of a single object stored in S3. .. py:attribute:: stream .. py:method:: write(data: Union[bytes, memoryview]) -> int Write bytes to S3 Object specified by bucket and key :param data: bytes to write :type data: bytes | memoryview :returns: Number of bytes written :rtype: int :raises S3Exception: An error occurred accessing S3. :raises ValueError: If the writer is closed. .. py:method:: close() Close write-stream to S3. Ensures all bytes are written successfully. :raises S3Exception: An error occurred accessing S3. .. py:property:: closed :type: bool Returns: bool: Return whether the object is closed. .. py:method:: flush() No-op .. py:method:: readable() -> bool :returns: Return whether object was opened for reading. :rtype: bool .. py:method:: writable() -> bool :returns: Return whether object is open for writing. :rtype: bool .. py:method:: tell() -> int :returns: Current stream position. :rtype: int .. py:class:: S3IterableDataset(region: str, get_dataset_objects: Callable[[s3torchconnector._s3client.S3Client], Iterable[s3torchconnector._s3bucket_key_data.S3BucketKeyData]], endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, enable_sharding: bool = False, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None) Bases: :py:obj:`torch.utils.data.IterableDataset` An IterableStyle dataset created from S3 objects. To create an instance of S3IterableDataset, you need to use `from_prefix` or `from_objects` methods. .. py:property:: region .. py:property:: endpoint .. py:method:: from_objects(object_uris: Union[str, Iterable[str]], *, region: str, endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, enable_sharding: bool = False, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None) :classmethod: Returns an instance of S3IterableDataset using the S3 URI(s) provided. :param object_uris: S3 URI of the object(s) desired. :type object_uris: str | Iterable[str] :param region: AWS region of the S3 bucket where the objects are stored. :type region: str :param endpoint: AWS endpoint of the S3 bucket where the objects are stored. :type endpoint: str :param transform: Optional callable which is used to transform an S3Reader into the desired type. :param s3client_config: Optional S3ClientConfig with parameters for S3 client. :param enable_sharding: If True, shard the dataset across multiple workers for parallel data loading. If False (default), each worker loads the entire dataset independently. :param reader_constructor: Optional partial(S3Reader) created using S3ReaderConstructor e.g. S3ReaderConstructor.sequential() or S3ReaderConstructor.range_based() :type reader_constructor: Optional[S3ReaderConstructorProtocol] :returns: An IterableStyle dataset created from S3 objects. :rtype: S3IterableDataset :raises S3Exception: An error occurred accessing S3. .. py:method:: from_prefix(s3_uri: str, *, region: str, endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, enable_sharding: bool = False, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None) :classmethod: Returns an instance of S3IterableDataset using the S3 URI provided. :param s3_uri: An S3 URI (prefix) of the object(s) desired. Objects matching the prefix will be included in the returned dataset. :type s3_uri: str :param region: AWS region of the S3 bucket where the objects are stored. :type region: str :param endpoint: AWS endpoint of the S3 bucket where the objects are stored. :type endpoint: str :param transform: Optional callable which is used to transform an S3Reader into the desired type. :param s3client_config: Optional S3ClientConfig with parameters for S3 client. :param enable_sharding: If True, shard the dataset across multiple workers for parallel data loading. If False (default), each worker loads the entire dataset independently. :param reader_constructor: Optional partial(S3Reader) created using S3ReaderConstructor e.g. S3ReaderConstructor.sequential() or S3ReaderConstructor.range_based() :type reader_constructor: Optional[S3ReaderConstructorProtocol] :returns: An IterableStyle dataset created from S3 objects. :rtype: S3IterableDataset :raises S3Exception: An error occurred accessing S3. .. py:class:: S3MapDataset(region: str, get_dataset_objects: Callable[[s3torchconnector._s3client.S3Client], Iterable[s3torchconnector._s3bucket_key_data.S3BucketKeyData]], endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None) Bases: :py:obj:`torch.utils.data.Dataset` A Map-Style dataset created from S3 objects. To create an instance of S3MapDataset, you need to use `from_prefix` or `from_objects` methods. .. py:property:: region .. py:property:: endpoint .. py:method:: from_objects(object_uris: Union[str, Iterable[str]], *, region: str, endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None) :classmethod: Returns an instance of S3MapDataset using the S3 URI(s) provided. :param object_uris: S3 URI of the object(s) desired. :type object_uris: str | Iterable[str] :param region: AWS region of the S3 bucket where the objects are stored. :type region: str :param endpoint: AWS endpoint of the S3 bucket where the objects are stored. :type endpoint: str :param transform: Optional callable which is used to transform an S3Reader into the desired type. :param s3client_config: Optional S3ClientConfig with parameters for S3 client. :param reader_constructor: Optional partial(S3Reader) created using S3ReaderConstructor e.g. S3ReaderConstructor.sequential() or S3ReaderConstructor.range_based() :type reader_constructor: Optional[S3ReaderConstructorProtocol] :returns: A Map-Style dataset created from S3 objects. :rtype: S3MapDataset :raises S3Exception: An error occurred accessing S3. .. py:method:: from_prefix(s3_uri: str, *, region: str, endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None) :classmethod: Returns an instance of S3MapDataset using the S3 URI provided. :param s3_uri: An S3 URI (prefix) of the object(s) desired. Objects matching the prefix will be included in the returned dataset. :type s3_uri: str :param region: AWS region of the S3 bucket where the objects are stored. :type region: str :param endpoint: AWS endpoint of the S3 bucket where the objects are stored. :type endpoint: str :param transform: Optional callable which is used to transform an S3Reader into the desired type. :param s3client_config: Optional S3ClientConfig with parameters for S3 client. :param reader_constructor: Optional partial(S3Reader) created using S3ReaderConstructor e.g. S3ReaderConstructor.sequential() or S3ReaderConstructor.range_based() :type reader_constructor: Optional[S3ReaderConstructorProtocol] :returns: A Map-Style dataset created from S3 objects. :rtype: S3MapDataset :raises S3Exception: An error occurred accessing S3.