mqtt

Description

This plugin allows you to send custom messages to a message queue compatible with the MQTT protocol, see https://mqtt.org/

Configuration

mqtt:
  # [Optional]
  # If set, MQTT messages will be routed to this host by default
  # unless overridden in `send_message` (default: None)
  # host:   # type=Optional[str]

  # [Optional]
  # If a default host is set, specify the listen port
  # (default: 1883)
  # port: 1883  # type=int

  # [Optional]
  # If a default ``host`` is specified, then this list will
  # include a default list of topics that should be subscribed on that
  # broker at startup.
  # topics:   # type=Optional[Iterable[str]]

  # [Optional]
  # If a default host is set and requires TLS/SSL,
  # specify the certificate authority file (default: None)
  # tls_cafile:   # type=Optional[str]

  # [Optional]
  # If a default host is set and requires TLS/SSL,
  # specify the certificate file (default: None)
  # tls_certfile:   # type=Optional[str]

  # [Optional]
  # If a default host is set and requires TLS/SSL,
  # specify the key file (default: None)
  # tls_keyfile:   # type=Optional[str]

  # [Optional]
  # If TLS/SSL is enabled on the MQTT server and it
  # requires a certain TLS version, specify it here (default: None).
  # Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``,
  # ``tlsv1.2``.
  # tls_version:   # type=Optional[str]

  # [Optional]
  # If a default host is set and requires TLS/SSL,
  # specify the supported ciphers (default: None)
  # tls_ciphers:   # type=Optional[str]

  # [Optional]
  # Set to True to ignore TLS insecure warnings
  # (default: False).
  # tls_insecure: False  # type=bool

  # [Optional]
  # If a default host is set and requires user
  # authentication, specify the username (default: None)
  # username:   # type=Optional[str]

  # [Optional]
  # If a default host is set and requires user
  # authentication, specify the password (default: None)
  # password:   # type=Optional[str]

  # [Optional]
  # ID used to identify the client on the MQTT server
  # (default: None). If None is specified then
  # ``Config.get('device_id')`` will be used.
  # client_id:   # type=Optional[str]

  # [Optional]
  # Client timeout in seconds (default: 30 seconds).
  # timeout: 30  # type=Optional[int]

  # [Optional]
  # If specified, the MQTT plugin will listen for
  # messages on a topic in the format `{run_topic_prefix}/{device_id}`.
  # When a message is received, it will interpret it as a JSON request
  # to execute, in the format
  # ``{"type": "request", "action": "plugin.action", "args": {...}}``.
  #
  # .. warning:: This parameter is mostly kept for backwards
  #     compatibility, but you should avoid it - unless the MQTT broker
  #     is on a personal safe network that you own, or it requires
  #     user authentication and it uses SSL. The reason is that the
  #     messages received on this topic won't be subject to token
  #     verification, allowing unauthenticated arbitrary command
  #     execution on the target host. If you still want the ability of
  #     running commands remotely over an MQTT broker, then you may
  #     consider creating a dedicated topic listener with an attached
  #     event hook on
  #     `MQTTMessageEvent <https://docs.platypush.tech/platypush/events/mqtt.html#platypush.message.event.mqtt.MQTTMessageEvent>`_. The
  #     hook can implement whichever authentication logic you like.
  # run_topic_prefix:   # type=Optional[str]

  # [Optional]
  # If specified, the MQTT plugin will listen for
  # messages on these topics. Use this parameter if you also want to
  # listen on MQTT brokers other than the primary one. This
  # parameter supports a list of maps, where each item supports the
  # same arguments passed to the main configuration (host, port, topics,
  # password etc.). If host/port are omitted, then the host/port value
  # from the plugin configuration will be used. If any of the other
  # fields are omitted, then their default value will be used (usually
  # null). Example:
  #
  #     .. code-block:: yaml
  #
  #         listeners:
  #             # This listener uses the default configured host/port
  #             - topics:
  #                   - topic1
  #                   - topic2
  #                   - topic3
  #
  #             # This will use a custom MQTT broker host
  #             - host: sensors
  #               port: 11883
  #               username: myuser
  #               password: secret
  #               topics:
  #                   - topic4
  #                   - topic5
  # listeners:   # type=Optional[Iterable[dict]]

  # [Optional]
  # How often the `RunnablePlugin.loop <https://docs.platypush.tech/platypush/plugins/.html#platypush.plugins.RunnablePlugin.loop>`_ function should be
  # executed (default: 15 seconds). *NOTE*: For back-compatibility
  # reasons, the `poll_seconds` argument is also supported, but it's
  # deprecated.
  # poll_interval: 15  # type=Optional[float]

  # [Optional]
  # How long we should wait for any running
  # threads/processes to stop before exiting (default: 5 seconds).
  # stop_timeout: 5  # type=Optional[float]

  # [Optional]
  # If set to True then the plugin will not monitor
  # for new events. This is useful if you want to run a plugin in
  # stateless mode and only leverage its actions, without triggering any
  # events. Defaults to False.
  # disable_monitor: False  # type=bool

Dependencies

pip

pip install paho-mqtt

Alpine

apk add py3-paho-mqtt

Debian

apt install python3-paho-mqtt

Fedora

yum install python-paho-mqtt

Arch Linux

pacman -S python-paho-mqtt

Triggered events

Actions

Module reference

class platypush.plugins.mqtt.MqttPlugin(host: str | None = None, port: int = 1883, topics: Iterable[str] | None = None, tls_cafile: str | None = None, tls_certfile: str | None = None, tls_keyfile: str | None = None, tls_version: str | None = None, tls_ciphers: str | None = None, tls_insecure: bool = False, username: str | None = None, password: str | None = None, client_id: str | None = None, timeout: int | None = 30, run_topic_prefix: str | None = None, listeners: Iterable[dict] | None = None, **kwargs)

Bases: RunnablePlugin

This plugin allows you to send custom messages to a message queue compatible with the MQTT protocol, see https://mqtt.org/

__init__(host: str | None = None, port: int = 1883, topics: Iterable[str] | None = None, tls_cafile: str | None = None, tls_certfile: str | None = None, tls_keyfile: str | None = None, tls_version: str | None = None, tls_ciphers: str | None = None, tls_insecure: bool = False, username: str | None = None, password: str | None = None, client_id: str | None = None, timeout: int | None = 30, run_topic_prefix: str | None = None, listeners: Iterable[dict] | None = None, **kwargs)
Parameters:
  • host – If set, MQTT messages will be routed to this host by default unless overridden in send_message (default: None)

  • port – If a default host is set, specify the listen port (default: 1883)

  • topics – If a default host is specified, then this list will include a default list of topics that should be subscribed on that broker at startup.

  • tls_cafile – If a default host is set and requires TLS/SSL, specify the certificate authority file (default: None)

  • tls_certfile – If a default host is set and requires TLS/SSL, specify the certificate file (default: None)

  • tls_keyfile – If a default host is set and requires TLS/SSL, specify the key file (default: None)

  • tls_version – If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it here (default: None). Supported versions: tls (automatic), tlsv1, tlsv1.1, tlsv1.2.

  • tls_ciphers – If a default host is set and requires TLS/SSL, specify the supported ciphers (default: None)

  • tls_insecure – Set to True to ignore TLS insecure warnings (default: False).

  • username – If a default host is set and requires user authentication, specify the username (default: None)

  • password – If a default host is set and requires user authentication, specify the password (default: None)

  • client_id – ID used to identify the client on the MQTT server (default: None). If None is specified then Config.get('device_id') will be used.

  • timeout – Client timeout in seconds (default: 30 seconds).

  • run_topic_prefix

    If specified, the MQTT plugin will listen for messages on a topic in the format {run_topic_prefix}/{device_id}. When a message is received, it will interpret it as a JSON request to execute, in the format {"type": "request", "action": "plugin.action", "args": {...}}.

    Warning

    This parameter is mostly kept for backwards compatibility, but you should avoid it - unless the MQTT broker is on a personal safe network that you own, or it requires user authentication and it uses SSL. The reason is that the messages received on this topic won’t be subject to token verification, allowing unauthenticated arbitrary command execution on the target host. If you still want the ability of running commands remotely over an MQTT broker, then you may consider creating a dedicated topic listener with an attached event hook on platypush.message.event.mqtt.MQTTMessageEvent. The hook can implement whichever authentication logic you like.

  • listeners

    If specified, the MQTT plugin will listen for messages on these topics. Use this parameter if you also want to listen on MQTT brokers other than the primary one. This parameter supports a list of maps, where each item supports the same arguments passed to the main configuration (host, port, topics, password etc.). If host/port are omitted, then the host/port value from the plugin configuration will be used. If any of the other fields are omitted, then their default value will be used (usually null). Example:

    listeners:
        # This listener uses the default configured host/port
        - topics:
              - topic1
              - topic2
              - topic3
    
        # This will use a custom MQTT broker host
        - host: sensors
          port: 11883
          username: myuser
          password: secret
          topics:
              - topic4
              - topic5
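
The fallback rule described for listeners (host/port inherited from the plugin configuration, other fields reset to their defaults) can be sketched in plain Python. This is only an illustration of the documented behaviour, not the plugin's actual code: `resolve_listeners` and `FIELD_DEFAULTS` are hypothetical names, and only a few of the supported fields are shown.

```python
from typing import Any, Dict, Iterable, List, Optional

# Hypothetical subset of per-listener defaults ("usually null" in the docs).
FIELD_DEFAULTS: Dict[str, Any] = {
    "username": None,
    "password": None,
    "topics": (),
}


def resolve_listeners(
    plugin_host: Optional[str],
    plugin_port: int,
    listeners: Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Fill in the fields omitted by each listener (illustrative helper)."""
    resolved = []
    for listener in listeners:
        # Start from the defaults, then apply whatever the listener sets.
        entry = dict(FIELD_DEFAULTS)
        entry.update(listener)
        # host/port fall back to the plugin-level configuration.
        entry["host"] = listener.get("host", plugin_host)
        entry["port"] = listener.get("port", plugin_port)
        entry["topics"] = list(entry["topics"])
        resolved.append(entry)
    return resolved


resolved = resolve_listeners(
    "localhost",
    1883,
    [
        {"topics": ["topic1", "topic2", "topic3"]},
        {
            "host": "sensors",
            "port": 11883,
            "username": "myuser",
            "password": "secret",
            "topics": ["topic4", "topic5"],
        },
    ],
)
```

Here the first listener ends up on the plugin's own host and port, while the second keeps its custom broker settings.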
    

main()

Implementation of the main loop of the plugin.

on_exec_message(client: MqttClient, msg)

Message handler for (legacy) application requests over MQTT.
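
As a rough sketch of what such a legacy request looks like on the wire, the snippet below builds the topic name and JSON body from the formats given for run_topic_prefix. The helper `build_exec_request` and the sample prefix, device ID and arguments are hypothetical; only the topic format and the request envelope come from this page.

```python
import json
from typing import Any, Dict, Optional, Tuple


def build_exec_request(
    run_topic_prefix: str,
    device_id: str,
    action: str,
    args: Optional[Dict[str, Any]] = None,
) -> Tuple[str, str]:
    """Return the (topic, JSON payload) pair for a legacy request."""
    # Topic format: {run_topic_prefix}/{device_id}
    topic = f"{run_topic_prefix}/{device_id}"
    # Request format: {"type": "request", "action": ..., "args": {...}}
    payload = json.dumps(
        {"type": "request", "action": action, "args": args or {}}
    )
    return topic, payload


# Hypothetical values, for illustration only.
topic, payload = build_exec_request(
    "my_prefix", "my_device", "plugin.action", {"key": "value"}
)
```

Keep in mind the warning on run_topic_prefix: requests sent this way are not subject to token verification.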

on_mqtt_message() → Callable[[MqttClient, Any, paho.mqtt.client.MQTTMessage], Any]

Default MQTT message handler. It forwards a platypush.message.event.mqtt.MQTTMessageEvent event to the bus.

publish(topic: str, msg: Any, qos: int = 0, reply_topic: str | None = None, **mqtt_kwargs)

Sends a message to a topic.

Parameters:
  • topic – Topic/channel where the message will be delivered

  • msg – Message to be sent. It can be a list, a dict, or a Message object.

  • qos – Quality of Service (QoS) for the message - see MQTT QoS (default: 0).

  • reply_topic – If a reply_topic is specified, then the action will wait for a response on this topic.

  • mqtt_kwargs – MQTT broker configuration (host, port, username, password etc.). See __init__() parameters.
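
A minimal sketch of how the parameters above could be packed into a request for this action. It assumes the action is addressable as `mqtt.publish`, following the `plugin.action` naming used in the request format on this page; `publish_request` is a hypothetical helper, and the QoS check reflects the three levels defined by MQTT (0, 1 and 2).

```python
from typing import Any, Dict, Optional


def publish_request(
    topic: str,
    msg: Any,
    qos: int = 0,
    reply_topic: Optional[str] = None,
    **mqtt_kwargs: Any,
) -> Dict[str, Any]:
    """Build a request dict for the publish action (illustrative helper)."""
    if qos not in (0, 1, 2):
        raise ValueError("MQTT QoS must be 0, 1 or 2")

    args: Dict[str, Any] = {"topic": topic, "msg": msg, "qos": qos}
    if reply_topic is not None:
        args["reply_topic"] = reply_topic
    # Broker overrides (host, port, username, password etc.)
    args.update(mqtt_kwargs)

    # Assumption: the action name is "mqtt.publish".
    return {"type": "request", "action": "mqtt.publish", "args": args}


# Hypothetical topic, payload and broker, for illustration only.
request = publish_request(
    "sensors/temperature",
    {"value": 21.5},
    qos=1,
    host="sensors",
    port=11883,
)
```

Omitting the broker arguments leaves the request without host/port, in which case the default host from the plugin configuration would apply.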

send_message(*args, **kwargs)

Legacy alias for platypush.plugins.mqtt.MqttPlugin.publish().

start()

Start the plugin.

stop()

Disconnect all the clients upon plugin stop.

subscribe(topic: str, **mqtt_kwargs)

Programmatically subscribe to a topic on an MQTT broker.

Messages received on this topic will trigger a platypush.message.event.mqtt.MQTTMessageEvent event that you can subscribe to.

Parameters:
  • topic – Topic to subscribe to.

  • mqtt_kwargs – MQTT broker configuration (host, port, username, password etc.). See __init__() parameters.
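
The run_topic_prefix warning suggests a dedicated topic listener with an event hook that implements its own authentication. One possible shape for that logic is an HMAC signature check on the message body, sketched below with the standard library only. Everything here is an assumption made for illustration: the envelope fields `body` and `signature`, the names `SHARED_SECRET`, `sign` and `verify_message`, and the choice of HMAC-SHA256. The wiring of the function into a hook on MQTTMessageEvent is left out.

```python
import hashlib
import hmac
import json
from typing import Any, Optional

# Hypothetical shared secret; load it from a safe place in practice.
SHARED_SECRET = b"change-me"


def sign(body: str, secret: bytes = SHARED_SECRET) -> str:
    """Hex-encoded HMAC-SHA256 of the message body."""
    return hmac.new(secret, body.encode(), hashlib.sha256).hexdigest()


def verify_message(raw: str, secret: bytes = SHARED_SECRET) -> Optional[Any]:
    """Return the decoded body if the signature matches, otherwise None."""
    try:
        envelope = json.loads(raw)
        body, signature = envelope["body"], envelope["signature"]
    except (ValueError, KeyError, TypeError):
        return None
    if not isinstance(body, str) or not isinstance(signature, str):
        return None
    # Constant-time comparison to avoid leaking timing information.
    if not hmac.compare_digest(sign(body, secret).encode(), signature.encode()):
        return None
    try:
        return json.loads(body)
    except ValueError:
        return None


# Sender side: wrap the JSON body together with its signature.
body = json.dumps({"action": "plugin.action", "args": {}})
raw = json.dumps({"body": body, "signature": sign(body)})
```

A hook would call `verify_message` on the received message and ignore anything for which it returns None.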

unsubscribe(topic: str, **mqtt_kwargs)

Programmatically unsubscribe from a topic on an MQTT broker.

Parameters:
  • topic – Topic to unsubscribe from.

  • mqtt_kwargs – MQTT broker configuration (host, port, username, password etc.). See __init__() parameters.

wait_stop(timeout=None)

Wait until a stop event is received.