kafka
Description
Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)
Configuration
kafka:
# [Optional]
# Default Kafka server name or address. If None (default),
# then it has to be specified when calling the ``send_message`` action.
# host: # type=Optional[str]
# [Optional]
# Default Kafka server port (default: 9092).
# port: 9092 # type=int
# [Optional]
# If specified, the Kafka plugin will listen for
# messages on these topics. Use this parameter if you also want to
# listen on Kafka brokers other than the primary one. This
# parameter supports a list of maps, where each item supports the
# same arguments passed to the main configuration (host, port, topic,
# password etc.). If host/port are omitted, then the host/port value
# from the plugin configuration will be used. If any of the other
# fields are omitted, then their default value will be used (usually
# null). Example:
#
# .. code-block:: yaml
#
#     listeners:
#         # This listener uses the default configured host/port
#         - topics:
#               - topic1
#               - topic2
#               - topic3
#
#         # This will use a custom Kafka broker host
#         - host: sensors
#           port: 19200
#           username: myuser
#           password: secret
#           group_id: mygroup
#           compression_type: gzip
#           topics:
#               - topic4
#               - topic5
# listeners: # type=Optional[Iterable[dict]]
# [Optional]
# Seconds to wait before retrying to
# connect to the Kafka server after a connection error (default: 5).
# connection_retry_secs: 5.0 # type=float
# [Optional]
# Default Kafka consumer group ID (default: platypush).
# group_id: platypush # type=str
# [Optional]
# How often the `RunnablePlugin.loop <https://docs.platypush.tech/platypush/plugins/.html#platypush.plugins.RunnablePlugin.loop>`_ function should be
# executed (default: 15 seconds). *NOTE*: For backward-compatibility
# reasons, the `poll_seconds` argument is also supported, but it is
# deprecated.
# poll_interval: 15 # type=Optional[float]
# [Optional]
# How long we should wait for any running
# threads/processes to stop before exiting (default: 5 seconds).
# stop_timeout: 5 # type=Optional[float]
# [Optional]
# If set to True then the plugin will not monitor
# for new events. This is useful if you want to run a plugin in
# stateless mode and only leverage its actions, without triggering any
# events. Defaults to False.
# disable_monitor: False # type=bool
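The host/port fallback described for `listeners` can be illustrated with a minimal sketch. The helper name `resolve_listeners` and its arguments are hypothetical and are not part of platypush; the code only mirrors the rule stated in the comments above (a listener without `host`/`port` inherits the plugin-level values).

```python
from typing import Any, Dict, Iterable, List, Optional


def resolve_listeners(
    listeners: Optional[Iterable[Dict[str, Any]]],
    default_host: Optional[str] = None,
    default_port: int = 9092,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: fill in host/port for each listener entry."""
    resolved = []
    for listener in listeners or []:
        item = dict(listener)  # copy, so the caller's data is left untouched
        # host/port fall back to the plugin-level configuration when omitted
        item.setdefault("host", default_host)
        item.setdefault("port", default_port)
        resolved.append(item)
    return resolved


# Same shape as the YAML example: one listener on the default broker,
# one on a custom broker.
listeners = [
    {"topics": ["topic1", "topic2", "topic3"]},
    {"host": "sensors", "port": 19200, "topics": ["topic4", "topic5"]},
]
resolved = resolve_listeners(listeners, default_host="localhost", default_port=9092)
```

Here the first entry picks up `localhost:9092` (an illustrative default), while the second keeps its own `sensors:19200`.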
Dependencies
pip
pip install kafka-python-ng
Triggered events
Actions
Module reference
- class platypush.plugins.kafka.KafkaPlugin(host: str | None = None, port: int = 9092, listeners: Iterable[dict] | None = None, connection_retry_secs: float = 5.0, group_id: str = 'platypush', **kwargs)
Bases: RunnablePlugin
Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)
- __init__(host: str | None = None, port: int = 9092, listeners: Iterable[dict] | None = None, connection_retry_secs: float = 5.0, group_id: str = 'platypush', **kwargs)
- Parameters:
host – Default Kafka server name or address. If None (default), then it has to be specified when calling the send_message action.
port – Default Kafka server port (default: 9092).
connection_retry_secs – Seconds to wait before retrying to connect to the Kafka server after a connection error (default: 5).
group_id – Default Kafka consumer group ID (default: platypush).
listeners – If specified, the Kafka plugin will listen for messages on these topics. Use this parameter if you also want to listen on Kafka brokers other than the primary one. This parameter supports a list of maps, where each item supports the same arguments passed to the main configuration (host, port, topic, password etc.). If host/port are omitted, then the host/port value from the plugin configuration will be used. If any of the other fields are omitted, then their default value will be used (usually null). Example:

    listeners:
        # This listener uses the default configured host/port
        - topics:
              - topic1
              - topic2
              - topic3

        # This will use a custom Kafka broker host
        - host: sensors
          port: 19200
          username: myuser
          password: secret
          group_id: mygroup
          compression_type: gzip
          topics:
              - topic4
              - topic5
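The role of `connection_retry_secs` can be sketched as a simple fixed-delay retry loop. This is an illustration only, not the plugin's actual implementation: `connect_with_retry`, the `max_attempts` cap and the injectable `sleep` argument are all assumptions made to keep the example short and testable.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect: Callable[[], T],
    connection_retry_secs: float = 5.0,
    max_attempts: int = 3,  # assumption: a real listener may retry indefinitely
    sleep: Callable[[float], object] = time.sleep,
) -> T:
    """Hypothetical helper: retry `connect` with a fixed delay between attempts."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError as e:
            last_error = e
            if attempt < max_attempts:
                # Wait before the next attempt, as connection_retry_secs describes
                sleep(connection_retry_secs)
    raise ConnectionError(f"giving up after {max_attempts} attempts") from last_error


# Stand-in for a broker connection that fails twice and then succeeds.
attempts = {"count": 0}


def flaky_connect() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("broker unavailable")
    return "connected"


waits = []  # records the delays instead of actually sleeping
result = connect_with_retry(flaky_connect, connection_retry_secs=5.0, sleep=waits.append)
```

Passing `waits.append` as `sleep` keeps the demo instantaneous while still showing that two delays of `connection_retry_secs` were requested.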
- publish(msg: str | list | dict | tuple | bytes, topic: str, key: str | None = None, timeout: float | None = 60.0, compression_type: str | None = None, **kwargs)
- Parameters:
msg – Message to send.
topic – Topic to send the message to.
key – For hash-based partitioning, the key to use to determine the partition.
timeout – Timeout in seconds for the message to be sent.
compression_type – Compression type to use for the message (e.g. gzip).
kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.
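As a rough illustration of how the `publish` parameters fit together, the sketch below assembles a request payload for the action. The helper `build_publish_request` is hypothetical, and the `type`/`action`/`args` envelope is an assumption based on platypush's general request format rather than something stated in this reference.

```python
import json
from typing import Any, Dict, Optional


def build_publish_request(
    msg: Any,
    topic: str,
    key: Optional[str] = None,
    timeout: Optional[float] = 60.0,
    compression_type: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Hypothetical helper: build a request dict for the kafka.publish action."""
    args: Dict[str, Any] = {"msg": msg, "topic": topic, "timeout": timeout}
    if key is not None:
        args["key"] = key
    if compression_type is not None:
        args["compression_type"] = compression_type
    # Extra arguments such as host and port are passed through unchanged
    args.update(kwargs)
    # Assumed request envelope: {"type": "request", "action": ..., "args": ...}
    return {"type": "request", "action": "kafka.publish", "args": args}


request = build_publish_request(
    {"temperature": 21.5},
    topic="topic1",
    key="sensor-1",
    compression_type="gzip",
    host="sensors",
    port=19200,
)
payload = json.dumps(request)  # JSON form, e.g. for sending to a running service
```

Optional arguments left at `None` are omitted from `args`, so the action's own defaults apply.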
- send_message(msg: str | list | dict | tuple | bytes, topic: str, **kwargs)
Alias for publish().
- subscribe(topic: str, group_id: str | None = None, **kwargs)
Subscribe to a topic.
- Parameters:
topic – Topic to subscribe to.
group_id – Group ID to use for the consumer. If None, then the group ID from the plugin configuration will be used.
kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.
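The `group_id` fallback and the `host`/`port` arguments of `subscribe` could translate into consumer settings along these lines. This is a sketch under stated assumptions: `consumer_settings` is a hypothetical helper, the plugin's internals may differ, and raising an error when no host is available is an illustrative choice. `bootstrap_servers` and `group_id` are the corresponding keyword arguments of kafka-python's `KafkaConsumer`.

```python
from typing import Any, Dict, Optional


def consumer_settings(
    topic: str,
    group_id: Optional[str] = None,
    default_group_id: str = "platypush",
    default_host: Optional[str] = None,
    default_port: int = 9092,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Hypothetical helper: derive consumer settings for one subscription."""
    host = kwargs.pop("host", default_host)
    port = kwargs.pop("port", default_port)
    if host is None:
        # Assumption for this sketch: a host must come from somewhere
        raise ValueError("no Kafka host configured or passed")
    return {
        "topics": [topic],
        "bootstrap_servers": f"{host}:{port}",
        # Fall back to the plugin-level group ID when none is given
        "group_id": group_id if group_id is not None else default_group_id,
        **kwargs,  # any remaining consumer options pass through unchanged
    }


default_group = consumer_settings("topic1", default_host="localhost")
custom_group = consumer_settings("topic4", group_id="mygroup", host="sensors", port=19200)
```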