kafka

Description

Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)

Configuration

kafka:
  # [Optional]
  # Default Kafka server name or address. If None (default),
  # then it has to be specified when calling the ``send_message`` action.
  # host:   # type=Optional[str]

  # [Optional]
  # Default Kafka server port (default: 9092).
  # port: 9092  # type=int

  # [Optional]
  # If specified, the Kafka plugin will listen for
  # messages on these topics. Use this parameter if you also want to
  # listen on Kafka brokers other than the primary one. This
  # parameter supports a list of maps, where each item supports the
  # same arguments passed to the main configuration (host, port, topic,
  # password etc.). If host/port are omitted, then the host/port value
  # from the plugin configuration will be used. If any of the other
  # fields are omitted, then their default value will be used (usually
  # null). Example:
  #
  #     .. code-block:: yaml
  #
  #         listeners:
  #             # This listener uses the default configured host/port
  #             - topics:
  #                   - topic1
  #                   - topic2
  #                   - topic3
  #
  #             # This will use a custom Kafka broker host
  #             - host: sensors
  #               port: 19200
  #               username: myuser
  #               password: secret
  #               group_id: mygroup
  #               compression_type: gzip
  #               topics:
  #                   - topic4
  #                   - topic5
  # listeners:   # type=Optional[Iterable[dict]]

  # [Optional]
  # Seconds to wait before retrying to
  # connect to the Kafka server after a connection error (default: 5).
  # connection_retry_secs: 5.0  # type=float

  # [Optional]
  # Default Kafka consumer group ID (default: platypush).
  # group_id: platypush  # type=str

  # [Optional]
  # How often the `RunnablePlugin.loop <https://docs.platypush.tech/platypush/plugins/.html#platypush.plugins.RunnablePlugin.loop>`_ function should be
  # executed (default: 15 seconds). *NOTE*: For backward-compatibility
  # reasons, the `poll_seconds` argument is also supported, but it's
  # deprecated.
  # poll_interval: 15  # type=Optional[float]

  # [Optional]
  # How long we should wait for any running
  # threads/processes to stop before exiting (default: 5 seconds).
  # stop_timeout: 5  # type=Optional[float]

  # [Optional]
  # If set to True then the plugin will not monitor
  # for new events. This is useful if you want to run a plugin in
  # stateless mode and only leverage its actions, without triggering any
  # events. Defaults to False.
  # disable_monitor: False  # type=bool

Dependencies

pip

pip install kafka-python-ng

Triggered events

Actions

Module reference

class platypush.plugins.kafka.KafkaPlugin(host: str | None = None, port: int = 9092, listeners: Iterable[dict] | None = None, connection_retry_secs: float = 5.0, group_id: str = 'platypush', **kwargs)

Bases: RunnablePlugin

Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)

__init__(host: str | None = None, port: int = 9092, listeners: Iterable[dict] | None = None, connection_retry_secs: float = 5.0, group_id: str = 'platypush', **kwargs)
Parameters:
  • host – Default Kafka server name or address. If None (default), then it has to be specified when calling the send_message action.

  • port – Default Kafka server port (default: 9092).

  • connection_retry_secs – Seconds to wait before retrying to connect to the Kafka server after a connection error (default: 5).

  • group_id – Default Kafka consumer group ID (default: platypush).

  • listeners

    If specified, the Kafka plugin will listen for messages on these topics. Use this parameter if you also want to listen on Kafka brokers other than the primary one. This parameter supports a list of maps, where each item supports the same arguments passed to the main configuration (host, port, topic, password etc.). If host/port are omitted, then the host/port value from the plugin configuration will be used. If any of the other fields are omitted, then their default value will be used (usually null). Example:

    listeners:
        # This listener uses the default configured host/port
        - topics:
              - topic1
              - topic2
              - topic3
    
        # This will use a custom Kafka broker host
        - host: sensors
          port: 19200
          username: myuser
          password: secret
          group_id: mygroup
          compression_type: gzip
          topics:
              - topic4
              - topic5
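
The host/port fallback described for listeners can be sketched in a few lines of Python. This is an illustration of the rule only, not the plugin's actual implementation: `resolve_listener`, `resolve_listeners`, and the `DEFAULTS` dictionary are hypothetical names, and `localhost` is an assumed plugin-level host.

```python
from typing import Any, Dict, Iterable, List

# Hypothetical plugin-level defaults, standing in for the `host` and `port`
# values of the main plugin configuration.
DEFAULTS: Dict[str, Any] = {"host": "localhost", "port": 9092}


def resolve_listener(listener: Dict[str, Any], defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `listener` where a missing host/port is inherited."""
    resolved = dict(listener)  # copy, so the original configuration is untouched
    for field in ("host", "port"):
        if resolved.get(field) is None:
            resolved[field] = defaults.get(field)
    return resolved


def resolve_listeners(
    listeners: Iterable[Dict[str, Any]], defaults: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Apply the fallback rule to every listener entry."""
    return [resolve_listener(item, defaults) for item in listeners]


listeners = [
    # No host/port: inherits the plugin-level values
    {"topics": ["topic1", "topic2", "topic3"]},
    # Explicit host/port: kept as configured
    {"host": "sensors", "port": 19200, "group_id": "mygroup", "topics": ["topic4", "topic5"]},
]

resolved = resolve_listeners(listeners, DEFAULTS)
```

Here the first entry ends up with the plugin-level host and port, while the second keeps `sensors:19200`.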
    

publish(msg: str | list | dict | tuple | bytes, topic: str, key: str | None = None, timeout: float | None = 60.0, compression_type: str | None = None, **kwargs)
Parameters:
  • msg – Message to send.

  • topic – Topic to send the message to.

  • key – For hash-based partitioning, the key used to determine the partition.

  • timeout – Timeout in seconds for the message to be sent.

  • compression_type – Compression type to use for the message (e.g. gzip).

  • kwargs – Additional arguments to pass to the KafkaProducer, including host and port.
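
To illustrate what the `key` parameter is for, the sketch below shows the general idea of hash-based partitioning: messages with the same key always map to the same partition. It is a simplified model, not the plugin's or the Kafka client's code. `pick_partition` is a hypothetical helper, and CRC32 is only a stand-in for the hash function a real Kafka client uses.

```python
import zlib


def pick_partition(key: str, num_partitions: int) -> int:
    """Map a message key to a partition index in [0, num_partitions).

    CRC32 is used here purely for illustration; real Kafka clients
    use their own hash function.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always lands on the same partition, which is what
# preserves per-key ordering.
first = pick_partition("sensor-1", 6)
second = pick_partition("sensor-1", 6)
```

Because the mapping depends only on the key and the partition count, all messages for `sensor-1` are consumed in order from a single partition, as long as the number of partitions does not change.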

send_message(msg: str | list | dict | tuple | bytes, topic: str, **kwargs)

Alias for publish().

subscribe(topic: str, group_id: str | None = None, **kwargs)

Subscribe to a topic.

Parameters:
  • topic – Topic to subscribe to.

  • group_id – Group ID to use for the consumer. If None, then the group ID from the plugin configuration will be used.

  • kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.
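
As a rough sketch, a call to this action could be expressed as a JSON request like the one built below. The `type`/`action`/`args` envelope and the `kafka.subscribe` action name are assumptions about how the request is delivered, and `build_subscribe_request` is a hypothetical helper. The point is that `group_id` is simply left out when not given, so that the plugin-level default applies.

```python
import json
from typing import Any, Dict, Optional


def build_subscribe_request(
    topic: str, group_id: Optional[str] = None, **kwargs: Any
) -> Dict[str, Any]:
    """Build a request dictionary for the subscribe action (assumed format)."""
    args: Dict[str, Any] = {"topic": topic, **kwargs}
    if group_id is not None:
        # Only send group_id when explicitly set; otherwise the group ID
        # from the plugin configuration is expected to be used.
        args["group_id"] = group_id
    return {"type": "request", "action": "kafka.subscribe", "args": args}


# Subscribe on a non-default broker, relying on the default group ID
request = build_subscribe_request("topic4", host="sensors", port=19200)
payload = json.dumps(request)
```

The extra keyword arguments (`host`, `port`) are passed through unchanged, mirroring the `kwargs` parameter above.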

unsubscribe(**kwargs)

Unsubscribe from all the topics on a consumer.

Parameters:
  • kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.