awscrt.mqtt

MQTT

All network operations in awscrt.mqtt are asynchronous.

class awscrt.mqtt.QoS

Quality of Service enumeration

[MQTT-4.3]

AT_MOST_ONCE = 0

QoS 0 - At most once delivery

The message is delivered according to the capabilities of the underlying network. No response is sent by the receiver and no retry is performed by the sender. The message arrives at the receiver either once or not at all.

AT_LEAST_ONCE = 1

QoS 1 - At least once delivery

This quality of service ensures that the message arrives at the receiver at least once.

EXACTLY_ONCE = 2

QoS 2 - Exactly once delivery

This is the highest quality of service, for use when neither loss nor duplication of messages is acceptable. There is an increased overhead associated with this quality of service.

Note that, while this client supports QoS 2, the AWS IoT Core server does not support QoS 2 at time of writing (May 2020).

class awscrt.mqtt.ConnectReturnCode

Connect return code enumeration.

[MQTT-3.2.2.3]

ACCEPTED = 0

Connection Accepted.

UNACCEPTABLE_PROTOCOL_VERSION = 1

Connection Refused, unacceptable protocol version.

The Server does not support the level of the MQTT protocol requested by the Client.

IDENTIFIER_REJECTED = 2

Connection Refused, identifier rejected.

The Client identifier is correct UTF-8 but not allowed by the Server.

SERVER_UNAVAILABLE = 3

Connection Refused, Server unavailable.

The Network Connection has been made but the MQTT service is unavailable.

BAD_USERNAME_OR_PASSWORD = 4

Connection Refused, bad user name or password.

The data in the user name or password is malformed.

NOT_AUTHORIZED = 5

Connection Refused, not authorized.

The Client is not authorized to connect.

class awscrt.mqtt.Will(topic, qos, payload, retain)

A Will message is published by the server if a client is lost unexpectedly.

The Will message is stored on the server when a client connects. It is published if the client connection is lost without the server receiving a DISCONNECT packet.

[MQTT-3.1.2-8]

Parameters
  • topic (str) – Topic to publish Will message on.

  • qos (QoS) – QoS used when publishing the Will message.

  • payload (bytes) – Content of Will message.

  • retain (bool) – Whether the Will message is to be retained when it is published.

topic

Topic to publish Will message on.

Type

str

qos

QoS used when publishing the Will message.

Type

QoS

payload

Content of Will message.

Type

bytes

retain

Whether the Will message is to be retained when it is published.

Type

bool

class awscrt.mqtt.Client(bootstrap, tls_ctx=None)

MQTT client.

Parameters
  • bootstrap (ClientBootstrap) – Client bootstrap to use when initiating new socket connections.

  • tls_ctx (Optional[ClientTlsContext]) – TLS context for secure socket connections. If None is provided, then an unencrypted connection is used.

class awscrt.mqtt.Connection(client, host_name, port, client_id, clean_session=True, on_connection_interrupted=None, on_connection_resumed=None, reconnect_min_timeout_secs=5, reconnect_max_timeout_secs=60, keep_alive_secs=1200, ping_timeout_ms=3000, will=None, username=None, password=None, socket_options=None, use_websockets=False, websocket_proxy_options=None, websocket_handshake_transform=None)

MQTT client connection.

Parameters
  • client (Client) – MQTT client to spawn connection from.

  • host_name (str) – Server name to connect to.

  • port (int) – Server port to connect to.

  • client_id (str) – ID to place in CONNECT packet. Must be unique across all devices/clients. If an ID is already in use, the other client will be disconnected.

  • clean_session (bool) – Whether or not to start a clean session with each reconnect. If True, the server will forget all subscriptions with each reconnect. Set False to request that the server resume an existing session or start a new session that may be resumed after a connection loss. The session_present bool in the connection callback informs whether an existing session was successfully resumed. If an existing session is resumed, the server remembers previous subscriptions and sends messages (with QoS 1 or higher) that were published while the client was offline.

  • on_connection_interrupted

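    Optional callback invoked whenever the MQTT connection is lost. The MQTT client will automatically attempt to reconnect. The function should take the following arguments and return nothing:

    • connection (Connection): This MQTT Connection

    • error (awscrt.exceptions.AwsCrtError): Exception which caused connection loss.

    • **kwargs (dict): Forward-compatibility kwargs.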

  • on_connection_resumed

    Optional callback invoked whenever the MQTT connection is automatically resumed. Function should take the following arguments and return nothing:

    • connection (Connection): This MQTT Connection

    • return_code (ConnectReturnCode): Connect return code received from the server.

    • session_present (bool): True if resuming existing session. False if new session. Note that the server has forgotten all previous subscriptions if this is False. Subscriptions can be re-established via resubscribe_existing_topics().

    • **kwargs (dict): Forward-compatibility kwargs.

  • reconnect_min_timeout_secs (int) – Minimum time to wait between reconnect attempts. Must be <= reconnect_max_timeout_secs. Wait starts at min and doubles with each attempt until max is reached.

  • reconnect_max_timeout_secs (int) – Maximum time to wait between reconnect attempts. Must be >= reconnect_min_timeout_secs. Wait starts at min and doubles with each attempt until max is reached.

  • keep_alive_secs (int) – The keep alive value, in seconds, to send in CONNECT packet. A PING will automatically be sent at this interval. The server will assume the connection is lost if no PING is received after 1.5X this value. This duration must be longer than ping_timeout_ms.

  • ping_timeout_ms (int) – Milliseconds to wait for ping response before client assumes the connection is invalid and attempts to reconnect. This duration must be shorter than keep_alive_secs. Alternatively, TCP keep-alive via SocketOptions.keep_alive may accomplish this in a more efficient (low-power) scenario, but keep-alive options may not work the same way on every platform and OS version.

  • will (Will) – Will to send with CONNECT packet. The will is published by the server when its connection to the client is unexpectedly lost.

  • username (str) – Username to connect with.

  • password (str) – Password to connect with.

  • socket_options (Optional[awscrt.io.SocketOptions]) – Optional socket options.

  • use_websockets (bool) – If True, connect to MQTT over websockets.

  • websocket_proxy_options (Optional[awscrt.http.HttpProxyOptions]) – Optional proxy options for websocket connections.

  • websocket_handshake_transform

    Optional function to transform websocket handshake request. If provided, function is called each time a websocket connection is attempted. The function may modify the HTTP request before it is sent to the server. See WebsocketHandshakeTransformArgs for more info. Function should take the following arguments and return nothing:

    • transform_args (WebsocketHandshakeTransformArgs): Contains HTTP request to be transformed. Function must call transform_args.set_done() when complete.

    • **kwargs (dict): Forward-compatibility kwargs.
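The reconnect wait described for reconnect_min_timeout_secs and reconnect_max_timeout_secs can be illustrated with a small sketch. It only restates the rule "start at min, double with each attempt, cap at max" in plain Python. The helper name reconnect_delays is hypothetical and is not part of awscrt; the real client may differ in details such as when the wait resets.

```python
def reconnect_delays(min_secs, max_secs, attempts):
    """Return the wait, in seconds, before each of the first `attempts` reconnects.

    Hypothetical helper for illustration only; not an awscrt function.
    """
    if min_secs > max_secs:
        raise ValueError("min_secs must be <= max_secs")
    delays = []
    wait = min_secs
    for _ in range(attempts):
        delays.append(wait)
        # Double the wait for the next attempt, but never exceed the maximum.
        wait = min(wait * 2, max_secs)
    return delays


# With the default values (min=5, max=60):
print(reconnect_delays(5, 60, 6))  # [5, 10, 20, 40, 60, 60]
```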

connect()

Open the actual connection to the server (async).

Returns

concurrent.futures.Future – Future which completes when connection succeeds or fails. If connection fails, Future will contain an exception. If connection succeeds, Future will contain a dict with the following members:

  • [‘session_present’] (bool): is True if resuming existing session and False if new session.
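As a rough sketch of how the returned Future might be used, the helper below blocks until the connection attempt finishes and resubscribes when the server reports a new session. The function name connect_and_restore and the timeout value are assumptions made for this example; connection is assumed to be an awscrt.mqtt.Connection.

```python
def connect_and_restore(connection, timeout_secs=10.0):
    """Connect, then resubscribe if the server did not resume a session.

    Hypothetical helper; `connection` is assumed to be an
    awscrt.mqtt.Connection. Returns True if an existing session was resumed.
    """
    # result() blocks until the Future completes and re-raises the
    # exception stored in it if the connection attempt failed.
    result = connection.connect().result(timeout_secs)
    session_present = result['session_present']
    if not session_present:
        # New session: the server holds no subscriptions for this client.
        resubscribe_future, _packet_id = connection.resubscribe_existing_topics()
        resubscribe_future.result(timeout_secs)
    return session_present
```

Blocking on result() keeps the example short. An application that must not block could attach a callback with Future.add_done_callback() instead.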

disconnect()

Close the connection (async).

Returns

concurrent.futures.Future – Future which completes when the connection is closed. The future will contain an empty dict.

subscribe(topic, qos, callback=None)

Subscribe to a topic filter (async).

The client sends a SUBSCRIBE packet and the server responds with a SUBACK.

subscribe() may be called while the device is offline, though the async operation cannot complete successfully until the connection resumes.

Once subscribed, callback is invoked each time a message matching the topic is received. It is possible for such messages to arrive before the SUBACK is received.

Parameters
  • topic (str) – Subscribe to this topic filter, which may include wildcards.

  • qos (QoS) – Maximum requested QoS that server may use when sending messages to the client. The server may grant a lower QoS in the SUBACK (see returned Future).

  • callback

    Optional callback invoked when message received. Function should take the following arguments and return nothing:

    • topic (str): Topic receiving message.

    • payload (bytes): Payload of message.

    • **kwargs (dict): Forward-compatibility kwargs.

Returns

Tuple[concurrent.futures.Future, int] – Tuple containing a Future and the ID of the SUBSCRIBE packet. The Future completes when a SUBACK is received from the server. If successful, the Future will contain a dict with the following members:

  • [‘packet_id’] (int): ID of the SUBSCRIBE packet being acknowledged.

  • [‘topic’] (str): Topic filter of the SUBSCRIBE packet being acknowledged.

  • [‘qos’] (QoS): Maximum QoS that was granted by the server. This may be lower than the requested QoS.

If unsuccessful, the Future contains an exception. The exception will be a SubscribeError if a SUBACK was received in which the server rejected the subscription. Other exception types indicate other errors with the operation.
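A minimal sketch of a subscription, assuming connection is an awscrt.mqtt.Connection that is already connected. The names received, on_message_received and subscribe_and_wait are hypothetical, and the payload is assumed to be UTF-8 text, which the library does not require.

```python
received = []


def on_message_received(topic, payload, **kwargs):
    # payload arrives as bytes. **kwargs absorbs any extra keyword
    # arguments so the callback keeps working if more are passed.
    received.append((topic, payload.decode('utf-8')))


def subscribe_and_wait(connection, topic_filter, qos, timeout_secs=10.0):
    """Subscribe and return the QoS granted by the server.

    Hypothetical helper; raises whatever exception the Future contains
    if the subscription fails.
    """
    future, _packet_id = connection.subscribe(
        topic_filter, qos, callback=on_message_received)
    suback = future.result(timeout_secs)
    # The granted QoS may be lower than the one requested.
    return suback['qos']
```

Because messages can arrive before the SUBACK, the callback should be ready to run as soon as subscribe() is called, not only after the Future completes.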

on_message(callback)

Set callback to be invoked when ANY message is received.

callback: Callback to invoke when message received, or None to disable.

Function should take the following arguments and return nothing:

  • topic (str): Topic receiving message.

  • payload (bytes): Payload of message.

  • **kwargs (dict): Forward-compatibility kwargs.

unsubscribe(topic)

Unsubscribe from a topic filter (async).

The client sends an UNSUBSCRIBE packet, and the server responds with an UNSUBACK.

Parameters

topic (str) – Unsubscribe from this topic filter.

Returns

Tuple[concurrent.futures.Future, int] – Tuple containing a Future and the ID of the UNSUBSCRIBE packet. The Future completes when an UNSUBACK is received from the server. If successful, the Future will contain a dict with the following members:

  • [‘packet_id’] (int): ID of the UNSUBSCRIBE packet being acknowledged.

resubscribe_existing_topics()

Subscribe again to all current topics.

This is to help when resuming a connection with a clean session.

Returns

Tuple[concurrent.futures.Future, int] – Tuple containing a Future and the ID of the SUBSCRIBE packet. The Future completes when a SUBACK is received from the server. If successful, the Future will contain a dict with the following members:

  • [‘packet_id’]: ID of the SUBSCRIBE packet being acknowledged, or None if there were no topics to resubscribe to.

  • [‘topics’]: A list of (topic, qos) tuples, where qos will be None if the topic failed to resubscribe. If there were no topics to resubscribe to, then the list will be empty.
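The result dict can be checked for topics that failed to resubscribe. The sketch below works on the dict described here; the function name failed_resubscriptions and the sample data are made up for illustration.

```python
def failed_resubscriptions(result):
    """Return the topic filters whose qos is None in a resubscribe result.

    Hypothetical helper; `result` is the dict contained in the Future
    returned by resubscribe_existing_topics().
    """
    return [topic for topic, qos in result['topics'] if qos is None]


# Sample result, invented for this example.
example = {'packet_id': 7, 'topics': [('sensors/+/temp', 1), ('alerts/#', None)]}
print(failed_resubscriptions(example))  # ['alerts/#']
```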

publish(topic, payload, qos, retain=False)

Publish message (async).

If the device is offline, the PUBLISH packet will be sent once the connection resumes.

Parameters
  • topic (str) – Topic name.

  • payload (buffer) – Contents of message.

  • qos (QoS) – Quality of Service for delivering this message.

  • retain (bool) – If True, the server will store the message and its QoS so that it can be delivered to future subscribers whose subscriptions match its topic name.

Returns

Tuple[concurrent.futures.Future, int] – Tuple containing a Future and the ID of the PUBLISH packet. The QoS determines when the Future completes:

  • For QoS 0, completes as soon as the packet is sent.

  • For QoS 1, completes when PUBACK is received.

  • For QoS 2, completes when PUBCOMP is received.

If successful, the Future will contain a dict with the following members:

  • [‘packet_id’] (int): ID of the PUBLISH packet that is complete.
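A short sketch of publishing a JSON document and waiting for completion, assuming connection is an awscrt.mqtt.Connection. The helper name publish_json and the choice of JSON encoding are assumptions for this example; the library itself accepts any bytes-like payload.

```python
import json


def publish_json(connection, topic, data, qos, timeout_secs=10.0):
    """Publish `data` as UTF-8 encoded JSON and return the packet ID.

    Hypothetical helper. With QoS 1 the call returns after the PUBACK
    arrives; with QoS 0 it returns once the packet has been sent.
    """
    payload = json.dumps(data).encode('utf-8')
    future, _packet_id = connection.publish(topic, payload, qos)
    # result() re-raises the exception stored in the Future on failure.
    result = future.result(timeout_secs)
    return result['packet_id']
```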

class awscrt.mqtt.WebsocketHandshakeTransformArgs(mqtt_connection, http_request, done_future)

Argument to a “websocket_handshake_transform” function.

A websocket_handshake_transform function has signature: fn(transform_args: WebsocketHandshakeTransformArgs, **kwargs) -> None

The function implementer may modify transform_args.http_request as desired. They MUST call transform_args.set_done() when complete, passing an exception if something went wrong. Failure to call set_done() will hang the application.

The implementer may do asynchronous work before calling transform_args.set_done(); they are not required to call set_done() within the scope of the transform function. An example of async work would be to fetch credentials from another service, sign the request headers, and finally call set_done() to mark the transform complete.

The default websocket handshake request uses path “/mqtt”. All required headers are present, plus the optional header “Sec-WebSocket-Protocol: mqtt”.

Parameters
  • mqtt_connection (Connection) – Connection this handshake is for.

  • http_request (awscrt.http.HttpRequest) – HTTP request for this handshake.

  • done_future (concurrent.futures.Future) – Future to complete when set_done() is called. It will contain None if successful, or an exception will be set.

mqtt_connection

Connection this handshake is for.

Type

Connection

http_request

HTTP request for this handshake.

Type

awscrt.http.HttpRequest

set_done(exception=None)

Mark the transformation complete. If exception is passed in, the handshake is canceled.
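A sketch of a transform function that does its work on a background thread and always calls set_done(), passing the exception if something went wrong. The fetch_token function and the header name "x-example-token" are placeholders invented for this example. The sketch assumes that transform_args.http_request.headers provides an add(name, value) method, as awscrt.http.HttpHeaders does.

```python
import threading


def fetch_token():
    # Placeholder for real asynchronous work, such as fetching
    # credentials from another service.
    return "example-token"


def transform_handshake(transform_args, **kwargs):
    def work():
        try:
            token = fetch_token()
            transform_args.http_request.headers.add("x-example-token", token)
        except Exception as e:
            # Report the failure; this cancels the handshake.
            transform_args.set_done(e)
        else:
            transform_args.set_done()

    # Return immediately; set_done() is called later from the worker thread.
    threading.Thread(target=work, daemon=True).start()
```

A function like this would be passed as websocket_handshake_transform when constructing the Connection. Calling set_done() on both the success and the failure path matters, since a missing call hangs the application.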

exception awscrt.mqtt.SubscribeError

Subscription rejected by server.