awscrt.eventstream

event-stream library for awscrt.

class awscrt.eventstream.HeaderType(value)

Supported types for the value within a Header

BOOL_TRUE = 0

Value is True.

No actual value is transmitted on the wire.

BOOL_FALSE = 1

Value is False.

No actual value is transmitted on the wire.

BYTE = 2

Value is signed 8-bit int.

INT16 = 3

Value is signed 16-bit int.

INT32 = 4

Value is signed 32-bit int.

INT64 = 5

Value is signed 64-bit int.

BYTE_BUF = 6

Value is raw bytes.

STRING = 7

Value is a str.

Transmitted on the wire as utf-8

TIMESTAMP = 8

Value is a posix timestamp (seconds since Unix epoch).

Transmitted on the wire as a 64-bit int

UUID = 9

Value is a UUID.

Transmitted on the wire as 16 bytes

class awscrt.eventstream.Header(name: str, value: Any, header_type: HeaderType)

A header in an event-stream message.

Each header has a name, value, and type. HeaderType enumerates the supported value types.

Create a header with one of the Header.from_X() functions.

classmethod from_bool(name: str, value: bool) Header

Create a Header of type BOOL_TRUE or BOOL_FALSE

classmethod from_byte(name: str, value: int) Header

Create a Header of type BYTE

The value must fit in an 8-bit signed int

classmethod from_int16(name: str, value: int) Header

Create a Header of type INT16

The value must fit in an 16-bit signed int

classmethod from_int32(name: str, value: int) Header

Create a Header of type INT32

The value must fit in an 32-bit signed int

classmethod from_int64(name: str, value: int) Header

Create a Header of type INT64

The value must fit in an 64-bit signed int

classmethod from_byte_buf(name: str, value: bytes | bytearray) Header

Create a Header of type BYTE_BUF

The value must be a bytes-like object

classmethod from_string(name: str, value: str) Header

Create a Header of type STRING

classmethod from_timestamp(name: str, value: int) Header

Create a Header of type TIMESTAMP

Value must be a posix timestamp (seconds since Unix epoch)

classmethod from_uuid(name: str, value: UUID) Header

Create a Header of type UUID

The value must be a UUID

property name: str

Header name

property type: HeaderType

Header type

property value: Any

Header value

The header’s type determines the value’s type. Use the value_as_X() methods for type-checked queries.

value_as_bool() bool

Return bool value

Raises an exception if type is not BOOL_TRUE or BOOL_FALSE

value_as_byte() int

Return value of 8-bit signed int

Raises an exception if type is not BYTE

value_as_int16() int

Return value of 16-bit signed int

Raises an exception if type is not INT16

value_as_int32() int

Return value of 32-bit signed int

Raises an exception if type is not INT32

value_as_int64() int

Return value of 64-bit signed int

Raises an exception if type is not INT64

value_as_byte_buf() bytes | bytearray

Return value of bytes

Raises an exception if type is not BYTE_BUF

value_as_string() str

Return value of string

Raises an exception if type is not STRING

value_as_timestamp() int

Return value of timestamp (seconds since Unix epoch)

Raises an exception if type is not TIMESTAMP

value_as_uuid() UUID

Return value of UUID

Raises an exception if type is not UUID

event-stream RPC (remote procedure call) protocol library for awscrt.

class awscrt.eventstream.rpc.MessageType(value)

Types of messages in the event-stream RPC protocol.

The APPLICATION_MESSAGE and APPLICATION_ERROR types may only be sent on streams, and will never arrive as a protocol message (stream-id 0).

For all other message types, they may only be sent as protocol messages (stream-id 0), and will never arrive as a stream message.

Different message types expect specific headers and flags, consult documentation.

APPLICATION_MESSAGE = 0

Application message

APPLICATION_ERROR = 1

Application error

PING = 2

Ping

PING_RESPONSE = 3

Ping response

CONNECT = 4

Connect

CONNECT_ACK = 5

Connect acknowledgement

If the MessageFlag.CONNECTION_ACCEPTED flag is not present, the connection has been rejected.

PROTOCOL_ERROR = 6

Protocol error

INTERNAL_ERROR = 7

Internal error

class awscrt.eventstream.rpc.MessageFlag

Flags for messages in the event-stream RPC protocol.

Flags may be XORed together. Not all flags can be used with all message types, consult documentation.

NONE = 0

No flags

CONNECTION_ACCEPTED = 1

Connection accepted

If this flag is absent from a MessageType.CONNECT_ACK, the connection has been rejected.

