import base64
import json
import logging
from collections import defaultdict
from threading import RLock, Thread
from typing import Dict, Iterable, Optional, Union
from kafka import KafkaConsumer, KafkaProducer
from platypush.message.event.kafka import KafkaMessageEvent
from platypush.plugins import RunnablePlugin, action
[docs]
class KafkaPlugin(RunnablePlugin):
    """
    Plugin to send messages to, and listen for messages on, an Apache Kafka
    instance (https://kafka.apache.org/).
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 9092,
        listeners: Optional[Iterable[dict]] = None,
        connection_retry_secs: float = 5.0,
        **kwargs,
    ):
        """
        :param host: Default Kafka server name or address. If None (default),
            then it has to be specified when calling the ``send_message``
            action.
        :param port: Default Kafka server port (default: 9092).
        :param connection_retry_secs: Seconds to wait before retrying to
            connect to the Kafka server after a connection error (default: 5).
        :param listeners: If specified, the Kafka plugin will listen for
            messages on these topics. Use this parameter if you also want to
            listen on other Kafka brokers other than the primary one. This
            parameter supports a list of maps, where each item supports the
            same arguments passed to the main configuration (host, port, topic,
            password etc.). If host/port are omitted, then the host/port value
            from the plugin configuration will be used. If any of the other
            fields are omitted, then their default value will be used (usually
            null). Example:

            .. code-block:: yaml

                listeners:
                    # This listener uses the default configured host/port
                    - topics:
                          - topic1
                          - topic2
                          - topic3

                    # This will use a custom Kafka broker host
                    - host: sensors
                      port: 19200
                      username: myuser
                      password: secret
                      topics:
                          - topic4
                          - topic5

        """
        super().__init__(**kwargs)

        self.host = host
        self.port = port
        self._conn_retry_secs = connection_retry_secs
        self._listeners = listeners or []

        # `server:port` -> KafkaProducer mapping
        self.producers: Dict[str, KafkaProducer] = {}
        # `server:port` -> KafkaConsumer mapping
        self.consumers: Dict[str, KafkaConsumer] = {}

        # Synchronization locks for the producers/consumers maps,
        # since python-kafka is not thread-safe
        self._producers_locks = defaultdict(RLock)
        self._consumers_locks = defaultdict(RLock)

        # Kafka can be very noisy
        logging.getLogger('kafka').setLevel(logging.ERROR)

    def _get_srv_str(
        self, host: Optional[str] = None, port: Optional[int] = None
    ) -> str:
        """
        Build the ``host:port`` string used both as the bootstrap server
        address and as the key of the producers/consumers maps.

        :param host: Server name or address. Falls back to the plugin's
            default host if empty.
        :param port: Server port. Falls back to the plugin's default port if
            empty.
        :raises AssertionError: If no host was passed and no default host is
            configured.
        """
        host = host or self.host
        if not host:
            # Raised explicitly instead of through an ``assert`` statement, so
            # that the check is not stripped when running with ``python -O``.
            # The exception type is the same one an ``assert`` would raise.
            raise AssertionError('No Kafka server specified')

        return f'{host}:{port or self.port}'

    def _get_producer(
        self, host: Optional[str] = None, port: Optional[int] = None, **kwargs
    ) -> KafkaProducer:
        """
        Return the producer associated to the given server, creating it on
        the first request.

        :param host: Server name or address (default: plugin's host).
        :param port: Server port (default: plugin's port).
        :param kwargs: Extra arguments for the ``KafkaProducer`` constructor.
            They are only used when the producer is created: if a producer
            for the same server already exists then it is returned as it is.
        """
        srv_str = self._get_srv_str(host, port)
        with self._producers_locks[srv_str]:
            if srv_str not in self.producers:
                self.producers[srv_str] = KafkaProducer(
                    bootstrap_servers=srv_str, **kwargs
                )

            return self.producers[srv_str]

    def _get_consumer(
        self, host: Optional[str] = None, port: Optional[int] = None, **kwargs
    ) -> KafkaConsumer:
        """
        Return the consumer associated to the given server, creating it on
        the first request.

        :param host: Server name or address (default: plugin's host).
        :param port: Server port (default: plugin's port).
        :param kwargs: Extra arguments for the ``KafkaConsumer`` constructor.
            They are only used when the consumer is created: if a consumer
            for the same server already exists then it is returned as it is.
        """
        srv_str = self._get_srv_str(host, port)
        with self._consumers_locks[srv_str]:
            if srv_str not in self.consumers:
                self.consumers[srv_str] = KafkaConsumer(
                    bootstrap_servers=srv_str, **kwargs
                )

            return self.consumers[srv_str]

    def _on_msg(self, record, host: str, port: int):
        """
        Convert a record received from a consumer into a
        :class:`KafkaMessageEvent` and post it on the bus.

        The payload is decoded to a string if possible (base64-encoded if it
        is not valid text), and then parsed as JSON if possible.

        :param record: Record yielded by the consumer. Its ``value`` and
            ``topic`` attributes are read.
        :param host: Host of the server the record was received from.
        :param port: Port of the server the record was received from.
        """
        msg = record.value

        # The value is not necessarily a bytes object: it can be null, or
        # it can already have been converted if a custom deserializer was
        # passed to the consumer. Only raw payloads need to be decoded.
        if isinstance(msg, (bytes, bytearray)):
            try:
                msg = msg.decode()
            except UnicodeDecodeError:
                # Binary payload: make it safe to be sent over the bus
                msg = base64.b64encode(msg).decode()

        try:
            msg = json.loads(msg)
        except (TypeError, ValueError):
            # Not a JSON payload: forward it as it is
            pass

        self._bus.post(
            KafkaMessageEvent(msg=msg, topic=record.topic, host=host, port=port)
        )

    def _consumer_monitor(self, consumer: KafkaConsumer, host: str, port: int):
        """
        Thread body that keeps reading records from a consumer, and
        dispatches them to :meth:`._on_msg`, until the plugin is stopped.

        :param consumer: Consumer to read from.
        :param host: Host of the server, reported on the generated events.
        :param port: Port of the server, reported on the generated events.
        """
        while not self.should_stop():
            try:
                for msg in consumer:
                    self._on_msg(msg, host=host, port=port)
                    if self.should_stop():
                        break
            except Exception as e:
                # Errors raised while the plugin is stopping are not reported
                if not self.should_stop():
                    self.logger.exception(e)
                    self.logger.warning(
                        'Kafka connection error to %s:%d, reconnecting in %f seconds',
                        host,
                        port,
                        self._conn_retry_secs,
                    )

                # Pause before the next attempt, so a persistent failure
                # doesn't result in a busy loop
                self.wait_stop(self._conn_retry_secs)

    @action
    def publish(self, msg: Union[str, list, dict, tuple, bytes], topic: str, **kwargs):
        """
        Send a message to a topic.

        Tuples, lists and dictionaries are serialized to JSON. Anything else
        that isn't already a bytes object is converted to a string and
        encoded.

        :param msg: Message to send.
        :param topic: Topic to send the message to.
        :param kwargs: Additional arguments to pass to the KafkaProducer,
            including ``host`` and ``port``.
        """
        if isinstance(msg, tuple):
            msg = list(msg)
        if isinstance(msg, (dict, list)):
            msg = json.dumps(msg)
        if not isinstance(msg, bytes):
            msg = str(msg).encode()

        producer = self._get_producer(**kwargs)
        producer.send(topic, msg)
        # Don't return until the message has actually been delivered
        producer.flush()

    @action
    def send_message(
        self, msg: Union[str, list, dict, tuple, bytes], topic: str, **kwargs
    ):
        """
        Alias for :meth:`.publish`.
        """
        # This must delegate to ``publish``: calling ``send_message`` from
        # here would recurse until a RecursionError is raised.
        return self.publish(msg=msg, topic=topic, **kwargs)

    @action
    def subscribe(self, topic: str, **kwargs):
        """
        Subscribe to a topic.

        :param topic: Topic to subscribe to.
        :param kwargs: Additional arguments to pass to the KafkaConsumer,
            including ``host`` and ``port``.
        """
        # NOTE(review): according to the kafka-python documentation,
        # subscriptions are not incremental, so this presumably replaces any
        # previous subscription on the same consumer - to be confirmed.
        consumer = self._get_consumer(**kwargs)
        consumer.subscribe([topic])

    @action
    def unsubscribe(self, **kwargs):
        """
        Unsubscribe from all the topics on a consumer.

        :param kwargs: Additional arguments to pass to the KafkaConsumer,
            including ``host`` and ``port``.
        """
        consumer = self._get_consumer(**kwargs)
        consumer.unsubscribe()

    def main(self):
        """
        Start a monitor thread for each of the configured listeners, then
        wait until the plugin is stopped.
        """
        for listener in self._listeners:
            # NOTE(review): only ``host``, ``port`` and ``topics`` are read
            # from a listener's configuration. Any other field is ignored.
            host = listener.get('host', self.host)
            port = listener.get('port', self.port)
            topics = listener.get('topics')
            if not topics:
                # Nothing to listen to
                continue

            consumer = self._get_consumer(
                host=host,
                port=port,
                group_id='platypush',
                auto_offset_reset='earliest',
            )

            consumer.subscribe(topics)
            Thread(
                target=self._consumer_monitor,
                args=(consumer,),
                kwargs={'host': host, 'port': port},
                daemon=True,
            ).start()

        self.wait_stop()

    def stop(self):
        """
        Stop the plugin, and flush and close all the open producers and
        consumers. Errors on a single connection are logged, and they don't
        prevent the other connections from being closed.
        """
        super().stop()

        for srv, producer in self.producers.items():
            try:
                producer.flush()
                producer.close()
            except Exception as e:
                self.logger.warning('Error while closing Kafka producer %s: %s', srv, e)

        for srv, consumer in self.consumers.items():
            try:
                consumer.close()
            except Exception as e:
                self.logger.warning('Error while closing Kafka consumer %s: %s', srv, e)
# vim:sw=4:ts=4:et: