kafka#
Description#
Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)
Configuration#
kafka:
# [Optional]
# Default Kafka server name or address. If None (default),
# then it has to be specified when calling the ``send_message`` action.
# host: # type=Optional[str]
# [Optional]
# Default Kafka server port (default: 9092).
# port: 9092 # type=int
# [Optional]
# If specified, the Kafka plugin will listen for
# messages on these topics. Use this parameter if you also want to
# listen on Kafka brokers other than the primary one. This
# parameter supports a list of maps, where each item supports the
# same arguments passed to the main configuration (host, port, topic,
# password etc.). If host/port are omitted, then the host/port value
# from the plugin configuration will be used. If any of the other
# fields are omitted, then their default value will be used (usually
# null). Example:
#
# .. code-block:: yaml
#
#     listeners:
#       # This listener uses the default configured host/port
#       - topics:
#           - topic1
#           - topic2
#           - topic3
#
#       # This will use a custom Kafka broker host
#       - host: sensors
#         port: 19200
#         username: myuser
#         password: secret
#         topics:
#           - topic4
#           - topic5
# listeners: # type=Optional[Iterable[dict]]
# [Optional]
# Seconds to wait before retrying to
# connect to the Kafka server after a connection error (default: 5).
# connection_retry_secs: 5.0 # type=float
# [Optional]
# How often the `RunnablePlugin.loop <https://docs.platypush.tech/platypush/plugins/.html#platypush.plugins.RunnablePlugin.loop>`_ function should be
# executed (default: 15 seconds). *NOTE*: For backward-compatibility
# reasons, the `poll_seconds` argument is also supported, but it's
# deprecated.
# poll_interval: 15 # type=Optional[float]
# [Optional]
# How long we should wait for any running
# threads/processes to stop before exiting (default: 5 seconds).
# stop_timeout: 5 # type=Optional[float]
# [Optional]
# If set to True then the plugin will not monitor
# for new events. This is useful if you want to run a plugin in
# stateless mode and only leverage its actions, without triggering any
# events. Defaults to False.
# disable_monitor: False # type=bool
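The host/port fallback described for ``listeners`` can be sketched in Python. This is a minimal illustration of the documented rule, not the plugin's actual code: ``resolve_listeners`` is a hypothetical helper, and the ``localhost`` host is an assumed value.

```python
from typing import Any, Dict, Iterable, List, Optional


def resolve_listeners(
    host: Optional[str] = None,
    port: int = 9092,
    listeners: Optional[Iterable[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: apply the plugin-level host/port to each listener."""
    resolved = []
    for listener in listeners or []:
        item = dict(listener)  # copy, so the caller's configuration is not mutated
        item.setdefault("host", host)  # fall back to the plugin host
        item.setdefault("port", port)  # fall back to the plugin port
        item.setdefault("topics", [])
        resolved.append(item)
    return resolved


# Same layout as the YAML example above; "localhost" is an assumed default host.
config = {
    "host": "localhost",
    "port": 9092,
    "listeners": [
        {"topics": ["topic1", "topic2", "topic3"]},
        {
            "host": "sensors",
            "port": 19200,
            "username": "myuser",
            "password": "secret",
            "topics": ["topic4", "topic5"],
        },
    ],
}

resolved = resolve_listeners(**config)
for item in resolved:
    print(item["host"], item["port"], item["topics"])
```

The first listener inherits the plugin's host and port, while the second keeps its own values.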
Dependencies#
pip
pip install kafka
Debian
apt install python-kafka
Fedora
yum install python-kafka
Triggered events#
Actions#
Module reference#
- class platypush.plugins.kafka.KafkaPlugin(host: str | None = None, port: int = 9092, listeners: Iterable[dict] | None = None, connection_retry_secs: float = 5.0, **kwargs)[source]#
Bases: RunnablePlugin
Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)
- __init__(host: str | None = None, port: int = 9092, listeners: Iterable[dict] | None = None, connection_retry_secs: float = 5.0, **kwargs)[source]#
- Parameters:
host – Default Kafka server name or address. If None (default), then it has to be specified when calling the send_message action.
port – Default Kafka server port (default: 9092).
connection_retry_secs – Seconds to wait before retrying to connect to the Kafka server after a connection error (default: 5).
listeners –
If specified, the Kafka plugin will listen for messages on these topics. Use this parameter if you also want to listen on Kafka brokers other than the primary one. This parameter supports a list of maps, where each item supports the same arguments passed to the main configuration (host, port, topic, password etc.). If host/port are omitted, then the host/port value from the plugin configuration will be used. If any of the other fields are omitted, then their default value will be used (usually null). Example:
    listeners:
      # This listener uses the default configured host/port
      - topics:
          - topic1
          - topic2
          - topic3

      # This will use a custom Kafka broker host
      - host: sensors
        port: 19200
        username: myuser
        password: secret
        topics:
          - topic4
          - topic5
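The role of ``connection_retry_secs`` can be illustrated with a small retry loop. This is a sketch under stated assumptions, not the plugin's implementation: ``connect_with_retry``, the ``connect`` callable and the ``max_attempts`` cap are hypothetical names introduced here so the example terminates.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect: Callable[[], T],
    connection_retry_secs: float = 5.0,
    max_attempts: int = 3,  # hypothetical cap, not a plugin parameter
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call ``connect`` until it succeeds, pausing between failed attempts."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError as e:
            last_error = e
            if attempt < max_attempts:
                # Wait before the next attempt, as connection_retry_secs describes
                sleep(connection_retry_secs)
    raise ConnectionError(f"gave up after {max_attempts} attempts") from last_error
```

Passing ``sleep`` as an argument keeps the wait injectable, so the loop can be exercised without actually pausing.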
- publish(msg: str | list | dict | tuple | bytes, topic: str, **kwargs)[source]#
- Parameters:
msg – Message to send.
topic – Topic to send the message to.
kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.
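Since ``msg`` accepts ``str | list | dict | tuple | bytes`` while Kafka messages are sent as bytes, some normalization step is needed before sending. The following is one plausible sketch of such a step; ``to_bytes`` is a hypothetical helper, and the choice of JSON and UTF-8 is an assumption rather than the plugin's documented behavior.

```python
import json
from typing import Union

Message = Union[str, list, dict, tuple, bytes]


def to_bytes(msg: Message) -> bytes:
    """Hypothetical normalization of a message into a bytes payload."""
    if isinstance(msg, bytes):
        return msg  # already raw bytes, pass through unchanged
    if isinstance(msg, (list, dict, tuple)):
        msg = json.dumps(msg)  # structured data is JSON-encoded (tuples become arrays)
    return str(msg).encode("utf-8")


print(to_bytes("hello"))
print(to_bytes({"temperature": 21.5}))
```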
- send_message(msg: str | list | dict | tuple | bytes, topic: str, **kwargs)[source]#
Alias for publish().
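An action call can be expressed as a JSON request. The sketch below assumes Platypush's general request layout (``type``, ``action``, ``args``), which is not described on this page; ``build_request`` is a hypothetical helper, and the topic, host and message values are illustrative.

```python
import json
from typing import Any, Dict


def build_request(action: str, **args: Any) -> Dict[str, Any]:
    """Hypothetical helper: wrap an action name and its arguments in a request."""
    return {"type": "request", "action": action, "args": args}


# host and port are passed explicitly here, as required when no default host is configured
request = build_request(
    "kafka.send_message",
    msg={"temperature": 21.5},
    topic="topic1",
    host="localhost",
    port=9092,
)

payload = json.dumps(request)
print(payload)
```

Because ``send_message`` is an alias for ``publish()``, the same arguments would apply with ``kafka.publish`` as the action name.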
- start()#
Start the plugin.
- subscribe(topic: str, **kwargs)[source]#
Subscribe to a topic.
- Parameters:
topic – Topic to subscribe to.
kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.
- unsubscribe(**kwargs)[source]#
Unsubscribe from all the topics on a consumer.
- Parameters:
kwargs – Additional arguments to pass to the KafkaConsumer, including host and port.
- wait_stop(timeout=None)#
Wait until a stop event is received.