arduino
#
Description#
Interact with an Arduino connected to the host machine over USB using the Firmata protocol.
You have two options to communicate with an Arduino-compatible board over USB:
Use this plugin if you want to use the general-purpose Firmata protocol - in this case most of your processing logic will be on the host side and you can read/write data to the Arduino transparently.
Use the
platypush.plugins.serial.SerialPlugin
if instead you want to run more custom logic on the Arduino and communicate back with the host computer through JSON formatted messages.
Download and flash the Standard Firmata firmware to the Arduino in order to use this plugin.
Configuration#
arduino:
# [Optional]
# Default board name or path (e.g. ``COM3`` on Windows or ``/dev/ttyUSB0`` on Unix). If not set
# then the plugin will attempt an auto-discovery.
# board: # type=Optional[str]
# [Optional]
# Default board type. It can be 'mega', 'due' or 'nano'. Leave empty for auto-detection.
# board_type: # type=Optional[str]
# [Optional]
# Default serial baud rate (default: 9600)
# baud_rate: 9600 # type=int
# [Optional]
# Optional analog PINs map name->pin_number.
# analog_pins: # type=Optional[Dict[str, int]]
# [Optional]
# Optional digital PINs map name->pin_number.
# digital_pins: # type=Optional[Dict[str, int]]
# [Optional]
# Board communication timeout in seconds.
# timeout: 20.0 # type=float
# [Optional]
# Optional mapping of conversion functions to apply to the analog values read from a
# certain PIN. The key can either be the PIN number or the name as specified in ``analog_pins``, the value
# can be a function that takes an argument and transforms it or its lambda string representation.
# Note that ``analog_read`` returns by default float values in the range [0.0, 1.0]. Example:
#
# .. code-block:: yaml
#
# arduino:
# board: /dev/ttyUSB0
# analog_pins:
# temperature: 1 # Analog PIN 1
#
# conv_functions:
# temperature: 'lambda t: t * 500.0'
# conv_functions: # type=Optional[Dict[Union[str, int], Union[str, Callable]]]
# [Optional]
# How often the `RunnablePlugin.loop <https://docs.platypush.tech/platypush/plugins/.html#platypush.plugins.RunnablePlugin.loop>`_ function should be
# executed (default: 15 seconds). *NOTE*: For back-compatibility
# reasons, the `poll_seconds` argument is also supported, but it's
# deprecated.
# poll_interval: 1.0 # type=float
# [Optional]
# A number, numeric pair or mapping of ``str`` to
# number/numeric pair representing the thresholds for the sensor.
#
# Examples:
#
# .. code-block:: yaml
#
# # Any value below 25 from any sensor will trigger a
# # SensorDataBelowThresholdEvent, if the previous value was
# # equal or above, and any value above 25 will trigger a
# # SensorDataAboveThresholdEvent, if the previous value was
# # equal or below
# thresholds: 25.0
#
# # Same as above, but the threshold is only applied to
# # ``temperature`` readings
# thresholds:
# temperature: 25.0
#
# # Any value below 20 from any sensor will trigger a
# # SensorDataBelowThresholdEvent, if the previous value was
# # equal or above, and any value above 25 will trigger a
# # SensorDataAboveThresholdEvent, if the previous value was
# # equal or below (hysteresis configuration with double
# # threshold)
# thresholds:
# - 20.0
# - 25.0
#
# # Same as above, but the threshold is only applied to
# # ``temperature`` readings
# thresholds:
# temperature:
# - 20.0
# - 25.0
# thresholds: # type=Union[float, int, Tuple[Union[float, int], Union[float, int]], Mapping[str, Union[float, int, Tuple[Union[float, int], Union[float, int]]]], NoneType]
# [Optional]
# If set, then the sensor change events will be
# triggered only if the difference between the new value and the
# previous value is higher than the specified tolerance. For example,
# if the sensor data is mapped to a dictionary::
#
# {
# "temperature": 0.01, # Tolerance on the 2nd decimal digit
# "humidity": 0.1 # Tolerance on the 1st decimal digit
# }
#
# Or, if it's a raw scalar number::
#
# 0.1 # Tolerance on the 1st decimal digit
#
# Or, if it's a list of values::
#
# [
# 0.01, # Tolerance on the 2nd decimal digit for the first value
# 0.1 # Tolerance on the 1st decimal digit for the second value
# ]
# tolerance: 0 # type=Union[float, int, Mapping[str, Union[float, int]], Iterable[Union[float, int]]]
# [Optional]
# If `SensorPlugin.get_measurement <https://docs.platypush.tech/platypush/plugins/sensor.html#platypush.plugins.sensor.SensorPlugin.get_measurement>`_ returns a key-value
# mapping, and ``enabled_sensors`` is set, then only the reported
# sensor keys will be returned.
# enabled_sensors: # type=Optional[Iterable[str]]
# [Optional]
# How long we should wait for any running
# threads/processes to stop before exiting (default: 5 seconds).
# stop_timeout: 5 # type=Optional[float]
# [Optional]
# If set to True then the plugin will not monitor
# for new events. This is useful if you want to run a plugin in
# stateless mode and only leverage its actions, without triggering any
# events. Defaults to False.
# disable_monitor: False # type=bool
Dependencies#
pip
pip install pyfirmata2
Triggered events#
Actions#
Module reference#
- class platypush.plugins.arduino.ArduinoPlugin(*_, **__)[source]#
Bases:
SensorPlugin
Interact with an Arduino connected to the host machine over USB using the Firmata protocol.
You have two options to communicate with an Arduino-compatible board over USB:
Use this plugin if you want to use the general-purpose Firmata protocol - in this case most of your processing logic will be on the host side and you can read/write data to the Arduino transparently.
Use the
platypush.plugins.serial.SerialPlugin
if instead you want to run more custom logic on the Arduino and communicate back with the host computer through JSON formatted messages.
Download and flash the Standard Firmata firmware to the Arduino in order to use this plugin.
- __init__(board: str | None = None, board_type: str | None = None, baud_rate: int = 9600, analog_pins: Dict[str, int] | None = None, digital_pins: Dict[str, int] | None = None, timeout: float = 20.0, conv_functions: Dict[str | int, str | Callable] | None = None, poll_interval: float = 1.0, **kwargs)[source]#
- Parameters:
board – Default board name or path (e.g.
COM3
on Windows or/dev/ttyUSB0
on Unix). If not set then the plugin will attempt an auto-discovery.board_type – Default board type. It can be ‘mega’, ‘due’ or ‘nano’. Leave empty for auto-detection.
baud_rate – Default serial baud rate (default: 9600)
analog_pins – Optional analog PINs map name->pin_number.
digital_pins – Optional digital PINs map name->pin_number.
timeout – Board communication timeout in seconds.
conv_functions –
Optional mapping of conversion functions to apply to the analog values read from a certain PIN. The key can either be the PIN number or the name as specified in
analog_pins
, the value can be a function that takes an argument and transforms it or its lambda string representation. Note thatanalog_read
returns by default float values in the range [0.0, 1.0]. Example:arduino: board: /dev/ttyUSB0 analog_pins: temperature: 1 # Analog PIN 1 conv_functions: temperature: 'lambda t: t * 500.0'
- analog_read(pin: int | str, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, conv_function: str | Callable | None = None, timeout: int | None = None) float | None [source]#
Read an analog value from a PIN.
- Parameters:
pin – PIN number or configured name.
board – Board path or name (default: default configured
board
).board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured
board_type
).baud_rate – Baud rate (default: default configured
baud_rate
).conv_function – Optional conversion function override to apply to the output. It can be either a function object or its lambda string representation (e.g.
lambda x: x*x
). Keep in mind thatanalog_read
returns by default float values in the range[0.0, 1.0]
.timeout – Communication timeout in seconds (default: default configured
timeout
).
- analog_write(pin: int | str, value: float, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None)[source]#
Write a value to an analog PIN.
- Parameters:
pin – PIN number or configured name.
value – Voltage to be sent, a real number normalized between 0 and 1.
board – Board path or name (default: default configured
board
).board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured
board_type
).baud_rate – Baud rate (default: default configured
baud_rate
).timeout – Communication timeout in seconds (default: default configured
timeout
).
- digital_read(pin: int | str, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None) bool [source]#
Read a digital value from a PIN.
- Parameters:
pin – PIN number or configured name.
board – Board path or name (default: default configured
board
).board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured
board_type
).baud_rate – Baud rate (default: default configured
baud_rate
).timeout – Communication timeout in seconds (default: default configured
timeout
).
- digital_write(pin: int | str, value: bool, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None)[source]#
Write a value to a digital PIN.
- Parameters:
pin – PIN number or configured name.
value – True (HIGH) or False (LOW).
board – Board path or name (default: default configured
board
).board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured
board_type
).baud_rate – Baud rate (default: default configured
baud_rate
).timeout – Communication timeout in seconds (default: default configured
timeout
).
- get_data(*args, **kwargs)#
(Deprecated) alias for
get_measurement()
- get_measurement(*_, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None, **__) Dict[str, bool | float | None] [source]#
Get a measurement from all the configured PINs.
- Parameters:
board – Board path or name (default: default configured
board
)board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured
board_type
).baud_rate – Baud rate (default: default configured
baud_rate
)timeout – Communication timeout in seconds (default: default configured
timeout
).
- Returns:
dict, where the keys are either the configured names of the PINs (see
analog_pins
configuration) or all the analog PINs (names will be in the format ‘A0..A7’ in that case), and the values will be the real values measured, either normalized between 0 and 1 if no conversion functions were provided, or transformed through the configuredconv_functions
.
- main()#
Implementation of the main loop of the plugin.
- publish_entities(entities: float | int | Mapping[str, float | int] | Iterable[float | int], *args, **kwargs) Collection[Entity] #
Publishes a list of entities. The downstream consumers include:
The entity persistence manager
The web server
- Any consumer subscribed to
platypush.message.event.entities.EntityUpdateEvent
events (e.g. web clients)
It also accepts an optional callback that will be called when each of the entities in the set is flushed to the database.
You usually don’t need to override this class (but you may want to extend
transform_entities()
instead if your extension doesn’t natively handle Entity objects).
- pwm_write(pin: int | str, value: float, board: str | None = None, board_type: str | None = None, baud_rate: int | None = None, timeout: int | None = None)[source]#
Write a PWM value to a digital PIN.
- Parameters:
pin – PIN number or configured name.
value – PWM real value normalized between 0 and 1.
board – Board path or name (default: default configured
board
).board_type – Board type. It can be ‘mega’, ‘due’ or ‘nano’ (default: configured
board_type
).baud_rate – Baud rate (default: default configured
baud_rate
).timeout – Communication timeout in seconds (default: default configured
timeout
).
- start()#
Start the plugin.
- status(*_, **__) float | int | Mapping[str, float | int] | Iterable[float | int] | None #
Returns the latest read values and publishes the
platypush.message.event.entities.EntityUpdateEvent
events if required.
- transform_entities(entities: Dict[str, int | float]) List[Device] [source]#
This method takes a list of entities in any (plugin-specific) format and converts them into a standardized collection of Entity objects. Since this method is called by
publish_entities()
before entity updates are published, you may usually want to extend it to pre-process the entities managed by your extension into the standard format before they are stored and published to all the consumers.
- wait_stop(timeout=None)#
Wait until a stop event is received.
- class platypush.plugins.arduino.BoardType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Enum
Board types.
- class platypush.plugins.arduino.PinType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
IntEnum
PIN type enumeration (analog or digital).
- __init__(*args, **kwds)#
- as_integer_ratio()#
Return a pair of integers, whose ratio is equal to the original int.
The ratio is in lowest terms and has a positive denominator.
>>> (10).as_integer_ratio() (10, 1) >>> (-10).as_integer_ratio() (-10, 1) >>> (0).as_integer_ratio() (0, 1)
- bit_count()#
Number of ones in the binary representation of the absolute value of self.
Also known as the population count.
>>> bin(13) '0b1101' >>> (13).bit_count() 3
- bit_length()#
Number of bits necessary to represent self in binary.
>>> bin(37) '0b100101' >>> (37).bit_length() 6
- conjugate()#
Returns self, the complex conjugate of any int.
- denominator#
the denominator of a rational number in lowest terms
- from_bytes(byteorder='big', *, signed=False)#
Return the integer represented by the given array of bytes.
- bytes
Holds the array of bytes to convert. The argument must either support the buffer protocol or be an iterable object producing bytes. Bytes and bytearray are examples of built-in objects that support the buffer protocol.
- byteorder
The byte order used to represent the integer. If byteorder is ‘big’, the most significant byte is at the beginning of the byte array. If byteorder is ‘little’, the most significant byte is at the end of the byte array. To request the native byte order of the host system, use `sys.byteorder’ as the byte order value. Default is to use ‘big’.
- signed
Indicates whether two’s complement is used to represent the integer.
- imag#
the imaginary part of a complex number
- is_integer()#
Returns True. Exists for duck type compatibility with float.is_integer.
- numerator#
the numerator of a rational number in lowest terms
- real#
the real part of a complex number
- to_bytes(length=1, byteorder='big', *, signed=False)#
Return an array of bytes representing an integer.
- length
Length of bytes object to use. An OverflowError is raised if the integer is not representable with the given number of bytes. Default is length 1.
- byteorder
The byte order used to represent the integer. If byteorder is ‘big’, the most significant byte is at the beginning of the byte array. If byteorder is ‘little’, the most significant byte is at the end of the byte array. To request the native byte order of the host system, use `sys.byteorder’ as the byte order value. Default is to use ‘big’.
- signed
Determines whether two’s complement is used to represent the integer. If signed is False and a negative integer is given, an OverflowError is raised.