awscrt.mqtt

MQTT

All network operations in awscrt.mqtt are asynchronous.

class awscrt.mqtt.QoS(value)

Quality of Service enumeration

[MQTT-4.3]

AT_MOST_ONCE = 0

QoS 0 - At most once delivery

The message is delivered according to the capabilities of the underlying network. No response is sent by the receiver and no retry is performed by the sender. The message arrives at the receiver either once or not at all.

AT_LEAST_ONCE = 1

QoS 1 - At least once delivery

This quality of service ensures that the message arrives at the receiver at least once.

EXACTLY_ONCE = 2

QoS 2 - Exactly once delivery

This is the highest quality of service, for use when neither loss nor duplication of messages is acceptable. There is an increased overhead associated with this quality of service.

Note that, while this client supports QoS 2, the AWS IoT Core server does not support QoS 2 at time of writing (May 2020).
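Because a server may not support every level, a caller may want to cap the QoS it requests. The following is a minimal sketch under stated assumptions, not part of awscrt: the QoS class here is a local stand-in that mirrors the values documented above, and cap_qos is a hypothetical helper.

```python
from enum import IntEnum


class QoS(IntEnum):
    """Local stand-in mirroring the values documented above."""
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


def cap_qos(requested: QoS, server_max: QoS) -> QoS:
    """Return the requested QoS, lowered to the highest level the server supports."""
    return QoS(min(requested, server_max))


# Example: a server limited to QoS 1, as the note above describes for AWS IoT Core.
print(cap_qos(QoS.EXACTLY_ONCE, QoS.AT_LEAST_ONCE).name)
```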

class awscrt.mqtt.ConnectReturnCode(value)

Connect return code enumeration.

[MQTT-3.2.2.3]

ACCEPTED = 0

Connection Accepted.

UNACCEPTABLE_PROTOCOL_VERSION = 1

Connection Refused, unacceptable protocol version.

The Server does not support the level of the MQTT protocol requested by the Client.

IDENTIFIER_REJECTED = 2

Connection Refused, identifier rejected.

The Client identifier is correct UTF-8 but not allowed by the Server.

SERVER_UNAVAILABLE = 3

Connection Refused, Server unavailable.

The Network Connection has been made but the MQTT service is unavailable.

BAD_USERNAME_OR_PASSWORD = 4

Connection Refused, bad user name or password.

The data in the user name or password is malformed.

NOT_AUTHORIZED = 5

Connection Refused, not authorized.

The Client is not authorized to connect.
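One way to act on a connect return code is to separate refusals that may clear up on their own from those that need a change on the client side. The sketch below uses a local stand-in enum with the values listed above. The describe helper and the choice of which codes count as retryable are assumptions made for illustration, not behavior of the library.

```python
from enum import IntEnum


class ConnectReturnCode(IntEnum):
    """Local stand-in mirroring the values documented above."""
    ACCEPTED = 0
    UNACCEPTABLE_PROTOCOL_VERSION = 1
    IDENTIFIER_REJECTED = 2
    SERVER_UNAVAILABLE = 3
    BAD_USERNAME_OR_PASSWORD = 4
    NOT_AUTHORIZED = 5


# Assumption: only a temporarily unavailable server is worth retrying unchanged;
# the other refusals need a configuration or credential change first.
RETRYABLE = {ConnectReturnCode.SERVER_UNAVAILABLE}


def describe(code: int) -> str:
    """Turn a numeric return code into a short, human-readable outcome."""
    rc = ConnectReturnCode(code)
    if rc is ConnectReturnCode.ACCEPTED:
        return "connected"
    action = "retry later" if rc in RETRYABLE else "fix configuration"
    return f"refused ({rc.name}): {action}"


print(describe(3))
```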

class awscrt.mqtt.Will(topic, qos, payload, retain)

A Will message is published by the server if a client is lost unexpectedly.

The Will message is stored on the server when a client connects. It is published if the client connection is lost without the server receiving a DISCONNECT packet.

[MQTT-3.1.2-8]

Parameters:
  • topic (str) – Topic to publish Will message on.

  • qos (QoS) – QoS used when publishing the Will message.

  • payload (bytes) – Content of Will message.

  • retain (bool) – Whether the Will message is to be retained when it is published.

topic

Topic to publish Will message on.

Type:

str

qos

QoS used when publishing the Will message.

Type:

QoS

payload

Content of Will message.

Type:

bytes

retain

Whether the Will message is to be retained when it is published.

Type:

bool

class awscrt.mqtt.OnConnectionSuccessData(return_code: ConnectReturnCode | None = None, session_present: bool = False)

Dataclass containing data related to an on_connection_success callback.

Parameters:
  • return_code (ConnectReturnCode) – Connect return code received from the server.

  • session_present (bool) – True if the connection resumes an existing session. False if new session. Note that the server has forgotten all previous subscriptions if this is False. Subscriptions can be re-established via resubscribe_existing_topics() if the connection was a reconnection.

class awscrt.mqtt.OnConnectionFailureData(error: AwsCrtError | None = None)

Dataclass containing data related to an on_connection_failure callback.

Parameters:

error (AwsCrtError) – Error describing the reason for the connection failure.

class awscrt.mqtt.OnConnectionClosedData

Dataclass containing data related to an on_connection_closed callback. Currently unused.

class awscrt.mqtt.Client(bootstrap=None, tls_ctx=None)

MQTT client.

Parameters:
  • bootstrap (Optional[ClientBootstrap]) – Client bootstrap to use when initiating new socket connections. If None is provided, the default singleton is used.

  • tls_ctx (Optional[ClientTlsContext]) – TLS context for secure socket connections. If None is provided, then an unencrypted connection is used.

class awscrt.mqtt.OperationStatisticsData(incomplete_operation_count: int = 0, incomplete_operation_size: int = 0, unacked_operation_count: int = 0, unacked_operation_size: int = 0)

Dataclass containing some simple statistics about the current state of the connection’s queue of operations

Parameters:
  • incomplete_operation_count (int) – total number of operations submitted to the connection that have not yet been completed. Unacked operations are a subset of this.

  • incomplete_operation_size (int) – total packet size of operations submitted to the connection that have not yet been completed. Unacked operations are a subset of this.

  • unacked_operation_count (int) – total number of operations that have been sent to the server and are waiting for a corresponding ACK before they can be completed.

  • unacked_operation_size (int) – total packet size of operations that have been sent to the server and are waiting for a corresponding ACK before they can be completed.
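Since unacked operations are described as a subset of incomplete ones, the difference between the two gives a rough picture of work that is still queued and not yet waiting on an ACK. The sketch below shows that arithmetic on a local stand-in dataclass with the documented fields. The not_yet_sent helper is hypothetical, and its interpretation is an assumption drawn from the field descriptions above.

```python
from dataclasses import dataclass


@dataclass
class OperationStatisticsData:
    """Local stand-in with the fields documented above."""
    incomplete_operation_count: int = 0
    incomplete_operation_size: int = 0
    unacked_operation_count: int = 0
    unacked_operation_size: int = 0


def not_yet_sent(stats: OperationStatisticsData) -> tuple:
    """Return (count, size) of incomplete operations that are not awaiting an ACK."""
    count = stats.incomplete_operation_count - stats.unacked_operation_count
    size = stats.incomplete_operation_size - stats.unacked_operation_size
    return count, size


# Hand-built values for illustration only.
stats = OperationStatisticsData(5, 500, 2, 180)
print(not_yet_sent(stats))
```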

class awscrt.mqtt.Connection(client, host_name, port, client_id, clean_session=True, on_connection_interrupted=None, on_connection_resumed=None, reconnect_min_timeout_secs=5, reconnect_max_timeout_secs=60, keep_alive_secs=1200, ping_timeout_ms=3000, protocol_operation_timeout_ms=0, will=None, username=None, password=None, socket_options=None, use_websockets=False, websocket_proxy_options=None, websocket_handshake_transform=None, proxy_options=None, on_connection_success=None, on_connection_failure=None, on_connection_closed=None)

MQTT client connection.

Parameters:
  • client (Client) – MQTT client to spawn connection from.

  • host_name (str) – Server name to connect to.

  • port (int) – Server port to connect to.

  • client_id (str) – ID to place in CONNECT packet. Must be unique across all devices/clients. If an ID is already in use, the other client will be disconnected.

  • clean_session (bool) – Whether or not to start a clean session with each reconnect. If True, the server will forget all subscriptions with each reconnect. Set False to request that the server resume an existing session or start a new session that may be resumed after a connection loss. The session_present bool in the connection callback informs whether an existing session was successfully resumed. If an existing session is resumed, the server remembers previous subscriptions and sends messages (with QoS1 or higher) that were published while the client was offline.

  • on_connection_interrupted

    Optional callback invoked whenever the MQTT connection is lost. The MQTT client will automatically attempt to reconnect. The function should take the following arguments and return nothing:

  • on_connection_resumed

    Optional callback invoked whenever the MQTT connection is automatically resumed. Function should take the following arguments and return nothing:

    • connection (Connection): This MQTT Connection

    • return_code (ConnectReturnCode): Connect return code received from the server.

    • session_present (bool): True if resuming existing session. False if new session. Note that the server has forgotten all previous subscriptions if this is False. Subscriptions can be re-established via resubscribe_existing_topics().

    • **kwargs (dict): Forward-compatibility kwargs.

  • on_connection_success

    Optional callback invoked whenever the connection successfully connects. This callback is invoked for every successful connect and every successful reconnect.

    Function should take the following arguments and return nothing:

  • on_connection_failure

    Optional callback invoked whenever the connection fails to connect. This callback is invoked for every failed connect and every failed reconnect.

    Function should take the following arguments and return nothing:

  • on_connection_closed

    Optional callback invoked whenever the connection has been disconnected and shutdown successfully. Function should take the following arguments and return nothing:

  • reconnect_min_timeout_secs (int) – Minimum time to wait between reconnect attempts. Must be <= reconnect_max_timeout_secs. Wait starts at min and doubles with each attempt until max is reached.

  • reconnect_max_timeout_secs (int) – Maximum time to wait between reconnect attempts. Must be >= reconnect_min_timeout_secs. Wait starts at min and doubles with each attempt until max is reached.

  • keep_alive_secs (int) – The keep alive value, in seconds, to send in CONNECT packet. A PING will automatically be sent at this interval. The server will assume the connection is lost if no PING is received after 1.5 times this value. This duration must be longer than ping_timeout_ms.

  • ping_timeout_ms (int) – Milliseconds to wait for ping response before client assumes the connection is invalid and attempts to reconnect. This duration must be shorter than keep_alive_secs.

  • protocol_operation_timeout_ms (int) – Milliseconds to wait for the response to an operation that requires a response by protocol. Set to zero to disable the timeout. Otherwise, the operation will fail if no response is received within this amount of time after the packet is written to the socket. This currently applies to PUBLISH (QoS>0) and UNSUBSCRIBE.

  • will (Will) – Will to send with CONNECT packet. The will is published by the server when its connection to the client is unexpectedly lost.

  • username (str) – Username to connect with.

  • password (str) – Password to connect with.

  • socket_options (Optional[awscrt.io.SocketOptions]) – Optional socket options.

  • use_websockets (bool) – If True, connect to MQTT over websockets.

  • websocket_proxy_options (Optional[awscrt.http.HttpProxyOptions]) – Optional proxy options for websocket connections. Deprecated, use proxy_options instead.

  • websocket_handshake_transform

    Optional function to transform websocket handshake request. If provided, function is called each time a websocket connection is attempted. The function may modify the HTTP request before it is sent to the server. See WebsocketHandshakeTransformArgs for more info. Function should take the following arguments and return nothing:

    • transform_args (WebsocketHandshakeTransformArgs): Contains HTTP request to be transformed. Function must call transform_args.set_done() when complete.

    • **kwargs (dict): Forward-compatibility kwargs.

  • proxy_options (Optional[awscrt.http.HttpProxyOptions]) – Optional proxy options for all connections.
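The timing parameters above interact in two ways: the reconnect wait starts at the minimum and doubles up to the maximum, and keep_alive_secs must be longer than ping_timeout_ms. The sketch below models only that documented description in plain Python. Both helpers are hypothetical, and the real client may differ in details such as jitter.

```python
def reconnect_delays(min_secs: int, max_secs: int, attempts: int) -> list:
    """Delays before each reconnect attempt: start at min, double, cap at max."""
    if min_secs > max_secs:
        raise ValueError(
            "reconnect_min_timeout_secs must be <= reconnect_max_timeout_secs")
    delays = []
    delay = min_secs
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, max_secs)
    return delays


def keep_alive_is_valid(keep_alive_secs: int, ping_timeout_ms: int) -> bool:
    """The keep-alive interval must be longer than the ping timeout."""
    return keep_alive_secs * 1000 > ping_timeout_ms


# Defaults from the signature above: min 5 s, max 60 s, keep-alive 1200 s, ping 3000 ms.
print(reconnect_delays(5, 60, 6))
print(keep_alive_is_valid(1200, 3000))
```

With the default values, the wait reaches the 60 second cap on the fifth attempt and stays there.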

connect()

Open the actual connection to the server (async).

Returns:

concurrent.futures.Future – Future which completes when connection succeeds or fails. If connection fails, Future will contain an exception. If connection succeeds, Future will contain a dict with the following members:

  • [‘session_present’] (bool): is True if resuming existing session and False if new session.
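A caller typically inspects the completed Future to learn whether the session was resumed. The sketch below shows that handling with standard concurrent.futures objects. The Futures are created and completed by hand to imitate both outcomes, and on_connect_done is a hypothetical helper, so nothing here opens a network connection.

```python
from concurrent.futures import Future


def on_connect_done(future: Future) -> str:
    """Inspect a completed connect() Future as described above."""
    try:
        result = future.result(timeout=0)
    except Exception as exc:  # the Future carries the connection failure
        return f"connect failed: {exc}"
    if result["session_present"]:
        return "session resumed"
    return "new session: subscriptions must be re-established"


# Stand-in Futures, completed by hand to imitate both outcomes.
ok = Future()
ok.set_result({"session_present": False})
failed = Future()
failed.set_exception(RuntimeError("simulated failure"))

print(on_connect_done(ok))
print(on_connect_done(failed))
```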

disconnect()

Close the connection (async).

Returns:

concurrent.futures.Future – Future which completes when the connection is closed. The future will contain an empty dict.

subscribe(topic, qos, callback=None)

Subscribe to a topic filter (async).

The client sends a SUBSCRIBE packet and the server responds with a SUBACK.

subscribe() may be called while the device is offline, though the async operation cannot complete successfully until the connection resumes.

Once subscribed, callback is invoked each time a message matching the topic is received. It is possible for such messages to arrive before the SUBACK is received.

Parameters:
  • topic (str) – Subscribe to this topic filter, which may include wildcards.

  • qos (QoS) – Maximum requested QoS that server may use when sending messages to the client. The server may grant a lower QoS in the SUBACK (see returned Future)

  • callback

    Optional callback invoked when message received. Function should take the following arguments and return nothing:

    • topic (str): Topic receiving message.

    • payload (bytes): Payload of message.

    • dup (bool): DUP flag. If True, this might be re-delivery of an earlier attempt to send the message.

    • qos (QoS): Quality of Service used to deliver the message.

    • retain (bool): Retain flag. If True, the message was sent as a result of a new subscription being made by the client.

    • **kwargs (dict): Forward-compatibility kwargs.

Returns:

Tuple[concurrent.futures.Future, int] – Tuple containing a Future and the ID of the SUBSCRIBE packet. The Future completes when a SUBACK is received from the server. If successful, the Future will contain a dict with the following members:

  • [‘packet_id’] (int): ID of the SUBSCRIBE packet being acknowledged.

  • [‘topic’] (str): Topic filter of the SUBSCRIBE packet being acknowledged.

  • [‘qos’] (QoS): Maximum QoS that was granted by the server. This may be lower than the requested QoS.

If unsuccessful, the Future contains an exception. The exception will be a SubscribeError if a SUBACK was received in which the server rejected the subscription. Other exception types indicate other errors with the operation.
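The callback arguments and the result dict described above can be sketched without a live connection. In the sketch below, the callback is invoked by hand and the SUBACK dict is hand-built in the documented shape. The topic names, granted_lower_qos, and the extra keyword argument are hypothetical and exist only to illustrate the forward-compatibility kwargs.

```python
received = []


def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    """Callback with the documented arguments; **kwargs absorbs future additions."""
    received.append((topic, payload.decode("utf-8"), dup, retain))


def granted_lower_qos(requested_qos: int, suback: dict) -> bool:
    """True if the server granted a lower QoS than was requested."""
    return int(suback["qos"]) < int(requested_qos)


# Invoke the callback by hand, including an argument it does not know about,
# to show that the **kwargs parameter keeps it forward-compatible.
on_message_received(topic="sensors/1/temp", payload=b"21.5", dup=False,
                    qos=1, retain=False, hypothetical_future_arg=None)

# Hand-built dict in the documented shape of a successful subscribe() result.
suback = {"packet_id": 1, "topic": "sensors/+/temp", "qos": 0}
print(received, granted_lower_qos(1, suback))
```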

on_message(callback)

Set callback to be invoked when ANY message is received.

callback: Callback to invoke when message received, or None to disable.

Function should take the following arguments and return nothing:

  • topic (str): Topic receiving message.

  • payload (bytes): Payload of message.

  • dup (bool): DUP flag. If True, this might be re-delivery of an earlier attempt to send the message.

  • qos (QoS): Quality of Service used to deliver the message.

  • retain (bool): Retain flag. If True, the message was sent as a result of a new subscription being made by the client.

  • **kwargs (dict): Forward-compatibility kwargs.

unsubscribe(topic)

Unsubscribe from a topic filter (async).

The client sends an UNSUBSCRIBE packet, and the server responds with an UNSUBACK.

Parameters:

topic (str) – Unsubscribe from this topic filter.

Returns:

Tuple[concurrent.futures.Future, int] – Tuple containing a Future and the ID of the UNSUBSCRIBE packet. The Future completes when an UNSUBACK is received from the server. If successful, the Future will contain a dict with the following members:

  • [‘packet_id’] (int): ID of the UNSUBSCRIBE packet being acknowledged.

resubscribe_existing_topics()

Subscribe again to all current topics.

This is to help when resuming a connection with a clean session.

Important: Currently the resubscribe function does not take the AWS IoT Core maximum subscriptions per subscribe request quota into account. If the client has more subscriptions than the maximum, resubscribing must be done manually using the subscribe() function for each desired topic filter. The client will be disconnected by AWS IoT Core if the resubscribe exceeds the subscriptions per subscribe request quota.

The AWS IoT Core maximum subscriptions per subscribe request quota is listed at the following URL: https://docs.aws.amazon.com/general/latest/gr/iot-core.html#genref_max_subscriptions_per_subscribe_request

Returns:

Tuple[concurrent.futures.Future, int] – Tuple containing a Future and the ID of the SUBSCRIBE packet. The Future completes when a SUBACK is received from the server. If successful, the Future will contain a dict with the following members:

  • [‘packet_id’]: ID of the SUBSCRIBE packet being acknowledged, or None if there were no topics to resubscribe to.

  • [‘topics’]: A list of (topic, qos) tuples, where qos will be None if the topic failed to resubscribe. If there were no topics to resubscribe to, then the list will be empty.
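Since a failed topic is reported with a qos of None, the result can be filtered to find the topic filters that need a manual subscribe(). The sketch below does that on a hand-built dict in the documented shape. Both helpers are hypothetical, and the quota value is passed in by the caller rather than assumed.

```python
def failed_resubscriptions(result: dict) -> list:
    """Topic filters whose qos is None, meaning the resubscribe failed."""
    return [topic for topic, qos in result["topics"] if qos is None]


def exceeds_quota(topic_count: int, max_per_request: int) -> bool:
    """True if one request would carry more subscriptions than the quota allows."""
    return topic_count > max_per_request


# Hand-built result for illustration only.
result = {"packet_id": 7, "topics": [("a/b", 1), ("c/#", None)]}
print(failed_resubscriptions(result))
```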

publish(topic, payload, qos, retain=False)

Publish message (async).

If the device is offline, the PUBLISH packet will be sent once the connection resumes.

Parameters:
  • topic (str) – Topic name.

  • payload (Union[str, bytes, bytearray]) – Contents of message.

  • qos (QoS) – Quality of Service for delivering this message.

  • retain (bool) – If True, the server will store the message and its QoS so that it can be delivered to future subscribers whose subscriptions match its topic name.

Returns:

Tuple[concurrent.futures.Future, int] – Tuple containing a Future and the ID of the PUBLISH packet. The QoS determines when the Future completes:

  • For QoS 0, completes as soon as the packet is sent.

  • For QoS 1, completes when PUBACK is received.

  • For QoS 2, completes when PUBCOMP is received.

If successful, the Future will contain a dict with the following members:

  • [‘packet_id’] (int): ID of the PUBLISH packet that is complete.
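The return shape described above, a Future paired with a packet ID, can be handled as in the following sketch. It encodes a JSON payload as bytes and waits on the Future with a bounded timeout. fake_publish is a hypothetical stand-in that returns an already completed Future, publish_json is a hypothetical helper, and the topic name and packet ID are made up.

```python
import json
from concurrent.futures import Future

sent = []  # records what the stand-in was asked to publish


def fake_publish(topic, payload, qos, retain=False):
    """Hypothetical stand-in with the same return shape as publish()."""
    sent.append((topic, payload, qos, retain))
    future = Future()
    future.set_result({"packet_id": 42})
    return future, 42


def publish_json(publish, topic, obj, qos):
    """Encode obj as UTF-8 JSON bytes and publish; return (future, packet_id)."""
    payload = json.dumps(obj, sort_keys=True).encode("utf-8")
    return publish(topic=topic, payload=payload, qos=qos)


future, packet_id = publish_json(fake_publish, "sensors/1/temp", {"c": 21.5}, qos=1)
# Bound the wait; per the list above, a QoS 1 Future completes on PUBACK.
result = future.result(timeout=5)
print(result["packet_id"] == packet_id)
```

Passing the publish callable as a parameter keeps the helper testable without a broker. A bound method with the publish() signature above could be passed in its place.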

get_stats()

Queries the connection’s internal statistics for incomplete operations.

Returns:

OperationStatisticsData – The current operation statistics of the connection.

class awscrt.mqtt.WebsocketHandshakeTransformArgs(mqtt_connection, http_request, done_future)

Argument to a “websocket_handshake_transform” function.

A websocket_handshake_transform function has signature: fn(transform_args: WebsocketHandshakeTransformArgs, **kwargs) -> None

The function implementer may modify transform_args.http_request as desired. They MUST call transform_args.set_done() when complete, passing an exception if something went wrong. Failure to call set_done() will hang the application.

The implementer may do asynchronous work before calling transform_args.set_done(); they are not required to call set_done() within the scope of the transform function. An example of async work would be to fetch credentials from another service, sign the request headers, and finally call set_done() to mark the transform complete.

The default websocket handshake request uses path “/mqtt”. All required headers are present, plus the optional header “Sec-WebSocket-Protocol: mqtt”.

Parameters:
mqtt_connection

Connection this handshake is for.

Type:

Connection

http_request

HTTP request for this handshake.

Type:

awscrt.http.HttpRequest

set_done(exception=None)

Mark the transformation complete. If exception is passed in, the handshake is canceled.
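The contract described above, always calling set_done() and passing an exception on failure, can be sketched as follows. FakeTransformArgs is a hypothetical stand-in that exposes only set_done() and a dict in place of the real awscrt.http.HttpRequest, whose header API differs. The header name and the fetch_token callables are also made up for illustration.

```python
class FakeTransformArgs:
    """Hypothetical stand-in exposing only what this sketch needs."""

    def __init__(self):
        self.http_request = {"headers": []}  # stand-in, not awscrt.http.HttpRequest
        self.done_with = "not called"

    def set_done(self, exception=None):
        self.done_with = exception


def make_transform(fetch_token):
    """Build a transform that adds a header obtained from fetch_token()."""
    def transform(transform_args, **kwargs):
        try:
            token = fetch_token()
            transform_args.http_request["headers"].append(("x-example-token", token))
        except Exception as exc:
            # Always report completion, otherwise the handshake would hang.
            transform_args.set_done(exc)
        else:
            transform_args.set_done()
    return transform


def failing_fetch():
    raise RuntimeError("no credentials")


ok_args = FakeTransformArgs()
make_transform(lambda: "abc")(ok_args)
bad_args = FakeTransformArgs()
make_transform(failing_fetch)(bad_args)
print(ok_args.done_with, repr(bad_args.done_with))
```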

exception awscrt.mqtt.SubscribeError

Subscription rejected by server.