TERMINATE_STREAM = 2

Terminate stream

This message may be used with any message type. The sender will close their connection after the message is written to the wire. The receiver will close their connection after delivering the message to the user.

class awscrt.eventstream.rpc.ClientConnectionHandler

Base class for handling connection events.

Inherit from this class and override methods to handle connection events. All callbacks for this connection will be invoked on the same thread, and on_connection_setup() will always be the first callback invoked.

abstract on_connection_setup(connection, error, **kwargs) None

Invoked upon completion of the setup attempt.

If setup was successful, the connection is provided to the user.

Note that the network connection stays alive until it is closed, even if no local references to the connection object remain. The user should store a reference to this connection, and call connection.close() when they are done with it to avoid leaking resources.

Setup will always be the first callback invoked on the handler. If setup failed, no further callbacks will be invoked on this handler.

Parameters:
  • connection – The connection, if setup was successful, or None if setup failed.

  • error – None, if setup was successful, or an Exception if setup failed.

  • **kwargs – Forward compatibility kwargs.

abstract on_connection_shutdown(reason: Exception | None, **kwargs) None

Invoked when the connection finishes shutting down.

This event will not be invoked if connection setup failed.

Parameters:
  • reason – Reason will be None if the user initiated the shutdown, otherwise the reason will be an Exception.

  • **kwargs – Forward compatibility kwargs.

abstract on_protocol_message(headers: Sequence[Header], payload: bytes, message_type: MessageType, flags: int, **kwargs) None

Invoked when a message for the connection (stream-id 0) is received.

Parameters:
  • headers – Message headers.

  • payload – Binary message payload.

  • message_type – Message type.

  • flags – Message flags. Values from MessageFlag may be XORed together. Not all flags can be used with all message types, consult documentation.

  • **kwargs – Forward compatibility kwargs.

class awscrt.eventstream.rpc.ClientConnection(host_name, port, handler)

A client connection for the event-stream RPC protocol.

Use ClientConnection.connect() to establish a new connection.

Note that the network connection stays alive until it is closed, even if no local references to the connection object remain. The user should store a reference to any connections, and call close() when they are done with them to avoid leaking resources.

host_name

Remote host name.

Type:

str

port

Remote port.

Type:

int

shutdown_future

Completes when this connection has finished shutting down. Future will contain a result of None, or an exception indicating why shutdown occurred.

Type:

concurrent.futures.Future[None]

classmethod connect(*, handler: ClientConnectionHandler, host_name: str, port: int, bootstrap: ClientBootstrap = None, socket_options: SocketOptions | None = None, tls_connection_options: TlsConnectionOptions | None = None) concurrent.futures.Future

Asynchronously establish a new ClientConnection.

Parameters:
  • handler – Handler for connection events.

  • host_name – Connect to host.

  • port – Connect to port.

  • bootstrap – Client bootstrap to use when initiating socket connection. If None is provided, the default singleton is used.

  • socket_options – Optional socket options. If None is provided, then default options are used.

  • tls_connection_options – Optional TLS connection options. If None is provided, then the connection will be attempted over plain-text.

Returns:

concurrent.futures.Future – A Future which completes when the connection succeeds or fails. If successful, the Future will contain None. Otherwise it will contain an exception. If the connection is successful, it will be made available via the handler’s on_connection_setup callback. Note that this network connection stays alive until it is closed, even if no local references to the connection object remain. The user should store a reference to any connections, and call close() when they are done with them to avoid leaking resources.

close()

Close the connection.

Shutdown is asynchronous. This call has no effect if the connection is already closed or closing.

Note that, if the network connection hasn’t already ended, close() MUST be called to avoid leaking resources. The network connection will not terminate simply because there are no references to the connection object.

Returns:

concurrent.futures.Future – This connection’s shutdown_future, which completes when shutdown has finished.

is_open()
Returns:

bool – True if this connection is open and usable, False otherwise. Check shutdown_future to know when the connection is completely finished shutting down.

send_protocol_message(*, headers: Sequence[Header] | None = None, payload: bytes | bytearray | None = None, message_type: MessageType, flags: int | None = None, on_flush: Callable = None) concurrent.futures.Future

Send a protocol message.

Protocol messages use stream-id 0.

Use the returned future, or the on_flush callback, to be informed when the message is successfully written to the wire, or fails to send.

