import json
import select
import socket
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Collection, Iterable, List, Optional, Tuple, Union
from platypush.config import Config
from platypush.context import get_bus
from platypush.message.event.music.snapcast import (
ClientVolumeChangeEvent,
GroupMuteChangeEvent,
ClientConnectedEvent,
ClientDisconnectedEvent,
ClientLatencyChangeEvent,
ClientNameChangeEvent,
GroupStreamChangeEvent,
StreamUpdateEvent,
ServerUpdateEvent,
)
from platypush.plugins import RunnablePlugin, action
[docs]
class MusicSnapcastPlugin(RunnablePlugin):
"""
Plugin to interact with a `Snapcast <https://github.com/badaix/snapcast>`_
instance, control clients mute status, volume, playback etc.
See https://github.com/badaix/snapcast/blob/master/doc/json_rpc_api/v2_0_0.md
for further reference about the returned API types.
"""
_DEFAULT_SNAPCAST_PORT = 1705
_DEFAULT_POLL_SECONDS = 10 # Poll servers each 10 seconds
_SOCKET_EOL = '\r\n'.encode()
[docs]
def __init__(
self,
host: Optional[str] = None,
port: int = _DEFAULT_SNAPCAST_PORT,
hosts: Optional[Iterable[dict]] = None,
poll_interval: Optional[float] = _DEFAULT_POLL_SECONDS,
**kwargs,
):
"""
:param host: Default Snapcast server host.
:param port: Default Snapcast server control port (default: 1705).
:param hosts: If specified, then the provided list of Snapcast servers
will be monitored, rather than just the one provided on ``host``.
This setting can be used either in conjunction with ``host`` (in
that case, if the ``host`` is not specified on a request then
``host`` will be used as a fallback), or on its own (in that case
requests with no host specified will target the first server in the
list). Note however that either ``host`` or ``hosts`` must be
provided. Format:
.. code-block:: yaml
hosts:
- host: localhost
port: 1705 # Default port
- host: snapcast.example.com
port: 9999
:param poll_seconds: How often the plugin will poll remote servers for
status updates (default: 10 seconds).
"""
super().__init__(poll_interval=poll_interval, **kwargs)
self._hosts = self._get_hosts(host=host, port=port, hosts=hosts)
assert self._hosts, 'No Snapcast hosts specified'
self._latest_req_id = 0
self._latest_req_id_lock = threading.RLock()
self._socks = {}
self._threads = {}
self._statuses = {}
@property
def host(self) -> str:
return self._hosts[0][0]
@property
def port(self) -> int:
if not getattr(self, '_hosts', None):
return self._DEFAULT_SNAPCAST_PORT
return self._hosts[0][1]
def _get_hosts(
self,
host: Optional[str] = None,
port: Optional[int] = None,
hosts: Optional[Iterable[dict]] = None,
) -> List[Tuple[str, int]]:
ret = []
if hosts:
assert all(
isinstance(h, dict) and h.get('host') for h in hosts
), f'Expected a list of dicts with host and port keys, got: {hosts}'
ret.extend((h['host'], h.get('port', self.port)) for h in hosts)
if host and port:
ret.insert(0, (host, port))
return list(dict.fromkeys(ret))
def _next_req_id(self):
with self._latest_req_id_lock:
self._latest_req_id += 1
return self._latest_req_id
def _connect(self, host: str, port: int, reuse: bool = False):
if reuse and self._socks.get(host) and self._socks[host].fileno() >= 0:
return self._socks[host]
self.logger.debug('Connecting to %s:%d', host, port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
if reuse:
self._socks[host] = sock
self.logger.info('Connected to %s:%d', host, port)
return sock
def _disconnect(self, host: str, port: int):
sock = self._socks.get(host)
if not sock:
self.logger.debug('Not connected to %s:%d', host, port)
return
try:
sock.close()
except Exception as e:
self.logger.warning(
'Exception while disconnecting from %s:%d: %s', host, port, e
)
finally:
self._socks[host] = None
@classmethod
def _send(cls, sock: socket.socket, req: Union[dict, str, bytes]):
if isinstance(req, dict):
req = json.dumps(req)
if isinstance(req, str):
req = req.encode()
assert isinstance(
req, bytes
), f'Unsupported type {type(req)} for Snapcast request'
sock.send(req + cls._SOCKET_EOL)
@classmethod
def _recv_result(cls, sock: socket.socket):
msg = cls._recv(sock)
if not msg:
return None
return msg.get('result')
@classmethod
def _recv(cls, sock: socket.socket):
sock.setblocking(False)
buf = b''
while buf[-2:] != cls._SOCKET_EOL:
ready = select.select([sock], [], [], 0.5)
if ready[0]:
ch = sock.recv(1)
if not ch:
raise ConnectionError('Connection reset by peer')
buf += ch
else:
return None
return json.loads(buf)
@classmethod
def _parse_event(cls, host, msg):
evt = None
if msg.get('method') == 'Client.OnVolumeChanged':
client_id = msg.get('params', {}).get('id')
volume = msg.get('params', {}).get('volume', {}).get('percent')
muted = msg.get('params', {}).get('volume', {}).get('muted')
evt = ClientVolumeChangeEvent(
host=host, client=client_id, volume=volume, muted=muted
)
elif msg.get('method') == 'Group.OnMute':
group_id = msg.get('params', {}).get('id')
muted = msg.get('params', {}).get('mute')
evt = GroupMuteChangeEvent(host=host, group=group_id, muted=muted)
elif msg.get('method') == 'Client.OnConnect':
client = msg.get('params', {}).get('client')
evt = ClientConnectedEvent(host=host, client=client)
elif msg.get('method') == 'Client.OnDisconnect':
client = msg.get('params', {}).get('client')
evt = ClientDisconnectedEvent(host=host, client=client)
elif msg.get('method') == 'Client.OnLatencyChanged':
client = msg.get('params', {}).get('id')
latency = msg.get('params', {}).get('latency')
evt = ClientLatencyChangeEvent(host=host, client=client, latency=latency)
elif msg.get('method') == 'Client.OnNameChanged':
client = msg.get('params', {}).get('id')
name = msg.get('params', {}).get('name')
evt = ClientNameChangeEvent(host=host, client=client, name=name)
elif msg.get('method') == 'Group.OnStreamChanged':
group_id = msg.get('params', {}).get('id')
stream_id = msg.get('params', {}).get('stream_id')
evt = GroupStreamChangeEvent(host=host, group=group_id, stream=stream_id)
elif msg.get('method') == 'Stream.OnUpdate':
stream_id = msg.get('params', {}).get('stream_id')
stream = msg.get('params', {}).get('stream')
evt = StreamUpdateEvent(host=host, stream_id=stream_id, stream=stream)
elif msg.get('method') == 'Server.OnUpdate':
server = msg.get('params', {}).get('server')
evt = ServerUpdateEvent(host=host, server=server)
return evt
def _event_listener(self, host: str, port: int):
def _thread():
while not self.should_stop():
try:
sock = self._connect(host, port, reuse=True)
msgs = self._recv(sock)
if msgs is None:
continue
if not isinstance(msgs, list):
msgs = [msgs]
for msg in msgs:
self.logger.debug(
'Received message on %s:%d: %s', host, port, msg
)
evt = self._parse_event(host=host, msg=msg)
if evt:
get_bus().post(evt)
except Exception as e:
self.logger.warning(
'Exception while getting the status of the Snapcast server %s:%d: %s',
host,
port,
e,
)
self._disconnect(host, port)
finally:
if self.poll_interval:
time.sleep(self.poll_interval)
return _thread
def _get_group(self, sock: socket.socket, group: str):
for g in self._get_status(sock).get('groups', []):
if group == g.get('id') or group == g.get('name'):
return g
return None
def _get_client(self, sock: socket.socket, client: str):
for g in self._get_status(sock).get('groups', []):
clients = g.get('clients', [])
for c in clients:
if (
client == c.get('id')
or client == c.get('name')
or client == c.get('host', {}).get('name')
or client == c.get('host', {}).get('ip')
):
c['group_id'] = g.get('id')
return c
return None
def _get_status(self, sock: socket.socket) -> dict:
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Server.GetStatus',
}
self._send(sock, request)
return (self._recv_result(sock) or {}).get('server', {})
def _status(
self,
host: Optional[str] = None,
port: Optional[int] = None,
client: Optional[str] = None,
group: Optional[str] = None,
):
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
if client:
return self._get_client(sock, client)
if group:
return self._get_group(sock, group)
return self._get_status(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
def _status_response(
self,
host: str,
port: int,
client: Optional[str] = None,
group: Optional[str] = None,
):
sock = None
try:
sock = self._connect(host, port)
if client:
return self._get_client(sock, client)
if group:
return self._get_group(sock, group)
return {'host': host, 'port': port, **self._get_status(sock)}
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def status(
self,
host: Optional[str] = None,
port: Optional[int] = None,
client: Optional[str] = None,
group: Optional[str] = None,
):
"""
Get the current status of a Snapcast server, client or group.
If not host, client or group is specified, the action will return the
status of all the Snapcast servers.
:param host: Snapcast server to query (default: default configured host)
:param port: Snapcast server port (default: default configured port)
:param client: Client ID or name (default: None)
:param group: Group ID or name (default: None)
:returns: dict. Example:
.. code-block:: json
"output": {
"host": "localhost",
"port": 1705,
"groups": [
{
"clients": [
{
"config": {
"instance": 1,
"latency": 0,
"name": "",
"volume": {
"muted": false,
"percent": 96
}
},
"connected": true,
"host": {
"arch": "x86_64",
"ip": "YOUR_IP",
"mac": "YOUR_MAC",
"name": "YOUR_NAME",
"os": "YOUR_OS"
},
"id": "YOUR_ID",
"lastSeen": {
"sec": 1546648311,
"usec": 86011
},
"snapclient": {
"name": "Snapclient",
"protocolVersion": 2,
"version": "0.15.0"
}
}
],
"id": "YOUR_ID",
"muted": false,
"name": "",
"stream_id": "mopidy"
}
],
"server": {
"host": {
"arch": "armv7l",
"ip": "",
"mac": "",
"name": "YOUR_NAME",
"os": "YOUR_OS"
},
"snapserver": {
"controlProtocolVersion": 1,
"name": "Snapserver",
"protocolVersion": 1,
"version": "0.15.0"
}
},
"streams": [
{
"id": "mopidy",
"meta": {
"STREAM": "mopidy"
},
"status": "playing",
"uri": {
"fragment": "",
"host": "",
"path": "/tmp/snapfifo",
"query": {
"buffer_ms": "20",
"codec": "pcm",
"name": "mopidy",
"sampleformat": "48000:16:2"
},
"raw": "pipe:////tmp/fifo?buffer_ms=20&codec=pcm&name=mopidy&sampleformat=48000:16:2",
"scheme": "pipe"
}
}
]
}
"""
if client or group or host:
return self._status_response(
host=host or self.host,
port=port or self.port,
client=client,
group=group,
)
# Run status in parallel on all the hosts and return a list with all the
# results
with ThreadPoolExecutor(max_workers=len(self._hosts)) as executor:
return list(
executor.map(lambda h: self._status_response(h[0], h[1]), self._hosts)
)
[docs]
@action
def mute(
self,
client: Optional[str] = None,
group: Optional[str] = None,
mute: Optional[bool] = None,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set the mute status of a connected client or group
:param client: Client name or ID to mute
:param group: Group ID to mute
:param mute: Mute status. If not set, the mute status of the selected
client/group will be toggled.
:param host: Snapcast server to query (default: default configured host)
:param port: Snapcast server port (default: default configured port)
"""
assert client or group, 'Please specify either a client or a group'
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetMute' if group else 'Client.SetVolume',
'params': {},
}
if group:
g = self._get_group(sock, group)
assert g, f'No such group: {group}'
cur_muted = g['muted']
request['params']['id'] = g['id']
request['params']['mute'] = not cur_muted if mute is None else mute
elif client:
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
cur_muted = c['config']['volume']['muted']
request['params']['id'] = c['id']
request['params']['volume'] = {}
request['params']['volume']['percent'] = c['config']['volume'][
'percent'
]
request['params']['volume']['muted'] = (
not cur_muted if mute is None else mute
)
self._send(sock, request)
return self._recv_result(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def volume(
self,
client: str,
volume: Optional[int] = None,
delta: Optional[int] = None,
mute: Optional[bool] = None,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set the volume of a connected client.
:param client: Client name or ID
:param volume: Absolute volume to set between 0 and 100
:param delta: Relative volume change in percentage (e.g. +10 or -10)
:param mute: Set to true or false if you want to toggle the muted status
:param host: Snapcast server (default: default configured host)
:param port: Snapcast server port (default: default configured port)
"""
assert not (volume is None and delta is None and mute is None), (
'Please specify either an absolute volume, a relative delta or '
+ 'a mute status'
)
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetVolume',
'params': {},
}
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
cur_volume = int(c['config']['volume']['percent'])
cur_mute = bool(c['config']['volume']['muted'])
if volume is not None:
volume = int(volume)
elif delta is not None:
volume = cur_volume + int(delta)
volume = max(0, min(100, volume)) if volume is not None else cur_volume
if mute is None:
mute = cur_mute
request['params']['id'] = c['id']
request['params']['volume'] = {}
request['params']['volume']['percent'] = volume
request['params']['volume']['muted'] = mute
self._send(sock, request)
return self._recv_result(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def set_client_name(
self,
client: str,
name: str,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set/change the name of a connected client
:param client: Current client name or ID to rename
:param name: New name
:param host: Snapcast server (default: default configured host)
:param port: Snapcast server port (default: default configured port)
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetName',
'params': {},
}
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
request['params']['id'] = c['id']
request['params']['name'] = name
self._send(sock, request)
return self._recv_result(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def set_group_name(
self,
group: str,
name: str,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set/change the name of a group
:param group: Group ID to rename
:param name: New name
:param host: Snapcast server (default: default configured host)
:param port: Snapcast server port (default: default configured port)
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetName',
'params': {
'id': group,
'name': name,
},
}
self._send(sock, request)
return self._recv_result(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def set_latency(
self,
client: str,
latency: float,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set/change the latency of a connected client
:param client: Client name or ID
:param latency: New latency in milliseconds
:param host: Snapcast server (default: default configured host)
:param port: Snapcast server port (default: default configured port)
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetLatency',
'params': {'latency': latency},
}
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
request['params']['id'] = c['id']
self._send(sock, request)
return self._recv_result(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def delete_client(
self, client: str, host: Optional[str] = None, port: Optional[int] = None
):
"""
Delete a client from the Snapcast server
:param client: Client name or ID
:param host: Snapcast server (default: default configured host)
:param port: Snapcast server port (default: default configured port)
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Server.DeleteClient',
'params': {},
}
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
request['params']['id'] = c['id']
self._send(sock, request)
return self._recv_result(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def group_set_clients(
self,
group: str,
clients: Collection[str],
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Sets the clients for a group on a Snapcast server
:param group: Group name or ID
:param clients: List of client names or IDs
:param host: Snapcast server (default: default configured host)
:param port: Snapcast server port (default: default configured port)
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
g = self._get_group(sock, group)
assert g, f'No such group: {group}'
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetClients',
'params': {'id': g['id'], 'clients': []},
}
for client in clients:
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
request['params']['clients'].append(c['id'])
self._send(sock, request)
return self._recv_result(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def group_set_stream(
self,
group: str,
stream_id: str,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Sets the active stream for a group.
:param group: Group name or ID
:param stream_id: Stream ID
:param host: Snapcast server (default: default configured host)
:param port: Snapcast server port (default: default configured port)
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
g = self._get_group(sock, group)
assert g, f'No such group: {group}'
request = {
'id': self._next_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetStream',
'params': {
'id': g['id'],
'stream_id': stream_id,
},
}
self._send(sock, request)
return self._recv_result(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
[docs]
@action
def get_playing_streams(self, exclude_local: bool = False):
"""
Returns the configured remote streams that are currently active and
unmuted.
.. warning:: This action is deprecated and mostly kept for
backward-compatibility purposes, as it doesn't allow the case where
multiple Snapcast instances can be running on the same host, nor it
provides additional information other that something is playing on a
certain host and port. Use :meth:`.status` instead.
:param exclude_local: Exclude localhost connections (default: False)
:returns: dict with the host->port mapping. Example:
.. code-block:: json
{
"hosts": {
"server_1": 1705,
"server_2": 1705,
"server_3": 1705
}
}
"""
playing_hosts = {}
def _worker(host: str, port: int):
try:
if exclude_local and (
host == 'localhost' or host == Config.get('device_id')
):
return
server_status = self._status(host=host, port=port) or {}
device_id: str = Config.get('device_id') # type: ignore
client_status = (
self._status(host=host, port=port, client=device_id) or {}
)
if client_status.get('config', {}).get('volume', {}).get('muted'):
return
group = next(
iter(
g
for g in server_status.get('groups', {})
if g.get('id') == client_status.get('group_id')
)
)
if group.get('muted'):
return
stream = next(
iter(
s
for s in server_status.get('streams', {})
if s.get('id') == group.get('stream_id')
)
)
if stream.get('status') != 'playing':
return
playing_hosts[host] = port
except Exception as e:
self.logger.warning(
'Error while retrieving the status of Snapcast host at %s:%d: %s',
host,
port,
e,
)
workers = []
for host, port in self._hosts:
w = threading.Thread(target=_worker, args=(host, port))
w.start()
workers.append(w)
while workers:
w = workers.pop()
w.join()
return {'hosts': playing_hosts}
[docs]
def main(self):
while not self.should_stop():
for host, port in self._hosts:
thread_name = f'Snapcast-{host}-{port}'
self._threads[host] = threading.Thread(
target=self._event_listener(host, port), name=thread_name
)
self._threads[host].start()
for thread in self._threads.values():
thread.join()
self._threads = {}
[docs]
def stop(self):
for host, sock in self._socks.items():
if sock:
try:
sock.close()
except Exception as e:
self.logger.warning(
'Could not close Snapcast connection to %s: %s: %s',
host,
type(e),
e,
)
super().stop()
# vim:sw=4:ts=4:et: