Source code for s3torchconnector.s3writer

#  Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#  // SPDX-License-Identifier: BSD

import io
from typing import Union

from s3torchconnectorclient._mountpoint_s3_client import PutObjectStream


[docs] class S3Writer(io.BufferedIOBase): """A write-only, file like representation of a single object stored in S3.""" def __init__(self, stream: PutObjectStream):
[docs] self.stream = stream
self._position = 0 def __enter__(self): self._position = 0 return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def write( self, # Ignoring the type for this as we don't currently support the Buffer protocol data: Union[bytes, memoryview], # type: ignore ) -> int: """Write bytes to S3 Object specified by bucket and key Args: data (bytes | memoryview): bytes to write Returns: int: Number of bytes written Raises: S3Exception: An error occurred accessing S3. """ if isinstance(data, memoryview): data = data.tobytes() self.stream.write(data) self._position += len(data) return len(data)
[docs] def close(self): """Close write-stream to S3. Ensures all bytes are written successfully. Raises: S3Exception: An error occurred accessing S3. """ self.stream.close()
[docs] def flush(self): """No-op""" pass
[docs] def readable(self) -> bool: """ Returns: bool: Return whether object was opened for reading. """ return False
[docs] def writable(self) -> bool: """ Returns: bool: Return whether object was opened for writing. """ return True
[docs] def tell(self) -> int: """ Returns: int: Current stream position. """ return self._position