arduino#

Description#

Interact with an Arduino connected to the host machine over USB using the Firmata protocol.

You have two options to communicate with an Arduino-compatible board over USB:

  • Use this plugin if you want to use the general-purpose Firmata protocol. In this case most of your processing logic runs on the host side, and you can read and write data to the Arduino transparently.

  • Use platypush.plugins.serial.SerialPlugin instead if you want to run custom logic on the Arduino and communicate with the host computer through JSON-formatted messages.

Download and flash the Standard Firmata firmware to the Arduino in order to use this plugin.

Configuration#

arduino:
  # [Optional]
  # Default board name or path (e.g. ``COM3`` on Windows or ``/dev/ttyUSB0`` on Unix). If not set
  # then the plugin will attempt an auto-discovery.
  # board:   # type=Optional[str]

  # [Optional]
  # Default board type. It can be 'mega', 'due' or 'nano'. Leave empty for auto-detection.
  # board_type:   # type=Optional[str]

  # [Optional]
  # Default serial baud rate (default: 9600)
  # baud_rate: 9600  # type=int

  # [Optional]
  # Optional mapping of analog PIN names to PIN numbers (name -> pin_number).
  # analog_pins:   # type=Optional[Dict[str, int]]

  # [Optional]
  # Optional mapping of digital PIN names to PIN numbers (name -> pin_number).
  # digital_pins:   # type=Optional[Dict[str, int]]

  # [Optional]
  # Board communication timeout in seconds.
  # timeout: 20.0  # type=float

  # [Optional]
  # Optional mapping of conversion functions to apply to the analog values read from a
  # certain PIN. The key can either be the PIN number or the name as specified in ``analog_pins``, the value
  # can be a function that takes an argument and transforms it or its lambda string representation.
  # Note that ``analog_read`` returns by default float values in the range [0.0, 1.0]. Example:
  #
  # .. code-block:: yaml
  #
  #     arduino:
  #         board: /dev/ttyUSB0
  #         analog_pins:
  #             temperature: 1  # Analog PIN 1
  #
  #         conv_functions:
  #             temperature: 'lambda t: t * 500.0'
  # conv_functions:   # type=Optional[Dict[Union[str, int], Union[str, Callable]]]

  # [Optional]
  # How often the `RunnablePlugin.loop <https://docs.platypush.tech/platypush/plugins/.html#platypush.plugins.RunnablePlugin.loop>`_ function should be
  # executed, in seconds (generic default: 15 seconds; this plugin defaults
  # to 1.0, as in the value below). *NOTE*: For backward-compatibility
  # reasons, the `poll_seconds` argument is also supported, but it is
  # deprecated.
  # poll_interval: 1.0  # type=float

  # [Optional]
  # A number, numeric pair or mapping of ``str`` to
  # number/numeric pair representing the thresholds for the sensor.
  #
  # Examples:
  #
  #     .. code-block:: yaml
  #
  #         # Any value below 25 from any sensor will trigger a
  #         # SensorDataBelowThresholdEvent, if the previous value was
  #         # equal or above, and any value above 25 will trigger a
  #         # SensorDataAboveThresholdEvent, if the previous value was
  #         # equal or below
  #         thresholds: 25.0
  #
  #         # Same as above, but the threshold is only applied to
  #         # ``temperature`` readings
  #         thresholds:
  #             temperature: 25.0
  #
  #         # Any value below 20 from any sensor will trigger a
  #         # SensorDataBelowThresholdEvent, if the previous value was
  #         # equal or above, and any value above 25 will trigger a
  #         # SensorDataAboveThresholdEvent, if the previous value was
  #         # equal or below (hysteresis configuration with double
  #         # threshold)
  #         thresholds:
  #             - 20.0
  #             - 25.0
  #
  #         # Same as above, but the threshold is only applied to
  #         # ``temperature`` readings
  #         thresholds:
  #             temperature:
  #                 - 20.0
  #                 - 25.0
  # thresholds:   # type=Union[float, int, Tuple[Union[float, int], Union[float, int]], Mapping[str, Union[float, int, Tuple[Union[float, int], Union[float, int]]]], NoneType]

  # [Optional]
  # If set, then the sensor change events will be
  # triggered only if the difference between the new value and the
  # previous value is higher than the specified tolerance. For example,
  # if the sensor data is mapped to a dictionary::
  #
  #     {
  #         "temperature": 0.01,  # Tolerance on the 2nd decimal digit
  #         "humidity": 0.1       # Tolerance on the 1st decimal digit
  #     }
  #
  # Or, if it's a raw scalar number::
  #
  #     0.1  # Tolerance on the 1st decimal digit
  #
  # Or, if it's a list of values::
  #
  #     [
  #         0.01,   # Tolerance on the 2nd decimal digit for the first value
  #         0.1     # Tolerance on the 1st decimal digit for the second value
  #     ]
  # tolerance: 0  # type=Union[float, int, Mapping[str, Union[float, int]], Iterable[Union[float, int]]]

  # [Optional]
  # If `SensorPlugin.get_measurement <https://docs.platypush.tech/platypush/plugins/sensor.html#platypush.plugins.sensor.SensorPlugin.get_measurement>`_ returns a key-value
  # mapping, and ``enabled_sensors`` is set, then only the reported
  # sensor keys will be returned.
  # enabled_sensors:   # type=Optional[Iterable[str]]

  # [Optional]
  # How long we should wait for any running
  # threads/processes to stop before exiting (default: 5 seconds).
  # stop_timeout: 5  # type=Optional[float]

  # [Optional]
  # If set to True then the plugin will not monitor
  # for new events. This is useful if you want to run a plugin in
  # stateless mode and only leverage its actions, without triggering any
  # events. Defaults to False.
  # disable_monitor: False  # type=bool
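
The thresholds and tolerance options in the configuration above can be read as simple comparisons between the previous and the new sensor value. The following is a minimal sketch of that reading, not platypush's actual implementation: the function names threshold_event and exceeds_tolerance are hypothetical, and the handling of a missing previous value is an assumption.

```python
from typing import Optional, Sequence, Union

Number = Union[int, float]
Thresholds = Union[Number, Sequence[Number]]


def threshold_event(
    prev: Optional[Number], new: Number, thresholds: Thresholds
) -> Optional[str]:
    """Return 'below', 'above' or None for a single sensor reading."""
    # A single number is treated as a (low, high) pair with low == high.
    if isinstance(thresholds, (int, float)):
        low = high = thresholds
    else:
        low, high = thresholds

    # Assumption: without a previous value there is no crossing to report.
    if prev is None:
        return None
    # Falling under the low threshold from a value equal to or above it.
    if new < low and prev >= low:
        return "below"
    # Rising over the high threshold from a value equal to or below it.
    if new > high and prev <= high:
        return "above"
    return None


def exceeds_tolerance(
    prev: Optional[Number], new: Number, tolerance: Number = 0
) -> bool:
    """Return True if the change from prev to new is larger than tolerance."""
    # Assumption: the first reading always counts as a change.
    if prev is None:
        return True
    return abs(new - prev) > tolerance
```

With a double threshold of (20.0, 25.0), a reading that moves between 20 and 25 yields no event in this sketch, which is the hysteresis behaviour the configuration comments describe.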

Dependencies#

pip

pip install pyfirmata2

Triggered events#

Actions#

Module reference#

class platypush.plugins.arduino.ArduinoPlugin(*_, **__)[source]#

Bases: SensorPlugin

Interact with an Arduino connected to the host machine over USB using the Firmata protocol.

You have two options to communicate with an Arduino-compatible board over USB:

  • Use this plugin if you want to use the general-purpose Firmata protocol. In this case most of your processing logic runs on the host side, and you can read and write data to the Arduino transparently.

  • Use platypush.plugins.serial.SerialPlugin instead if you want to run custom logic on the Arduino and communicate with the host computer through JSON-formatted messages.

Download and flash the Standard Firmata firmware to the Arduino in order to use this plugin.

__init__(board: str | None = None, board_type: str | None = None, baud_rate: int = 9600, analog_pins: Dict[str, int] | None = None, digital_pins: Dict[str, int] | None = None, timeout: float = 20.0, conv_functions: Dict[str | int, str | Callable] | None = None, poll_interval: float = 1.0, **kwargs)[source]#
Parameters:
  • board – Default board name or path (e.g. COM3 on Windows or /dev/ttyUSB0 on Unix). If not set then the plugin will attempt an auto-discovery.

  • board_type – Default board type. It can be ‘mega’, ‘due’ or ‘nano’. Leave empty for auto-detection.

  • baud_rate – Default serial baud rate (default: 9600)

  • analog_pins – Optional mapping of analog PIN names to PIN numbers (name -> pin_number).

  • digital_pins – Optional mapping of digital PIN names to PIN numbers (name -> pin_number).

  • timeout – Board communication timeout in seconds.

  • conv_functions

    Optional mapping of conversion functions to apply to the analog values read from a certain PIN. The key can either be the PIN number or the name as specified in analog_pins, the value can be a function that takes an argument and transforms it or its lambda string representation. Note that analog_read returns by default float values in the range [0.0, 1.0]. Example:

    arduino:
        board: /dev/ttyUSB0
        analog_pins:
            temperature: 1  # Analog PIN 1
    
        conv_functions:
            temperature: 'lambda t: t * 500.0'
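
A conversion function given as a lambda string has to be turned into a callable before it can be applied to a normalized reading. The sketch below shows one possible way to do that; resolve_conv_function is a hypothetical helper, the use of eval is an assumption about how such strings could be resolved, and the sample reading of 0.05 is made up for illustration.

```python
from typing import Callable, Union

ConvFunction = Union[str, Callable[[float], float]]


def resolve_conv_function(fn: ConvFunction) -> Callable[[float], float]:
    """Return a callable from either a function or its lambda string."""
    if callable(fn):
        return fn
    # Assumption: the string comes from a trusted configuration file.
    # eval() must never be used on untrusted input.
    resolved = eval(fn)
    if not callable(resolved):
        raise TypeError(f"Not a callable expression: {fn!r}")
    return resolved


# Same expression as in the configuration example above.
conv = resolve_conv_function('lambda t: t * 500.0')

# Hypothetical normalized reading in the range [0.0, 1.0].
reading = 0.05
temperature = conv(reading)
```

Because analog_read returns values in [0.0, 1.0] by default, the multiplier in the lambda sets the full-scale value of the converted measurement.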
    

analog_read(pin: int | str, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, conv_function: str | Callable | None = None, timeout: int | None = None) -> float | None[source]#

Read an analog value from a PIN.

Parameters:
  • pin – PIN number or configured name.

  • board – Board path or name (default: default configured board).

  • board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured board_type).

  • baud_rate – Baud rate (default: default configured baud_rate).

  • conv_function – Optional conversion function override to apply to the output. It can be either a function object or its lambda string representation (e.g. lambda x: x*x). Keep in mind that analog_read returns by default float values in the range [0.0, 1.0].

  • timeout – Communication timeout in seconds (default: default configured timeout).
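
An analog_read call can also be expressed as a request message sent to a running platypush instance. The sketch below only builds the JSON payload and does not send it. The type/action/args layout is assumed to follow platypush's generic request format, build_request is a hypothetical helper, and the temperature PIN name is borrowed from the configuration example.

```python
import json


def build_request(action: str, **args) -> str:
    """Serialize an action call as a JSON request message."""
    message = {
        "type": "request",  # Assumed generic request envelope
        "action": action,   # <plugin name>.<action name>
        "args": args,       # Keyword arguments of the action
    }
    return json.dumps(message)


# Read the PIN configured as "temperature" in analog_pins.
payload = build_request("arduino.analog_read", pin="temperature")
```

How the payload is delivered (HTTP, WebSocket or another backend) depends on the instance's own configuration.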

analog_write(pin: int | str, value: float, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None)[source]#

Write a value to an analog PIN.

Parameters:
  • pin – PIN number or configured name.

  • value – Voltage to be sent, a real number normalized between 0 and 1.

  • board – Board path or name (default: default configured board).

  • board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured board_type).

  • baud_rate – Baud rate (default: default configured baud_rate).

  • timeout – Communication timeout in seconds (default: default configured timeout).

digital_read(pin: int | str, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None) -> bool[source]#

Read a digital value from a PIN.

Parameters:
  • pin – PIN number or configured name.

  • board – Board path or name (default: default configured board).

  • board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured board_type).

  • baud_rate – Baud rate (default: default configured baud_rate).

  • timeout – Communication timeout in seconds (default: default configured timeout).

digital_write(pin: int | str, value: bool, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None)[source]#

Write a value to a digital PIN.

Parameters:
  • pin – PIN number or configured name.

  • value – True (HIGH) or False (LOW).

  • board – Board path or name (default: default configured board).

  • board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured board_type).

  • baud_rate – Baud rate (default: default configured baud_rate).

  • timeout – Communication timeout in seconds (default: default configured timeout).

get_data(*args, **kwargs)#

(Deprecated) alias for get_measurement()

get_measurement(*_, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None, **__) -> Dict[str, bool | float | None][source]#

Get a measurement from all the configured PINs.

Parameters:
  • board – Board path or name (default: default configured board)

  • board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured board_type).

  • baud_rate – Baud rate (default: default configured baud_rate)

  • timeout – Communication timeout in seconds (default: default configured timeout).

Returns:

dict, where the keys are either the configured names of the PINs (see analog_pins configuration) or all the analog PINs (names will be in the format ‘A0..A7’ in that case), and the values will be the real values measured, either normalized between 0 and 1 if no conversion functions were provided, or transformed through the configured conv_functions.
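
The returned mapping can be narrowed to a subset of keys, which is the effect the enabled_sensors option is described as having. The following is a rough sketch under that reading: filter_measurement is a hypothetical helper and the sample values are invented.

```python
from typing import Dict, Iterable, Optional, Union

Measurement = Dict[str, Union[bool, float, None]]


def filter_measurement(
    data: Measurement, enabled_sensors: Optional[Iterable[str]] = None
) -> Measurement:
    """Keep only the keys listed in enabled_sensors, if any are given."""
    if not enabled_sensors:
        # No filter configured: return a copy of the full measurement.
        return dict(data)
    enabled = set(enabled_sensors)
    return {name: value for name, value in data.items() if name in enabled}


# Hypothetical measurement: one named PIN, one unnamed analog PIN and
# one digital PIN.
sample: Measurement = {"temperature": 25.0, "A1": 0.42, "button": True}
filtered = filter_measurement(sample, enabled_sensors=["temperature"])
```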

main()#

Implementation of the main loop of the plugin.

publish_entities(entities: float | int | Mapping[str, float | int] | Iterable[float | int], *args, **kwargs) -> Collection[Entity]#

Publishes a list of entities to the downstream consumers.

It also accepts an optional callback that will be called when each of the entities in the set is flushed to the database.

You usually don’t need to override this method (but you may want to extend transform_entities() instead if your extension doesn’t natively handle Entity objects).

pwm_write(pin: int | str, value: float, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None)[source]#

Write a PWM value to a digital PIN.

Parameters:
  • pin – PIN number or configured name.

  • value – PWM real value normalized between 0 and 1.

  • board – Board path or name (default: default configured board).

  • board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured board_type).

  • baud_rate – Baud rate (default: default configured baud_rate).

  • timeout – Communication timeout in seconds (default: default configured timeout).
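
Since pwm_write expects a value normalized between 0 and 1, a raw duty-cycle value has to be scaled first. The sketch below assumes an 8-bit raw range (0 to 255), which is a common convention but not something this page specifies; normalize_pwm is a hypothetical helper.

```python
def normalize_pwm(raw: float, max_raw: float = 255.0) -> float:
    """Scale a raw duty-cycle value to the [0.0, 1.0] range."""
    if max_raw <= 0:
        raise ValueError("max_raw must be positive")
    # Clamp so that out-of-range inputs never leave [0.0, 1.0].
    return min(1.0, max(0.0, raw / max_raw))


# Half duty cycle on the assumed 8-bit scale.
half = normalize_pwm(127.5)
```

The result can then be passed as the value argument of pwm_write.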

start()#

Start the plugin.

status(*_, **__) -> float | int | Mapping[str, float | int] | Iterable[float | int] | None#

Returns the latest read values and publishes the platypush.message.event.entities.EntityUpdateEvent events if required.

stop()[source]#

Stop the plugin.

transform_entities(entities: Dict[str, int | float]) -> List[Device][source]#

This method takes a list of entities in any (plugin-specific) format and converts them into a standardized collection of Entity objects. Since this method is called by publish_entities() before entity updates are published, you may usually want to extend it to pre-process the entities managed by your extension into the standard format before they are stored and published to all the consumers.

wait_stop(timeout=None)#

Wait until a stop event is received.

class platypush.plugins.arduino.BoardType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Enum

Board types.

class platypush.plugins.arduino.PinType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: IntEnum

PIN type enumeration (analog or digital).

__init__(*args, **kwds)#

as_integer_ratio()#

Return a pair of integers, whose ratio is equal to the original int.

The ratio is in lowest terms and has a positive denominator.

>>> (10).as_integer_ratio()
(10, 1)
>>> (-10).as_integer_ratio()
(-10, 1)
>>> (0).as_integer_ratio()
(0, 1)

bit_count()#

Number of ones in the binary representation of the absolute value of self.

Also known as the population count.

>>> bin(13)
'0b1101'
>>> (13).bit_count()
3

bit_length()#

Number of bits necessary to represent self in binary.

>>> bin(37)
'0b100101'
>>> (37).bit_length()
6

conjugate()#

Returns self, the complex conjugate of any int.

denominator#

the denominator of a rational number in lowest terms

from_bytes(byteorder='big', *, signed=False)#

Return the integer represented by the given array of bytes.

bytes

Holds the array of bytes to convert. The argument must either support the buffer protocol or be an iterable object producing bytes. Bytes and bytearray are examples of built-in objects that support the buffer protocol.

byteorder

The byte order used to represent the integer. If byteorder is ‘big’, the most significant byte is at the beginning of the byte array. If byteorder is ‘little’, the most significant byte is at the end of the byte array. To request the native byte order of the host system, use sys.byteorder as the byte order value. Default is to use ‘big’.

signed

Indicates whether two’s complement is used to represent the integer.

imag#

the imaginary part of a complex number

is_integer()#

Returns True. Exists for duck type compatibility with float.is_integer.

numerator#

the numerator of a rational number in lowest terms

real#

the real part of a complex number

to_bytes(length=1, byteorder='big', *, signed=False)#

Return an array of bytes representing an integer.

length

Length of bytes object to use. An OverflowError is raised if the integer is not representable with the given number of bytes. Default is length 1.

byteorder

The byte order used to represent the integer. If byteorder is ‘big’, the most significant byte is at the beginning of the byte array. If byteorder is ‘little’, the most significant byte is at the end of the byte array. To request the native byte order of the host system, use sys.byteorder as the byte order value. Default is to use ‘big’.

signed

Determines whether two’s complement is used to represent the integer. If signed is False and a negative integer is given, an OverflowError is raised.