Keyword Arguments:
  • headers – Message headers.

  • payload – Binary message payload.

  • message_type – Message type.

  • flags – Message flags. Values from MessageFlag may be XORed together. Not all flags can be used with all message types, consult documentation.

  • on_flush

    Callback invoked when the message is successfully written to the wire, or fails to send. The function should take the following arguments and return nothing:

    • error (Optional[Exception]): None if the message was successfully written to the wire, or an Exception if it failed to send.

    • **kwargs (dict): Forward compatibility kwargs.

    This callback is always invoked on the connection’s event-loop thread.

Returns:

A future which completes with a result of None if the message is successfully written to the wire, or an exception if the message fails to send.

new_stream(handler: ClientContinuationHandler) ClientContinuation

Create a new stream.

The stream will send no data until ClientContinuation.activate() is called. Call activate() when you’re ready for callbacks and events to fire.

Parameters:

handler – Handler to process continuation messages and state changes.

Returns:

The new continuation object.

class awscrt.eventstream.rpc.ClientContinuation(handler, connection)

A continuation of messages on a given stream-id.

Create with ClientConnection.new_stream().

The stream will send no data until ClientContinuation.activate() is called. Call activate() when you’re ready for callbacks and events to fire.

connection

This stream’s connection.

Type:

ClientConnection

closed_future

Future which completes with a result of None when the continuation has closed.

Type:

concurrent.futures.Future

activate(*, operation: str, headers: Sequence[Header] | None = None, payload: bytes | bytearray | None = None, message_type: MessageType, flags: int | None = None, on_flush: Callable | None = None)

Activate the stream by sending its first message.

Use the returned future, or the on_flush callback, to be informed when the message is successfully written to the wire, or fails to send.

activate() may only be called once, use send_message() to write further messages on this stream-id.

Keyword Arguments:
  • operation – Operation name for this stream.

  • headers – Message headers.

  • payload – Binary message payload.

  • message_type – Message type.

  • flags – Message flags. Values from MessageFlag may be XORed together. Not all flags can be used with all message types, consult documentation.

  • on_flush

    Callback invoked when the message is successfully written to the wire, or fails to send. The function should take the following arguments and return nothing:

    • error (Optional[Exception]): None if the message was successfully written to the wire, or an Exception if it failed to send.

    • **kwargs (dict): Forward compatibility kwargs.

    This callback is always invoked on the connection’s event-loop thread.

Returns:

A future which completes with a result of None if the message is successfully written to the wire, or an exception if the message fails to send.

send_message(*, headers: Sequence[Header] = None, payload: bytes | bytearray = None, message_type: MessageType, flags: int = None, on_flush: Callable = None) concurrent.futures.Future

Send a continuation message.

Use the returned future, or the on_flush callback, to be informed when the message is successfully written to the wire, or fails to send.

Note that the the first message on a stream-id must be sent with activate(), send_message() is for all messages that follow.

Keyword Arguments:
  • operation – Operation name for this stream.

  • headers – Message headers.

  • payload – Binary message payload.

  • message_type – Message type.

  • flags – Message flags. Values from MessageFlag may be XORed together. Not all flags can be used with all message types, consult documentation.

  • on_flush

    Callback invoked when the message is successfully written to the wire, or fails to send. The function should take the following arguments and return nothing:

    • error (Optional[Exception]): None if the message was successfully written to the wire, or an Exception if it failed to send.

    • **kwargs (dict): Forward compatibility kwargs.

    This callback is always invoked on the connection’s event-loop thread.

Returns:

A future which completes with a result of None if the message is successfully written to the wire, or an exception if the message fails to send.

class awscrt.eventstream.rpc.ClientContinuationHandler

Base class for handling stream continuation events.

Inherit from this class and override methods to handle events. All callbacks will be invoked on the same thread (the same thread used by the connection).

A common pattern is to store the continuation within its handler. Example:

continuation_handler.continuation = connection.new_stream(continuation_handler)
abstract on_continuation_message(headers: Sequence[Header], payload: bytes, message_type: MessageType, flags: int, **kwargs) None

Invoked when a message is received on this continuation.

Parameters:
  • headers – Message headers.

  • payload – Binary message payload.

  • message_type – Message type.

  • flags – Message flags. Values from MessageFlag may be XORed together. Not all flags can be used with all message types, consult documentation.

  • **kwargs – Forward compatibility kwargs.

abstract on_continuation_closed(**kwargs) None

Invoked when the continuation is closed.

Once the continuation is closed, no more messages may be sent or received. The continuation is closed when a message is sent or received with the TERMINATE_STREAM flag, or when the connection shuts down.

Parameters:

**kwargs – Forward compatibility kwargs.