Source code for platypush.plugins.gpio

import threading
from typing import Any, Optional, Dict, Union, Collection

from platypush.context import get_bus
from platypush.message.event.gpio import GPIOEvent
from platypush.plugins import RunnablePlugin, action


[docs] class GpioPlugin(RunnablePlugin): """ This plugin can be used to interact with custom electronic devices connected to a Raspberry Pi (or compatible device) over GPIO pins. """
[docs] def __init__( self, pins: Optional[Dict[str, int]] = None, monitored_pins: Optional[Collection[Union[str, int]]] = None, mode: str = 'board', **kwargs ): """ :param mode: Specify ``board`` if you want to use the board PIN numbers, ``bcm`` for Broadcom PIN numbers (default: ``board``) :param pins: Custom `GPIO name` -> `PIN number` mapping. This can be useful if you want to reference your GPIO ports by name instead of PIN number. Example: .. code-block:: yaml pins: LED_1: 14, LED_2: 15, MOTOR: 16, SENSOR_1: 17 SENSOR_2: 18 :param monitored_pins: List of PINs to monitor. If a new value is detected on these pins then a :class:`platypush.message.event.gpio.GPIOEvent` event will be triggered. GPIO PINS can be referenced either by number or name, if a name is specified on the `pins` argument. """ super().__init__(**kwargs) self.mode = self._get_mode(mode) self._initialized = False self._init_lock = threading.RLock() self._initialized_pins = {} self._monitored_pins = monitored_pins or [] self.pins_by_name = pins if pins else {} self.pins_by_number = { number: name for (name, number) in self.pins_by_name.items() }
def _init_board(self): import RPi.GPIO as GPIO with self._init_lock: if self._initialized and GPIO.getmode(): return GPIO.setmode(self.mode) self._initialized = True def _get_pin_number(self, pin): try: pin = int(str(pin)) except ValueError: pin = self.pins_by_name.get(pin) if not pin: raise RuntimeError('Unknown PIN name: "{}"'.format(pin)) return pin @staticmethod def _get_mode(mode_str: str) -> int: import RPi.GPIO as GPIO mode_str = mode_str.upper() assert mode_str in ['BOARD', 'BCM'], 'Invalid mode: {}'.format(mode_str) return getattr(GPIO, mode_str) def on_gpio_event(self): def callback(pin: int): import RPi.GPIO as GPIO value = GPIO.input(pin) pin = self.pins_by_number.get(pin, pin) get_bus().post(GPIOEvent(pin=pin, value=value)) return callback
[docs] def main(self): import RPi.GPIO as GPIO if not self._monitored_pins: return # No need to start the monitor self._init_board() monitored_pins = [self._get_pin_number(pin) for pin in self._monitored_pins] for pin in monitored_pins: GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) GPIO.add_event_detect(pin, GPIO.BOTH, callback=self.on_gpio_event()) self.wait_stop()
[docs] @action def write( self, pin: Union[int, str], value: Union[int, bool], name: Optional[str] = None ) -> Dict[str, Any]: """ Write a byte value to a pin. :param pin: PIN number or configured name :param name: Optional name for the written value (e.g. "temperature" or "humidity") :param value: Value to write Response:: output = { "name": <pin or metric name>, "pin": <pin>, "value": <value>, "method": "write" } """ import RPi.GPIO as GPIO self._init_board() name = name or pin pin = self._get_pin_number(pin) if pin not in self._initialized_pins or self._initialized_pins[pin] != GPIO.OUT: GPIO.setup(pin, GPIO.OUT) self._initialized_pins[pin] = GPIO.OUT GPIO.setup(pin, GPIO.OUT) GPIO.output(pin, value) return { 'name': name, 'pin': pin, 'value': value, 'method': 'write', }
[docs] @action def read(self, pin: Union[int, str], name: Optional[str] = None) -> Dict[str, Any]: """ Reads a value from a PIN. :param pin: PIN number or configured name. :param name: Optional name for the read value (e.g. "temperature" or "humidity") Response:: output = { "name": <pin number or pin/metric name>, "pin": <pin>, "value": <value>, "method": "read" } """ import RPi.GPIO as GPIO self._init_board() name = name or pin pin = self._get_pin_number(pin) if pin not in self._initialized_pins: GPIO.setup(pin, GPIO.IN) self._initialized_pins[pin] = GPIO.IN val = GPIO.input(pin) return { 'name': name, 'pin': pin, 'value': val, 'method': 'read', }
@action def get_measurement(self, pin=None): if pin is None: return self.read_all() return self.read(pin)
[docs] @action def read_all(self): """ Reads the values from all the configured PINs and returns them as a list. It will raise a RuntimeError if no PIN mappings were configured. """ if not self.pins_by_number: raise RuntimeError("No PIN mappings were provided/configured") values = [] for pin, name in self.pins_by_number.items(): # noinspection PyUnresolvedReferences values.append(self.read(pin=pin, name=name).output) return values
[docs] @action def cleanup(self): """ Cleanup the state of the GPIO and resets PIN values. """ import RPi.GPIO as GPIO with self._init_lock: if self._initialized: GPIO.cleanup() self._initialized_pins = {} self._initialized = False
# vim:sw=4:ts=4:et: