kafka#

Description#

Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)

Configuration#

kafka:
  # [Optional]
  # Default Kafka server name or address. If None (default),
  # then it has to be specified when calling the ``send_message`` action.
  # host:   # type=Optional[str]

  # [Optional]
  # Default Kafka server port (default: 9092).
  # port: 9092  # type=int

  # [Optional]
  # If specified, the Kafka plugin will listen for
  # messages on these topics. Use this parameter if you also want to
  # listen on Kafka brokers other than the primary one. This
  # parameter takes a list of maps, where each item supports the
  # same arguments passed to the main configuration (host, port, topics,
  # password etc.). If host/port are omitted, then the host/port value
  # from the plugin configuration will be used. If any of the other
  # fields are omitted, then their default value will be used (usually
  # null). Example:
  #
  #     .. code-block:: yaml
  #
  #         listeners:
  #             # This listener uses the default configured host/port
  #             - topics:
  #                   - topic1
  #                   - topic2
  #                   - topic3
  #
  #             # This will use a custom Kafka broker host
  #             - host: sensors
  #               port: 19200
  #               username: myuser
  #               password: secret
  #               topics:
  #                   - topic4
  #                   - topic5
  # listeners:   # type=Optional[Iterable[dict]]

  # [Optional]
  # Seconds to wait before retrying to
  # connect to the Kafka server after a connection error (default: 5).
  # connection_retry_secs: 5.0  # type=float

  # [Optional]
  # How often the `RunnablePlugin.loop <https://docs.platypush.tech/platypush/plugins/.html#platypush.plugins.RunnablePlugin.loop>`_ function should be
  # executed (default: 15 seconds). *NOTE*: For backward-compatibility
  # reasons, the `poll_seconds` argument is also supported, but it is
  # deprecated.
  # poll_interval: 15  # type=Optional[float]

  # [Optional]
  # How long we should wait for any running
  # threads/processes to stop before exiting (default: 5 seconds).
  # stop_timeout: 5  # type=Optional[float]

  # [Optional]
  # If set to True then the plugin will not monitor
  # for new events. This is useful if you want to run a plugin in
  # stateless mode and only leverage its actions, without triggering any
  # events. Defaults to False.
  # disable_monitor: False  # type=bool

Dependencies#

pip

pip install kafka

Debian

apt install python-kafka

Fedora

yum install python-kafka

Triggered events#

Actions#

Module reference#

class platypush.plugins.kafka.KafkaPlugin(host: str | None = None, port: int = 9092, listeners: Iterable[dict] | None = None, connection_retry_secs: float = 5.0, **kwargs)[source]#

Bases: RunnablePlugin

Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)

__init__(host: str | None = None, port: int = 9092, listeners: Iterable[dict] | None = None, connection_retry_secs: float = 5.0, **kwargs)[source]#
Parameters:
  • host – Default Kafka server name or address. If None (default), then it has to be specified when calling the send_message action.

  • port – Default Kafka server port (default: 9092).

  • connection_retry_secs – Seconds to wait before retrying to connect to the Kafka server after a connection error (default: 5).

  • listeners

    If specified, the Kafka plugin will listen for messages on these topics. Use this parameter if you also want to listen on Kafka brokers other than the primary one. This parameter takes a list of maps, where each item supports the same arguments passed to the main configuration (host, port, topics, password etc.). If host/port are omitted, then the host/port value from the plugin configuration will be used. If any of the other fields are omitted, then their default value will be used (usually null). Example:

    listeners:
        # This listener uses the default configured host/port
        - topics:
              - topic1
              - topic2
              - topic3
    
        # This will use a custom Kafka broker host
        - host: sensors
          port: 19200
          username: myuser
          password: secret
          topics:
              - topic4
              - topic5
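
The host/port fallback described for listeners can be sketched in plain Python. This is only an illustration: `resolve_listeners` is a hypothetical helper, not part of the plugin's API, and the `localhost` default host is an assumption made for the example.

```python
from typing import Any, Dict, Iterable, List, Optional


def resolve_listeners(
    listeners: Optional[Iterable[Dict[str, Any]]],
    default_host: Optional[str],
    default_port: int = 9092,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: fill in the plugin-level host/port where omitted."""
    resolved = []
    for listener in listeners or []:
        entry = dict(listener)  # copy, so the original config is left untouched
        entry.setdefault("host", default_host)
        entry.setdefault("port", default_port)
        entry.setdefault("topics", [])
        resolved.append(entry)
    return resolved


# Same shape as the YAML example: one listener on the default broker,
# one on a custom broker.
listeners = [
    {"topics": ["topic1", "topic2", "topic3"]},
    {"host": "sensors", "port": 19200, "topics": ["topic4", "topic5"]},
]
resolved = resolve_listeners(listeners, default_host="localhost")
```

Under these assumptions the first listener inherits `localhost:9092`, while the second keeps its own `sensors:19200`.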
    

main()[source]#

Implementation of the main loop of the plugin.
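
The page does not show the body of the main loop. As a rough sketch of how a loop could honour connection_retry_secs, assuming a hypothetical `connect` callable and a `threading.Event` used as the stop signal (neither is taken from the plugin's source):

```python
import threading
from typing import Callable


def run_with_retry(
    connect: Callable[[], None],
    stop_event: threading.Event,
    connection_retry_secs: float = 5.0,
) -> int:
    """Hypothetical loop: call ``connect`` until it succeeds or a stop is requested.

    Returns the number of connection attempts made.
    """
    attempts = 0
    while not stop_event.is_set():
        attempts += 1
        try:
            connect()
            break  # connected, leave the retry loop
        except ConnectionError:
            # Sleep for the retry interval, waking up early if a stop is requested
            stop_event.wait(connection_retry_secs)
    return attempts


# Simulated broker that refuses the first two connection attempts.
failures_left = [2]


def flaky_connect() -> None:
    if failures_left[0] > 0:
        failures_left[0] -= 1
        raise ConnectionError("broker unavailable")


attempts = run_with_retry(flaky_connect, threading.Event(), connection_retry_secs=0.01)
```

Waiting on the event rather than calling `time.sleep` lets a stop request interrupt the retry delay immediately.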

publish(msg: str | list | dict | tuple | bytes, topic: str, **kwargs)[source]#
Parameters:
  • msg – Message to send.

  • topic – Topic to send the message to.

  • kwargs – Additional arguments to pass to the KafkaProducer, including host and port.
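
Because msg accepts several types, it has to become bytes before it reaches the broker. The sketch below shows one plausible normalization, assuming JSON encoding for containers and UTF-8 for strings; `to_kafka_payload` is a hypothetical name and the plugin's actual serialization may differ.

```python
import json
from typing import Union

Message = Union[str, list, dict, tuple, bytes]


def to_kafka_payload(msg: Message) -> bytes:
    """Hypothetical helper: normalize a message to bytes."""
    if isinstance(msg, bytes):
        return msg  # already raw bytes, pass through unchanged
    if isinstance(msg, (list, dict, tuple)):
        msg = json.dumps(msg)  # containers are serialized as JSON text
    return str(msg).encode("utf-8")
```

Note that under this scheme a tuple is sent as a JSON array, so the receiver cannot tell it apart from a list.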

send_message(msg: str | list | dict | tuple | bytes, topic: str, **kwargs)[source]#

Alias for publish().
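
Since send_message is an alias, both action names should accept the same arguments. The sketch below builds the JSON request for each, assuming the usual Platypush request envelope (`type`, `action`, `args`) and the `<plugin>.<method>` action naming; `build_request`, the topic name, and the message content are illustrative only.

```python
import json
from typing import Any


def build_request(action: str, **args: Any) -> str:
    """Hypothetical helper: serialize an action call as a JSON request."""
    return json.dumps({"type": "request", "action": action, "args": args})


publish_req = build_request(
    "kafka.publish", msg={"temperature": 21.5}, topic="topic1"
)
alias_req = build_request(
    "kafka.send_message", msg={"temperature": 21.5}, topic="topic1"
)
```

The two requests differ only in the action name; the args payload is identical.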

start()#

Start the plugin.

stop()[source]#

Stop the plugin.

subscribe(topic: str, **kwargs)[source]#

Subscribe to a topic.

Parameters:
  • topic – Topic to subscribe to.

  • kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.

unsubscribe(**kwargs)[source]#

Unsubscribe from all the topics on a consumer.

Parameters:
  • kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.
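
One way to picture the relationship between subscribe and unsubscribe is a registry keyed by consumer, where host and port select the consumer and unsubscribe drops all of its topics at once. This is a hypothetical in-memory model for illustration, not the plugin's implementation; `SubscriptionRegistry` and the `localhost` default are assumptions.

```python
from collections import defaultdict
from typing import DefaultDict, Optional, Set, Tuple

ConsumerKey = Tuple[str, int]


class SubscriptionRegistry:
    """Hypothetical model: topics tracked per (host, port) consumer."""

    def __init__(self, host: str = "localhost", port: int = 9092) -> None:
        self.default: ConsumerKey = (host, port)
        self.topics: DefaultDict[ConsumerKey, Set[str]] = defaultdict(set)

    def _key(self, host: Optional[str], port: Optional[int]) -> ConsumerKey:
        # Fall back to the default broker when host/port are not given
        return (host or self.default[0], port or self.default[1])

    def subscribe(
        self, topic: str, host: Optional[str] = None, port: Optional[int] = None
    ) -> None:
        self.topics[self._key(host, port)].add(topic)

    def unsubscribe(
        self, host: Optional[str] = None, port: Optional[int] = None
    ) -> None:
        # Removes every topic of the selected consumer in one call
        self.topics.pop(self._key(host, port), None)


registry = SubscriptionRegistry()
registry.subscribe("topic1")
registry.subscribe("topic2")
registry.subscribe("topic4", host="sensors", port=19200)
registry.unsubscribe()  # clears the default consumer only
```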

wait_stop(timeout=None)#

Wait until a stop event is received.
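
The return value of wait_stop is not documented here. As a sketch of the underlying idea, assuming the stop signal behaves like a `threading.Event` (the name `wait_for_stop` is hypothetical and chosen to avoid confusion with the real method):

```python
import threading
from typing import Optional

stop_event = threading.Event()


def wait_for_stop(timeout: Optional[float] = None) -> bool:
    """Block until the stop event is set.

    Returns True if the event was set, False if the timeout expired first.
    """
    return stop_event.wait(timeout)


timed_out = not wait_for_stop(timeout=0.01)  # nothing has requested a stop yet
stop_event.set()                             # what a stop request would do
stopped = wait_for_stop(timeout=0.01)        # returns immediately
```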