awscrt.eventstream¶
event-stream library for awscrt.
- class awscrt.eventstream.HeaderType(value)¶
Supported types for the value within a Header
- BOOL_TRUE = 0¶
Value is True.
No actual value is transmitted on the wire.
- BOOL_FALSE = 1¶
Value is False.
No actual value is transmitted on the wire.
- BYTE = 2¶
Value is signed 8-bit int.
- INT16 = 3¶
Value is signed 16-bit int.
- INT32 = 4¶
Value is signed 32-bit int.
- INT64 = 5¶
Value is signed 64-bit int.
- BYTE_BUF = 6¶
Value is raw bytes.
- STRING = 7¶
Value is a str.
Transmitted on the wire as utf-8
- TIMESTAMP = 8¶
Value is a posix timestamp (seconds since Unix epoch).
Transmitted on the wire as a 64-bit int
- UUID = 9¶
Value is a UUID.
Transmitted on the wire as 16 bytes
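As a rough illustration of the wire sizes listed above, the following stdlib-only sketch packs a timestamp, a UUID, and a string and checks their encoded lengths. It does not use awscrt, and the big-endian byte order is an assumption made for the example; the text above states only the sizes.

```python
import struct
import uuid

# Assumption: big-endian packing is used here only for illustration.
# The reference text above specifies sizes, not byte order.
timestamp_raw = struct.pack(">q", 1700000000)  # TIMESTAMP: 64-bit int
uuid_raw = uuid.UUID("12345678-1234-5678-1234-567812345678").bytes  # UUID
string_raw = "caf\u00e9".encode("utf-8")  # STRING: utf-8 encoded text

print(len(timestamp_raw))  # 8
print(len(uuid_raw))       # 16
print(len(string_raw))     # 5 (the accented character takes two bytes)
```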
- class awscrt.eventstream.Header(name: str, value: Any, header_type: HeaderType)¶
A header in an event-stream message.
Each header has a name, value, and type. HeaderType enumerates the supported value types.
Create a header with one of the Header.from_X() functions.
- classmethod from_bool(name: str, value: bool) Header ¶
Create a Header of type BOOL_TRUE or BOOL_FALSE.
- classmethod from_byte(name: str, value: int) Header ¶
Create a Header of type BYTE.
The value must fit in an 8-bit signed int.
- classmethod from_int16(name: str, value: int) Header ¶
Create a Header of type INT16.
The value must fit in a 16-bit signed int.
- classmethod from_int32(name: str, value: int) Header ¶
Create a Header of type INT32.
The value must fit in a 32-bit signed int.
- classmethod from_int64(name: str, value: int) Header ¶
Create a Header of type INT64.
The value must fit in a 64-bit signed int.
- classmethod from_byte_buf(name: str, value: bytes | bytearray) Header ¶
Create a Header of type BYTE_BUF.
The value must be a bytes-like object.
- classmethod from_timestamp(name: str, value: int) Header ¶
Create a Header of type TIMESTAMP.
The value must be a posix timestamp (seconds since Unix epoch).
- classmethod from_uuid(name: str, value: UUID) Header ¶
Create a Header of type UUID.
The value must be a UUID.
- property type: HeaderType¶
Header type
- property value: Any¶
Header value
The header’s type determines the value’s type. Use the value_as_X() methods for type-checked queries.
- value_as_bool() bool ¶
Return the bool value.
Raises an exception if type is not BOOL_TRUE or BOOL_FALSE.
- value_as_byte_buf() bytes | bytearray ¶
Return the value as bytes.
Raises an exception if type is not BYTE_BUF.
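The integer constructors above require the value to fit in a signed int of a given width. A minimal pre-check might look like the sketch below; `fits_signed` is a hypothetical helper written for this example, not part of awscrt.

```python
def fits_signed(value: int, bits: int) -> bool:
    """Return True if value fits in a two's-complement signed int of `bits` bits.

    Hypothetical helper for illustration; not part of awscrt.
    """
    limit = 1 << (bits - 1)
    return -limit <= value < limit


print(fits_signed(127, 8))       # True
print(fits_signed(128, 8))       # False
print(fits_signed(-32768, 16))   # True
print(fits_signed(2 ** 63, 64))  # False
```

With awscrt installed, a caller could run a check like `fits_signed(value, 16)` before calling `Header.from_int16(name, value)`, and choose a wider type when it returns False.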
awscrt.eventstream.rpc¶
event-stream RPC (remote procedure call) protocol library for awscrt.
- class awscrt.eventstream.rpc.MessageType(value)¶
Types of messages in the event-stream RPC protocol.
The APPLICATION_MESSAGE and APPLICATION_ERROR types may only be sent on streams, and will never arrive as a protocol message (stream-id 0).
All other message types may only be sent as protocol messages (stream-id 0), and will never arrive as a stream message.
Different message types expect specific headers and flags; consult the documentation.
- APPLICATION_MESSAGE = 0¶
Application message
- APPLICATION_ERROR = 1¶
Application error
- PING = 2¶
Ping
- PING_RESPONSE = 3¶
Ping response
- CONNECT = 4¶
Connect
- CONNECT_ACK = 5¶
Connect acknowledgement
If the MessageFlag.CONNECTION_ACCEPTED flag is not present, the connection has been rejected.
- PROTOCOL_ERROR = 6¶
Protocol error
- INTERNAL_ERROR = 7¶
Internal error
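The stream-id rule described for MessageType can be sketched as a small check. `SketchMessageType` is a stand-in enum that mirrors the values listed above so the example runs without awscrt, and `is_valid_for_stream_id` is a hypothetical helper, not a library function.

```python
from enum import IntEnum


class SketchMessageType(IntEnum):
    """Stand-in mirroring the MessageType values listed above."""
    APPLICATION_MESSAGE = 0
    APPLICATION_ERROR = 1
    PING = 2
    PING_RESPONSE = 3
    CONNECT = 4
    CONNECT_ACK = 5
    PROTOCOL_ERROR = 6
    INTERNAL_ERROR = 7


# Types that may only travel on streams (non-zero stream-id).
STREAM_ONLY = {
    SketchMessageType.APPLICATION_MESSAGE,
    SketchMessageType.APPLICATION_ERROR,
}


def is_valid_for_stream_id(message_type: SketchMessageType, stream_id: int) -> bool:
    """Protocol messages use stream-id 0; application messages use streams."""
    if stream_id == 0:
        return message_type not in STREAM_ONLY
    return message_type in STREAM_ONLY


print(is_valid_for_stream_id(SketchMessageType.PING, 0))                 # True
print(is_valid_for_stream_id(SketchMessageType.APPLICATION_MESSAGE, 0))  # False
print(is_valid_for_stream_id(SketchMessageType.APPLICATION_ERROR, 3))    # True
```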
- class awscrt.eventstream.rpc.MessageFlag¶
Flags for messages in the event-stream RPC protocol.
Flags may be combined with bitwise OR. Not all flags can be used with all message types; consult the documentation.
- NONE = 0¶
No flags
- CONNECTION_ACCEPTED = 1¶
Connection accepted
If this flag is absent from a MessageType.CONNECT_ACK, the connection has been rejected.
- TERMINATE_STREAM = 2¶
Terminate stream
This flag may be used with any message type. The sender will close their connection after the message is written to the wire. The receiver will close their connection after delivering the message to the user.
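A short sketch of combining and testing these flags follows. The constants copy the integer values listed above so the example runs without awscrt, and `has_flag` is a hypothetical helper. For distinct single-bit flags such as these, `|` and `^` give the same result; `|` is used here because OR-ing in a flag that is already set leaves it set.

```python
# Stand-in constants copying the MessageFlag values listed above.
NONE = 0x0
CONNECTION_ACCEPTED = 0x1
TERMINATE_STREAM = 0x2


def has_flag(flags: int, flag: int) -> bool:
    """Return True if every bit of `flag` is set in `flags` (hypothetical helper)."""
    return flag != 0 and (flags & flag) == flag


combined = CONNECTION_ACCEPTED | TERMINATE_STREAM

print(combined)                                 # 3
print(has_flag(combined, CONNECTION_ACCEPTED))  # True
print(has_flag(NONE, CONNECTION_ACCEPTED))      # False: a CONNECT_ACK like this was rejected
```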
- class awscrt.eventstream.rpc.ClientConnectionHandler¶
Base class for handling connection events.
Inherit from this class and override methods to handle connection events. All callbacks for this connection will be invoked on the same thread, and on_connection_setup() will always be the first callback invoked.
- abstract on_connection_setup(connection, error, **kwargs) None ¶
Invoked upon completion of the setup attempt.
If setup was successful, the connection is provided to the user.
Note that the network connection stays alive until it is closed, even if no local references to the connection object remain. The user should store a reference to this connection, and call connection.close() when they are done with it to avoid leaking resources.
Setup will always be the first callback invoked on the handler. If setup failed, no further callbacks will be invoked on this handler.
- Parameters:
connection – The connection, if setup was successful, or None if setup failed.
error – None, if setup was successful, or an Exception if setup failed.
**kwargs – Forward compatibility kwargs.
- abstract on_connection_shutdown(reason: Exception | None, **kwargs) None ¶
Invoked when the connection finishes shutting down.
This event will not be invoked if connection setup failed.
- Parameters:
reason – Reason will be None if the user initiated the shutdown, otherwise the reason will be an Exception.
**kwargs – Forward compatibility kwargs.
- abstract on_protocol_message(headers: Sequence[Header], payload: bytes, message_type: MessageType, flags: int, **kwargs) None ¶
Invoked when a message for the connection (stream-id 0) is received.
- Parameters:
headers – Message headers.
payload – Binary message payload.
message_type – Message type.
flags – Message flags. Values from MessageFlag may be combined with bitwise OR. Not all flags can be used with all message types; consult the documentation.
**kwargs – Forward compatibility kwargs.
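One way to implement the three callbacks is sketched below. `RecordingHandler` and its attributes are hypothetical names chosen for this example. The `object` fallback base is an assumption added only so the sketch runs where awscrt is not installed; in real use the class would inherit from ClientConnectionHandler.

```python
import threading

try:
    from awscrt.eventstream.rpc import ClientConnectionHandler
except ImportError:
    # Assumption: fall back to a plain base so the sketch runs without awscrt.
    ClientConnectionHandler = object


class RecordingHandler(ClientConnectionHandler):
    """Stores the connection and records events for other threads to inspect."""

    def __init__(self):
        super().__init__()
        self.connection = None       # kept so close() can be called later
        self.setup_error = None
        self.shutdown_reason = None
        self.messages = []
        self.setup_done = threading.Event()
        self.shutdown_done = threading.Event()

    def on_connection_setup(self, connection, error, **kwargs):
        # Exactly one of connection / error is expected to be None.
        self.connection = connection
        self.setup_error = error
        self.setup_done.set()

    def on_connection_shutdown(self, reason, **kwargs):
        # reason is None when the user initiated the shutdown.
        self.shutdown_reason = reason
        self.shutdown_done.set()

    def on_protocol_message(self, headers, payload, message_type, flags, **kwargs):
        self.messages.append((message_type, flags, payload))
```

Accepting `**kwargs` in every callback matches the forward-compatibility parameter in the signatures above. The events let a caller on another thread wait for setup or shutdown without polling.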
- class awscrt.eventstream.rpc.ClientConnection(host_name, port, handler)¶
A client connection for the event-stream RPC protocol.
Use ClientConnection.connect() to establish a new connection.
Note that the network connection stays alive until it is closed, even if no local references to the connection object remain. The user should store a reference to any connections, and call close() when they are done with them to avoid leaking resources.
- shutdown_future¶
Completes when this connection has finished shutting down. Future will contain a result of None, or an exception indicating why shutdown occurred.
- Type:
concurrent.futures.Future
- classmethod connect(*, handler: ClientConnectionHandler, host_name: str, port: int, bootstrap: ClientBootstrap = None, socket_options: SocketOptions | None = None, tls_connection_options: TlsConnectionOptions | None = None) concurrent.futures.Future ¶
Asynchronously establish a new ClientConnection.
- Parameters:
handler – Handler for connection events.
host_name – Connect to host.
port – Connect to port.
bootstrap – Client bootstrap to use when initiating socket connection. If None is provided, the default singleton is used.
socket_options – Optional socket options. If None is provided, then default options are used.
tls_connection_options – Optional TLS connection options. If None is provided, then the connection will be attempted over plain-text.
- Returns:
concurrent.futures.Future – A Future which completes when the connection succeeds or fails. If successful, the Future will contain None. Otherwise it will contain an exception. If the connection is successful, it will be made available via the handler’s on_connection_setup callback. Note that this network connection stays alive until it is closed, even if no local references to the connection object remain. The user should store a reference to any connections, and call close() when they are done with them to avoid leaking resources.
- close()¶
Close the connection.
Shutdown is asynchronous. This call has no effect if the connection is already closed or closing.
Note that, if the network connection hasn’t already ended, close() MUST be called to avoid leaking resources. The network connection will not terminate simply because there are no references to the connection object.
- Returns:
concurrent.futures.Future – This connection’s shutdown_future, which completes when shutdown has finished.
- is_open()¶
- Returns:
bool – True if this connection is open and usable, False otherwise. Check shutdown_future to know when the connection is completely finished shutting down.
- send_protocol_message(*, headers: Sequence[Header] | None = None, payload: bytes | bytearray | None = None, message_type: MessageType, flags: int | None = None, on_flush: Callable = None) concurrent.futures.Future ¶
Send a protocol message.
Protocol messages use stream-id 0.
Use the returned future, or the on_flush callback, to be informed when the message is successfully written to the wire, or fails to send.
- Keyword Arguments:
headers – Message headers.
payload – Binary message payload.
message_type – Message type.
flags – Message flags. Values from MessageFlag may be combined with bitwise OR. Not all flags can be used with all message types; consult the documentation.
on_flush –
Callback invoked when the message is successfully written to the wire, or fails to send. The function should take the following arguments and return nothing:
error (Optional[Exception]): None if the message was successfully written to the wire, or an Exception if it failed to send.
**kwargs (dict): Forward compatibility kwargs.
This callback is always invoked on the connection’s event-loop thread.
- Returns:
A future which completes with a result of None if the message is successfully written to the wire, or an exception if the message fails to send.
- new_stream(handler: ClientContinuationHandler) ClientContinuation ¶
Create a new stream.
The stream will send no data until ClientContinuation.activate() is called. Call activate() when you’re ready for callbacks and events to fire.
- Parameters:
handler – Handler to process continuation messages and state changes.
- Returns:
The new continuation object.
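The send-then-close flow described for ClientConnection might be sketched as below. `send_then_close` is a hypothetical helper. It takes an already established connection and a message type as arguments, so it relies only on the `send_protocol_message()` and `close()` methods documented above. It assumes the caller is on a thread other than the one running the connection's callbacks, since it blocks on the returned futures.

```python
def send_then_close(connection, message_type, timeout=10.0):
    """Send one protocol message, wait for the flush, then close the connection.

    Hypothetical helper: `connection` is expected to behave like a
    ClientConnection and `message_type` like a MessageType value.
    """
    try:
        flushed = connection.send_protocol_message(message_type=message_type)
        # Returns None on success; re-raises the exception if the send failed.
        flushed.result(timeout)
    finally:
        # close() must be called to avoid leaking the network connection.
        shutdown = connection.close()
    # Completes when shutdown has finished; may raise the shutdown reason.
    return shutdown.result(timeout)
```

The `try`/`finally` is a design choice for this sketch: it closes the connection even when the send fails, in line with the note above that close() must be called to avoid leaking resources.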
- class awscrt.eventstream.rpc.ClientContinuation(handler, connection)¶
A continuation of messages on a given stream-id.
Create with ClientConnection.new_stream().
The stream will send no data until ClientContinuation.activate() is called. Call activate() when you’re ready for callbacks and events to fire.
- connection¶
This stream’s connection.
- Type:
ClientConnection
- closed_future¶
Future which completes with a result of None when the continuation has closed.
- activate(*, operation: str, headers: Sequence[Header] | None = None, payload: bytes | bytearray | None = None, message_type: MessageType, flags: int | None = None, on_flush: Callable | None = None)¶
Activate the stream by sending its first message.
Use the returned future, or the on_flush callback, to be informed when the message is successfully written to the wire, or fails to send.
activate() may only be called once; use send_message() to write further messages on this stream-id.
- Keyword Arguments:
operation – Operation name for this stream.
headers – Message headers.
payload – Binary message payload.
message_type – Message type.
flags – Message flags. Values from MessageFlag may be combined with bitwise OR. Not all flags can be used with all message types; consult the documentation.
on_flush –
Callback invoked when the message is successfully written to the wire, or fails to send. The function should take the following arguments and return nothing:
error (Optional[Exception]): None if the message was successfully written to the wire, or an Exception if it failed to send.
**kwargs (dict): Forward compatibility kwargs.
This callback is always invoked on the connection’s event-loop thread.
- Returns:
A future which completes with a result of None if the message is successfully written to the wire, or an exception if the message fails to send.
- send_message(*, headers: Sequence[Header] = None, payload: bytes | bytearray = None, message_type: MessageType, flags: int = None, on_flush: Callable = None) concurrent.futures.Future ¶
Send a continuation message.
Use the returned future, or the on_flush callback, to be informed when the message is successfully written to the wire, or fails to send.
Note that the first message on a stream-id must be sent with activate(); send_message() is for all messages that follow.
- Keyword Arguments:
headers – Message headers.
payload – Binary message payload.
message_type – Message type.
flags – Message flags. Values from MessageFlag may be combined with bitwise OR. Not all flags can be used with all message types; consult the documentation.
on_flush –
Callback invoked when the message is successfully written to the wire, or fails to send. The function should take the following arguments and return nothing:
error (Optional[Exception]): None if the message was successfully written to the wire, or an Exception if it failed to send.
**kwargs (dict): Forward compatibility kwargs.
This callback is always invoked on the connection’s event-loop thread.
- Returns:
A future which completes with a result of None if the message is successfully written to the wire, or an exception if the message fails to send.
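The split between activate() and send_message() can be sketched as a small helper that sends the first payload with activate() and the rest with send_message(). `send_all` is a hypothetical name. It receives the continuation and message type as arguments and uses only the keyword arguments documented above.

```python
def send_all(continuation, operation, payloads, message_type, timeout=10.0):
    """Send payloads on one stream: activate() first, then send_message().

    Hypothetical helper: `continuation` is expected to behave like a
    ClientContinuation that has not been activated yet.
    Returns the number of messages written.
    """
    payloads = list(payloads)
    if not payloads:
        raise ValueError("at least one payload is needed to activate the stream")

    # The first message on a stream-id must go through activate().
    continuation.activate(
        operation=operation,
        payload=payloads[0],
        message_type=message_type,
    ).result(timeout)

    # All following messages use send_message().
    for payload in payloads[1:]:
        continuation.send_message(
            payload=payload,
            message_type=message_type,
        ).result(timeout)

    return len(payloads)
```

Waiting on each future before sending the next message keeps the sketch simple. A real caller might instead collect the futures or use the on_flush callback.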
- class awscrt.eventstream.rpc.ClientContinuationHandler¶
Base class for handling stream continuation events.
Inherit from this class and override methods to handle events. All callbacks will be invoked on the same thread (the same thread used by the connection).
A common pattern is to store the continuation within its handler. Example:
continuation_handler.continuation = connection.new_stream(continuation_handler)
- abstract on_continuation_message(headers: Sequence[Header], payload: bytes, message_type: MessageType, flags: int, **kwargs) None ¶
Invoked when a message is received on this continuation.
- Parameters:
headers – Message headers.
payload – Binary message payload.
message_type – Message type.
flags – Message flags. Values from MessageFlag may be combined with bitwise OR. Not all flags can be used with all message types; consult the documentation.
**kwargs – Forward compatibility kwargs.
- abstract on_continuation_closed(**kwargs) None ¶
Invoked when the continuation is closed.
Once the continuation is closed, no more messages may be sent or received. The continuation is closed when a message is sent or received with the TERMINATE_STREAM flag, or when the connection shuts down.
- Parameters:
**kwargs – Forward compatibility kwargs.
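A continuation handler following the pattern above might look like this sketch. `CollectingHandler` and its attributes are hypothetical names. The `TERMINATE_STREAM` constant copies the value of MessageFlag.TERMINATE_STREAM, and the `object` fallback base is an assumption added only so the example runs where awscrt is not installed.

```python
import threading

try:
    from awscrt.eventstream.rpc import ClientContinuationHandler
except ImportError:
    # Assumption: fall back to a plain base so the sketch runs without awscrt.
    ClientContinuationHandler = object

TERMINATE_STREAM = 0x2  # copies the value of MessageFlag.TERMINATE_STREAM


class CollectingHandler(ClientContinuationHandler):
    """Collects payloads and records when the stream terminates or closes."""

    def __init__(self):
        super().__init__()
        self.continuation = None  # set by the caller after new_stream()
        self.payloads = []
        self.saw_terminate = False
        self.closed = threading.Event()

    def on_continuation_message(self, headers, payload, message_type, flags, **kwargs):
        self.payloads.append(payload)
        if flags & TERMINATE_STREAM:
            # The continuation is expected to close after this message.
            self.saw_terminate = True

    def on_continuation_closed(self, **kwargs):
        # No more messages may be sent or received after this point.
        self.closed.set()
```

A caller would then store the continuation on the handler as in the example above, for instance `handler.continuation = connection.new_stream(handler)`.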