Source code for platypush.plugins.hid

from collections.abc import Iterable
from multiprocessing import Process
from typing import Dict, List, Optional
from time import sleep

import hid

from platypush.context import get_bus
from platypush.message.event.hid import (
    HidDeviceConnectedEvent,
    HidDeviceDataEvent,
    HidDeviceDisconnectedEvent,
)
from platypush.plugins import RunnablePlugin, action
from platypush.schemas.hid import HidDeviceSchema, HidMonitoredDeviceSchema


class HidPlugin(RunnablePlugin):
    """
    This plugin can be used to interact directly with HID devices (including
    Bluetooth, USB and several serial and wireless devices) over the raw
    interface.

    This is the preferred way of communicating with joypads.

    Note that on Linux reading from the devices requires the user running the
    Platypush service to have (at least) read access to the ``/dev/hidraw*``
    devices.

    However, it is still possible to get connected/disconnected events even
    without having to open a connection to the device. A way to make HID raw
    devices accessible to e.g. a particular user group is via udev rules. For
    example, create a file named ``/etc/udev/rules.d/99-hid.rules`` with the
    following content::

        # Make all /dev/hidraw* devices accessible in read/write to users in
        # the group input
        KERNEL=="hidraw*", GROUP="input", MODE="0660"

    A more granular solution is to provide read and/or write access only to a
    specific device that you want to access, and only to the specific user
    running the Platypush service::

        KERNEL=="hidraw*", ATTRS{idVendor}=="1234", ATTRS{idProduct}=="5678", USER="user", MODE="0660"

    If you don't want to reboot the device after adding the rules, then you can
    reload the rules for udev service and re-trigger them::

        # udevadm control --reload && udevadm trigger

    """

    def __init__(
        self,
        monitored_devices: Optional[Iterable[dict]] = None,
        poll_seconds: int = 1,
        **kwargs,
    ):
        """
        :param monitored_devices: List of devices that should be monitored for
            new data. Format (note that all the device filtering attributes are
            optional):

            .. schema:: hid.HidMonitoredDeviceSchema(many=True)

        :param poll_seconds: How often the plugin should check for changes in
            the list of devices (default: 1 second).
        """
        super().__init__(**kwargs)
        self._poll_seconds = poll_seconds
        self._filters = HidMonitoredDeviceSchema().load(
            monitored_devices or [], many=True
        )

        # Running monitor processes, indexed by device path
        self._device_monitors: Dict[str, Process] = {}
        # Devices seen on the last scan, indexed by device path
        self._available_devices: Dict[str, dict] = {}

    def main(self):
        """
        Periodically scan the available devices, post connected/disconnected
        events and start/stop the data monitors, until the plugin is stopped.
        """
        while not self.should_stop():
            scanned_devices = {
                dev['path']: dev
                for dev in self.get_devices().output  # type: ignore
            }

            self._handle_device_events(scanned_devices)
            self._available_devices = scanned_devices
            if self._poll_seconds and self._poll_seconds > 0:
                sleep(self._poll_seconds)

    def stop(self):
        """
        Terminate all the running device monitors and stop the plugin.
        """
        # Iterate over a snapshot of the keys: unregistering a monitor removes
        # its entry from the dictionary.
        for monitored_device in list(self._device_monitors):
            self._unregister_device_monitor(monitored_device)

        super().stop()

    @action
    def get_devices(self) -> List[dict]:
        """
        Get the HID devices available on the host.

        :return: .. schema:: hid.HidDeviceSchema(many=True)
        """
        return list(  # type: ignore
            {
                dev['path']: HidDeviceSchema().load(dev)
                for dev in hid.enumerate()  # type: ignore
            }.values()
        )

    def _get_monitor_rule(self, device: dict) -> Optional[dict]:
        """
        Find the first configured monitoring rule that matches a device.

        A rule matches if all of its non-empty filtering attributes are equal
        to the corresponding attributes of the device.

        :return: The first matching rule, or ``None`` if no rule matches:

            .. schema:: hid.HidMonitoredDeviceSchema
        """
        filter_attrs = (
            'path',
            'serial_number',
            'vendor_id',
            'product_id',
            'manufacturer_string',
            'product_string',
        )

        return next(
            (
                rule
                for rule in (self._filters or [])
                if all(
                    rule[attr] == device.get(attr)
                    for attr in filter_attrs
                    if rule.get(attr)
                )
            ),
            None,
        )

    def _device_monitor(self, dev_def: dict, rule: dict):
        """
        Monitor loop for a single device. It is the target of the monitor
        process started by :meth:`_register_device_monitor`, and it never
        returns on its own.

        It reads ``data_size`` bytes from the device on each iteration and
        posts a :class:`HidDeviceDataEvent` with the hex dump of the data.

        :param dev_def: Device definition, as returned by :meth:`get_devices`.
        :param rule: The monitoring rule that matched the device.
        """
        path = dev_def['path']
        data_size = rule['data_size']
        poll_seconds = rule['poll_seconds']
        notify_only_if_changed = rule['notify_only_if_changed']
        last_data = None
        # The handle is kept open across iterations. It is only dropped (and
        # reopened on the next iteration) after an error.
        device = None

        self.logger.info(f'Starting monitor for device {path}')

        def wait():
            if poll_seconds and poll_seconds > 0:
                sleep(poll_seconds)

        while True:
            try:
                if device is None:
                    device = hid.Device(dev_def['vendor_id'], dev_def['product_id'])  # type: ignore

                data = device.read(data_size)
            except Exception as e:
                if device is not None:
                    self.logger.warning(f'Read error from {path}: {e}')
                    # Release the broken handle before retrying, so that
                    # handles aren't leaked on repeated failures.
                    try:
                        device.close()
                    except Exception as close_err:
                        self.logger.debug(
                            f'Error while closing device {path}: {close_err}'
                        )
                    device = None
                else:
                    # Failures on open are expected e.g. when the user has no
                    # permissions on the device: keep them out of the way.
                    self.logger.debug(f'Could not open device {path}: {e}')

                sleep(5)
                continue

            if not notify_only_if_changed or data != last_data:
                data_dump = ''.join(f'{x:02x}' for x in data)
                get_bus().post(HidDeviceDataEvent(data=data_dump, **dev_def))
                last_data = data

            wait()

    def _register_device_monitor(self, device: dict, rule: dict):
        """
        Register a monitor for a device.
        """
        path = device['path']
        # Make sure that no other monitors are registered for this device
        self._unregister_device_monitor(path)
        monitor_proc = self._device_monitors[path] = Process(
            target=self._device_monitor, args=(device, rule)
        )

        monitor_proc.start()

    def _unregister_device_monitor(self, path: str):
        """
        Unregister a monitor for a device.

        It is a no-op if no monitor is registered for ``path``. The entry is
        always removed from the registry, even if the process already died.
        """
        monitor_proc = self._device_monitors.pop(path, None)
        if monitor_proc is None or not monitor_proc.is_alive():
            return

        self.logger.info(f'Terminating monitor for device {path}')

        try:
            monitor_proc.terminate()
            monitor_proc.join(2)
            # Escalate if the process didn't terminate within the timeout
            if monitor_proc.is_alive():
                monitor_proc.kill()
        except Exception as e:
            self.logger.warning(f'Error while terminating monitor for {path}: {e}')

    def _handle_new_devices(self, scanned_devices: dict):
        """
        Handles connection events and monitor registering.
        """
        scanned_device_paths = set(scanned_devices.keys())
        available_device_paths = set(self._available_devices)
        new_device_paths = scanned_device_paths.difference(available_device_paths)

        for path in new_device_paths:
            device = scanned_devices[path]
            get_bus().post(HidDeviceConnectedEvent(**device))

            monitor_rule = self._get_monitor_rule(device)
            if monitor_rule:
                self._register_device_monitor(device, monitor_rule)

    def _handle_disconnected_devices(self, scanned_devices: dict):
        """
        Handles disconnection events and monitor unregistering.
        """
        scanned_device_paths = set(scanned_devices.keys())
        available_device_paths = set(self._available_devices)
        lost_device_paths = available_device_paths.difference(scanned_device_paths)

        for path in lost_device_paths:
            device = self._available_devices.get(path)
            if device:
                get_bus().post(HidDeviceDisconnectedEvent(**device))
                self._unregister_device_monitor(device['path'])

    def _handle_device_events(self, scanned_devices: dict):
        """
        Handles connected/disconnected device events and register/unregister the
        monitors based on the user-provided rules when required.
        """
        self._handle_new_devices(scanned_devices)
        self._handle_disconnected_devices(scanned_devices)