s3torchconnector
================

.. py:module:: s3torchconnector


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/s3torchconnector/dcp/index
   /autoapi/s3torchconnector/lightning/index
   /autoapi/s3torchconnector/s3checkpoint/index
   /autoapi/s3torchconnector/s3iterable_dataset/index
   /autoapi/s3torchconnector/s3map_dataset/index
   /autoapi/s3torchconnector/s3reader/index
   /autoapi/s3torchconnector/s3writer/index


Classes
-------

.. autoapisummary::

   s3torchconnector.S3Reader
   s3torchconnector.S3ReaderConstructor
   s3torchconnector.S3Writer
   s3torchconnector.S3IterableDataset
   s3torchconnector.S3MapDataset
   s3torchconnector.S3Checkpoint
   s3torchconnector.S3ClientConfig


Package Contents
----------------

.. py:class:: S3Reader

   Bases: :py:obj:`abc.ABC`, :py:obj:`io.BufferedIOBase`

   An abstract base class for a read-only, file-like representation of a single object stored in S3.

   This class defines the interface for S3 readers. Concrete implementations (SequentialS3Reader or
   RangedS3Reader) extend this class. S3ReaderConstructor creates partial functions of these
   implementations, which are then completed by S3Client with the remaining required parameters.


   .. py:property:: bucket
      :type: str
      :abstractmethod:


   .. py:property:: key
      :type: str
      :abstractmethod:


   .. py:method:: read(size: Optional[int] = None) -> bytes
      :abstractmethod:

      Read and return up to ``size`` bytes.

      If the argument is omitted, None, or negative, reads and
      returns all data until EOF.

      If the argument is positive, and the underlying raw stream is
      not 'interactive', multiple raw reads may be issued to satisfy
      the byte count (unless EOF is reached first). But for
      interactive raw streams (as well as sockets and pipes), at most
      one raw read will be issued, and a short result does not imply
      that EOF is imminent.

      Returns an empty bytes object on EOF.

      Returns None if the underlying raw stream was open in non-blocking
      mode and no data is available at the moment.


   .. py:method:: seek(offset: int, whence: int = SEEK_SET, /) -> int
      :abstractmethod:

      Change stream position.

      Change the stream position to the given byte offset. The offset is
      interpreted relative to the position indicated by whence. Values
      for whence are:

      * 0 -- start of stream (the default); offset should be zero or positive
      * 1 -- current stream position; offset may be negative
      * 2 -- end of stream; offset is usually negative

      Return the new absolute position.


   .. py:method:: tell() -> int
      :abstractmethod:

      Return current stream position.


   .. py:method:: readinto(buf) -> int
      :abstractmethod:


   .. py:method:: seekable() -> bool

      :returns: Return whether object supports seek operations.
      :rtype: bool


   .. py:method:: readable() -> bool

      :returns: Return whether object was opened for reading.
      :rtype: bool


   .. py:method:: writable() -> bool

      :returns: Return whether object was opened for writing.
      :rtype: bool


.. py:class:: S3ReaderConstructor

   Constructor for creating ``partial(S3Reader)`` instances.

   Creates partial ``S3Reader`` instances that will be completed by ``S3Client`` with the
   remaining required parameters (e.g. ``bucket``, ``key``, ``get_object_info``, ``get_stream``).

   The constructor provides factory methods for different reader types:

   - ``sequential()``: Creates a constructor for sequential readers that buffer the entire object.
     Best for full reads and repeated access.
   - ``range_based()``: Creates a constructor for range-based readers that fetch specific byte ranges.
     Suitable for sparse partial reads of large objects.


   .. py:method:: sequential() -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol
      :staticmethod:

      Creates a constructor for sequential readers.

      :returns: Partial constructor for SequentialS3Reader
      :rtype: S3ReaderConstructorProtocol

      Example::

          reader_constructor = S3ReaderConstructor.sequential()


   .. py:method:: range_based(buffer_size: Optional[int] = None) -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol
      :staticmethod:

      Creates a constructor for range-based readers.

      :param buffer_size: Internal buffer size in bytes. If None, uses default 8MB. Set to 0 to disable buffering.

      :returns: Partial constructor for RangedS3Reader
      :rtype: S3ReaderConstructorProtocol

      The range-based reader performs byte-range requests to read specific portions of S3 objects
      without downloading the entire file.

      Buffer size affects read performance:

      * Small reads (< ``buffer_size``): Load ``buffer_size`` bytes into the buffer to reduce S3 API calls for small, sequential reads
      * Large reads (≥ ``buffer_size``): Bypass the buffer for direct transfer from S3
      * Forward overlap reads: Reuse buffered data when reading ranges that extend beyond the current buffer, and process the remaining data according to its size using the logic above

      Configuration Guide:

      * Use larger buffer sizes for workloads with many small, sequential reads of nearby bytes
      * Use smaller buffer sizes or disable buffering for sparse partial reads
      * Buffer can be disabled by setting ``buffer_size`` to 0
      * If ``buffer_size`` is None, uses default 8MB buffer

      Examples::

          # Range-based reader with default 8MB buffer
          reader_constructor = S3ReaderConstructor.range_based()

          # Range-based reader with custom buffer size
          reader_constructor = S3ReaderConstructor.range_based(buffer_size=16*1024*1024)

          # Range-based reader with buffering disabled
          reader_constructor = S3ReaderConstructor.range_based(buffer_size=0)


   .. py:method:: default() -> s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol
      :staticmethod:

      Creates the default reader constructor (sequential).

      :returns: Partial constructor for SequentialS3Reader
      :rtype: S3ReaderConstructorProtocol


   .. py:method:: get_reader_type_string(constructor: Optional[s3torchconnector.s3reader.protocol.S3ReaderConstructorProtocol]) -> str
      :staticmethod:

      Returns the reader type string for the given constructor.


.. py:class:: S3Writer(stream: s3torchconnectorclient._mountpoint_s3_client.PutObjectStream)

   Bases: :py:obj:`io.BufferedIOBase`

   A write-only, file-like representation of a single object stored in S3.


   .. py:attribute:: stream


   .. py:method:: write(data: Union[bytes, memoryview]) -> int

      Write bytes to the S3 object specified by bucket and key.

      :param data: bytes to write
      :type data: bytes | memoryview

      :returns: Number of bytes written
      :rtype: int

      :raises S3Exception: An error occurred accessing S3.
      :raises ValueError: If the writer is closed.


   .. py:method:: close()

      Close the write stream to S3. Ensures all bytes are written successfully.

      :raises S3Exception: An error occurred accessing S3.


   .. py:property:: closed
      :type: bool

      :returns: Return whether the object is closed.
      :rtype: bool


   .. py:method:: flush()

      No-op


   .. py:method:: readable() -> bool

      :returns: Return whether object was opened for reading.
      :rtype: bool


   .. py:method:: writable() -> bool

      :returns: Return whether object is open for writing.
      :rtype: bool


   .. py:method:: tell() -> int

      :returns: Current stream position.
      :rtype: int


.. py:class:: S3IterableDataset(region: str, get_dataset_objects: Callable[[s3torchconnector._s3client.S3Client], Iterable[s3torchconnector._s3bucket_key_data.S3BucketKeyData]], endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, enable_sharding: bool = False, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None)

   Bases: :py:obj:`torch.utils.data.IterableDataset`

   An iterable-style dataset created from S3 objects.

   To create an instance of S3IterableDataset, use the ``from_prefix`` or ``from_objects`` methods.


   .. py:property:: region


   .. py:property:: endpoint


   .. py:method:: from_objects(object_uris: Union[str, Iterable[str]], *, region: str, endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, enable_sharding: bool = False, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None)
      :classmethod:

      Returns an instance of S3IterableDataset using the S3 URI(s) provided.

      :param object_uris: S3 URI of the object(s) desired.
      :type object_uris: str | Iterable[str]
      :param region: AWS region of the S3 bucket where the objects are stored.
      :type region: str
      :param endpoint: AWS endpoint of the S3 bucket where the objects are stored.
      :type endpoint: str
      :param transform: Optional callable which is used to transform an S3Reader into the desired type.
      :param s3client_config: Optional S3ClientConfig with parameters for S3 client.
      :param enable_sharding: If True, shard the dataset across multiple workers for parallel data loading. If False (default), each worker loads the entire dataset independently.
      :param reader_constructor: Optional partial(S3Reader) created using S3ReaderConstructor
                                 e.g. S3ReaderConstructor.sequential() or S3ReaderConstructor.range_based()
      :type reader_constructor: Optional[S3ReaderConstructorProtocol]

      :returns: An iterable-style dataset created from S3 objects.
      :rtype: S3IterableDataset

      :raises S3Exception: An error occurred accessing S3.


   .. py:method:: from_prefix(s3_uri: str, *, region: str, endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, enable_sharding: bool = False, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None)
      :classmethod:

      Returns an instance of S3IterableDataset using the S3 URI provided.

      :param s3_uri: An S3 URI (prefix) of the object(s) desired. Objects matching the prefix will be included in the returned dataset.
      :type s3_uri: str
      :param region: AWS region of the S3 bucket where the objects are stored.
      :type region: str
      :param endpoint: AWS endpoint of the S3 bucket where the objects are stored.
      :type endpoint: str
      :param transform: Optional callable which is used to transform an S3Reader into the desired type.
      :param s3client_config: Optional S3ClientConfig with parameters for S3 client.
      :param enable_sharding: If True, shard the dataset across multiple workers for parallel data loading. If False (default), each worker loads the entire dataset independently.
      :param reader_constructor: Optional partial(S3Reader) created using S3ReaderConstructor
                                 e.g. S3ReaderConstructor.sequential() or S3ReaderConstructor.range_based()
      :type reader_constructor: Optional[S3ReaderConstructorProtocol]

      :returns: An iterable-style dataset created from S3 objects.
      :rtype: S3IterableDataset

      :raises S3Exception: An error occurred accessing S3.


.. py:class:: S3MapDataset(region: str, get_dataset_objects: Callable[[s3torchconnector._s3client.S3Client], Iterable[s3torchconnector._s3bucket_key_data.S3BucketKeyData]], endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None)

   Bases: :py:obj:`torch.utils.data.Dataset`

   A map-style dataset created from S3 objects.

   To create an instance of S3MapDataset, use the ``from_prefix`` or ``from_objects`` methods.


   .. py:property:: region


   .. py:property:: endpoint


   .. py:method:: from_objects(object_uris: Union[str, Iterable[str]], *, region: str, endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None)
      :classmethod:

      Returns an instance of S3MapDataset using the S3 URI(s) provided.

      :param object_uris: S3 URI of the object(s) desired.
      :type object_uris: str | Iterable[str]
      :param region: AWS region of the S3 bucket where the objects are stored.
      :type region: str
      :param endpoint: AWS endpoint of the S3 bucket where the objects are stored.
      :type endpoint: str
      :param transform: Optional callable which is used to transform an S3Reader into the desired type.
      :param s3client_config: Optional S3ClientConfig with parameters for S3 client.
      :param reader_constructor: Optional partial(S3Reader) created using S3ReaderConstructor
                                 e.g. S3ReaderConstructor.sequential() or S3ReaderConstructor.range_based()
      :type reader_constructor: Optional[S3ReaderConstructorProtocol]

      :returns: A map-style dataset created from S3 objects.
      :rtype: S3MapDataset

      :raises S3Exception: An error occurred accessing S3.


   .. py:method:: from_prefix(s3_uri: str, *, region: str, endpoint: Optional[str] = None, transform: Callable[[s3torchconnector.S3Reader], Any] = identity, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None, reader_constructor: Optional[s3torchconnector.s3reader.S3ReaderConstructorProtocol] = None)
      :classmethod:

      Returns an instance of S3MapDataset using the S3 URI provided.

      :param s3_uri: An S3 URI (prefix) of the object(s) desired. Objects matching the prefix will be included in the returned dataset.
      :type s3_uri: str
      :param region: AWS region of the S3 bucket where the objects are stored.
      :type region: str
      :param endpoint: AWS endpoint of the S3 bucket where the objects are stored.
      :type endpoint: str
      :param transform: Optional callable which is used to transform an S3Reader into the desired type.
      :param s3client_config: Optional S3ClientConfig with parameters for S3 client.
      :param reader_constructor: Optional partial(S3Reader) created using S3ReaderConstructor
                                 e.g. S3ReaderConstructor.sequential() or S3ReaderConstructor.range_based()
      :type reader_constructor: Optional[S3ReaderConstructorProtocol]

      :returns: A map-style dataset created from S3 objects.
      :rtype: S3MapDataset

      :raises S3Exception: An error occurred accessing S3.


.. py:class:: S3Checkpoint(region: str, endpoint: Optional[str] = None, s3client_config: Optional[s3torchconnector._s3client.S3ClientConfig] = None)

   A checkpoint manager for S3.

   To read a checkpoint from S3, create an S3Reader by providing the s3_uri of the
   checkpoint stored in S3. Similarly, to save a checkpoint to S3, create an S3Writer
   by providing an s3_uri. S3Reader and S3Writer implement io.BufferedIOBase;
   therefore, they can be passed to torch.load and torch.save.


   .. py:attribute:: region


   .. py:attribute:: endpoint
      :value: None


   .. py:method:: reader(s3_uri: str) -> s3torchconnector.S3Reader

      Creates an S3Reader from a given s3_uri.

      :param s3_uri: A valid s3_uri. (i.e. s3:///)
      :type s3_uri: str

      :returns: a read-only binary stream of the S3 object's contents, specified by the s3_uri.
      :rtype: S3Reader

      :raises S3Exception: An error occurred accessing S3.


   .. py:method:: writer(s3_uri: str) -> s3torchconnector.S3Writer

      Creates an S3Writer from a given s3_uri.

      :param s3_uri: A valid s3_uri. (i.e. s3:///)
      :type s3_uri: str

      :returns: a write-only binary stream. The content is saved to S3 using the specified s3_uri.
      :rtype: S3Writer

      :raises S3Exception: An error occurred accessing S3.


.. py:class:: S3ClientConfig

   A dataclass exposing configurable parameters for the S3 client.

   :param throughput_target_gbps: Throughput target in gigabits per second (Gbps) that the client tries to reach.
                                  10.0 Gbps by default (may change in future).
   :type throughput_target_gbps: float
   :param part_size: Size (bytes) of file parts that will be uploaded/downloaded.
                     Note: when saving checkpoints, the inner client adjusts the part size to meet the service limits
                     (the maximum number of parts per upload is 10,000; the minimum upload part size is 5 MiB).
                     Part size must be between 5 MiB and 5 GiB.
                     8 MiB by default (may change in future).
   :type part_size: int
   :param unsigned: Set to true to disable signing S3 requests.
   :type unsigned: bool
   :param force_path_style: Force path-style addressing for the S3 client.
   :type force_path_style: bool
   :param max_attempts: Number of retry attempts for retryable errors.
   :type max_attempts: int
   :param profile: Profile name to use for S3 authentication.
   :type profile: str


   .. py:attribute:: throughput_target_gbps
      :type: float
      :value: 10.0


   .. py:attribute:: part_size
      :type: int
      :value: 8388608


   .. py:attribute:: unsigned
      :type: bool
      :value: False


   .. py:attribute:: force_path_style
      :type: bool
      :value: False


   .. py:attribute:: max_attempts
      :type: int
      :value: 10


   .. py:attribute:: profile
      :type: Optional[str]
      :value: None
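
As a rough illustration of the ``part_size`` limits described for ``S3ClientConfig``, the sketch below computes a part size that keeps an upload within 10,000 parts. The helper ``effective_part_size`` and its rounding rule are hypothetical and are not part of s3torchconnector; the adjustment actually performed by the inner client may differ. Only the limits themselves (10,000 parts, 5 MiB to 5 GiB, 8 MiB default) come from the text above.

```python
MIB = 1024 * 1024
GIB = 1024 * MIB

MAX_PARTS_PER_UPLOAD = 10_000   # service limit quoted in the part_size note
MIN_PART_SIZE = 5 * MIB         # lower bound quoted in the part_size note
MAX_PART_SIZE = 5 * GIB         # upper bound quoted in the part_size note
DEFAULT_PART_SIZE = 8 * MIB     # matches the documented default of 8388608


def effective_part_size(object_size: int, part_size: int = DEFAULT_PART_SIZE) -> int:
    """Hypothetical helper (an assumption, not library code).

    Returns the smallest part size that is at least ``part_size`` and lets an
    object of ``object_size`` bytes fit into MAX_PARTS_PER_UPLOAD parts.
    """
    if object_size < 0:
        raise ValueError("object_size must be non-negative")
    if not MIN_PART_SIZE <= part_size <= MAX_PART_SIZE:
        raise ValueError("part_size must be between 5 MiB and 5 GiB")

    # Ceiling division: bytes per part needed to stay within the part limit.
    needed = -(-object_size // MAX_PARTS_PER_UPLOAD)
    adjusted = max(part_size, needed)

    if adjusted > MAX_PART_SIZE:
        raise ValueError("object is too large for 10,000 parts of at most 5 GiB")
    return adjusted


if __name__ == "__main__":
    for size in (1 * GIB, 100 * GIB):
        print(size, effective_part_size(size))
```

With the default 8 MiB part size, 10,000 parts cover roughly 78 GiB, so under these assumptions a larger checkpoint needs a larger part size, while smaller objects keep the configured value.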