mqtt#

class platypush.backend.mqtt.MqttBackend(*args, host: str | None = None, port: int = 1883, topic: str = 'platypush_bus_mq', subscribe_default_topic: bool = True, tls_cafile: str | None = None, tls_certfile: str | None = None, tls_keyfile: str | None = None, tls_version: str | None = None, tls_ciphers: str | None = None, tls_insecure: bool = False, username: str | None = None, password: str | None = None, client_id: str | None = None, listeners=None, **kwargs)[source]#

Bases: Backend

Backend that reads messages from a configured MQTT topic (default: platypush_bus_mq/<device_id>) and posts them to the application bus.

Triggers:

Requires:

  • paho-mqtt (pip install paho-mqtt)

__init__(*args, host: str | None = None, port: int = 1883, topic: str = 'platypush_bus_mq', subscribe_default_topic: bool = True, tls_cafile: str | None = None, tls_certfile: str | None = None, tls_keyfile: str | None = None, tls_version: str | None = None, tls_ciphers: str | None = None, tls_insecure: bool = False, username: str | None = None, password: str | None = None, client_id: str | None = None, listeners=None, **kwargs)[source]#
Parameters:
  • host – MQTT broker host. If no host configuration is specified then the backend will use the host configuration specified on the mqtt plugin if it’s available.

  • port – MQTT broker port (default: 1883)

  • topic – Topic to read messages from (default: platypush_bus_mq/<device_id>)

  • subscribe_default_topic – Whether the backend should subscribe the default topic (default: platypush_bus_mq/<device_id>) and execute the messages received there as action requests (default: True).

  • tls_cafile – If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority to authenticate it, ssl_cafile will point to the provided ca.crt file (default: None)

  • tls_certfile – If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it here (default: None)

  • tls_keyfile – If TLS/SSL is enabled on the MQTT server and a client certificate key it required, specify it here (default: None) :type tls_keyfile: str

  • tls_version – If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it here (default: None). Supported versions: tls (automatic), tlsv1, tlsv1.1, tlsv1.2.

  • tls_ciphers – If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is required, specify it here (default: None)

  • tls_insecure – Set to True to ignore TLS insecure warnings (default: False).

  • username – Specify it if the MQTT server requires authentication (default: None)

  • password – Specify it if the MQTT server requires authentication (default: None)

  • client_id – ID used to identify the client on the MQTT server (default: None). If None is specified then Config.get('device_id') will be used.

  • listeners

    If specified then the MQTT backend will also listen for messages on the additional configured message queues. This parameter is a list of maps where each item supports the same arguments passed to the main backend configuration (host, port, topic, password etc.). Note that the message queue configured on the main configuration will expect valid Platypush messages that then can execute, while message queues registered to the listeners will accept any message. Example:

    listeners:
        - host: localhost
          topics:
              - topic1
              - topic2
              - topic3
        - host: sensors
          topics:
              - topic4
              - topic5
    

on_stop()[source]#

Callback invoked when the process stops

run()[source]#

Starts the backend thread. To be implemented in the derived classes if the loop method isn’t defined.

send_message(msg, *_, topic: str | None = None, **kwargs)[source]#

Sends a platypush.message.Message to a node. To be implemented in the derived classes. By default, if the Redis backend is configured then it will try to deliver the message to other consumers through the configured Redis main queue.

Parameters:
  • msg – The message to send

  • queue_name – Send the message on a specific queue (default: the queue_name configured on the Redis backend)

class platypush.backend.mqtt.MqttClient(*args, host: str, port: int, topics: List[str] | None = None, on_message: Callable | None = None, username: str | None = None, password: str | None = None, client_id: str | None = None, tls_cafile: str | None = None, tls_certfile: str | None = None, tls_keyfile: str | None = None, tls_version=None, tls_ciphers=None, tls_insecure: bool = False, keepalive: int | None = 60, **kwargs)[source]#

Bases: Client, Thread

Wrapper class for an MQTT client executed in a separate thread.

__init__(*args, host: str, port: int, topics: List[str] | None = None, on_message: Callable | None = None, username: str | None = None, password: str | None = None, client_id: str | None = None, tls_cafile: str | None = None, tls_certfile: str | None = None, tls_keyfile: str | None = None, tls_version=None, tls_ciphers=None, tls_insecure: bool = False, keepalive: int | None = 60, **kwargs)[source]#

client_id is the unique client id string used when connecting to the broker. If client_id is zero length or None, then the behaviour is defined by which protocol version is in use. If using MQTT v3.1.1, then a zero length client id will be sent to the broker and the broker will generate a random for the client. If using MQTT v3.1 then an id will be randomly generated. In both cases, clean_session must be True. If this is not the case a ValueError will be raised.

clean_session is a boolean that determines the client type. If True, the broker will remove all information about this client when it disconnects. If False, the client is a persistent client and subscription information and queued messages will be retained when the client disconnects. Note that a client will never discard its own outgoing messages on disconnect. Calling connect() or reconnect() will cause the messages to be resent. Use reinitialise() to reset a client to its original state. The clean_session argument only applies to MQTT versions v3.1.1 and v3.1. It is not accepted if the MQTT version is v5.0 - use the clean_start argument on connect() instead.

userdata is user defined data of any type that is passed as the “userdata” parameter to callbacks. It may be updated at a later point with the user_data_set() function.

The protocol argument allows explicit setting of the MQTT version to use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1), paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5 (v5.0), with the default being v3.1.1.

Set transport to “websockets” to use WebSockets as the transport mechanism. Set to “tcp” to use raw TCP, which is the default.

run()[source]#

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

subscribe(*topics, **kwargs)[source]#

Client subscription handler.

unsubscribe(*topics, **kwargs)[source]#

Client unsubscribe handler.