Source code for platypush.plugins.music.mpd

import re
import threading
from typing import Collection, Optional, Union

from platypush.plugins import RunnablePlugin, action
from platypush.plugins.music import MusicPlugin

from ._conf import MpdConfig
from ._listener import MpdListener


[docs] class MusicMpdPlugin(MusicPlugin, RunnablePlugin): """ This plugin allows you to interact with an MPD/Mopidy music server. `MPD <https://www.musicpd.org/>`_ is a flexible server-side protocol/application for handling music collections and playing music, mostly aimed to manage local libraries. `Mopidy <https://www.mopidy.com/>`_ is an evolution of MPD, compatible with the original protocol and with support for multiple music sources through plugins (e.g. Spotify, TuneIn, Soundcloud, local files etc.). .. note:: If you use Mopidy, and unless you have quite specific use-cases (like you don't want to expose the Mopidy HTTP interface, or you have some legacy automation that uses the MPD interface), you should use the :class:`platypush.plugins.music.mopidy.MusicMopidyPlugin` plugin instead of this. The Mopidy plugin provides a more complete and feature-rich experience, as not all the features of Mopidy are available through the MPD interface, and its API is 100% compatible with this plugin. Also, this plugin operates a synchronous/polling logic because of the limitations of the MPD protocol, while the Mopidy plugin, as it uses the Mopidy Websocket API, can operate in a more efficient way and provide real-time updates. .. note:: As of Mopidy 3.0 MPD is an optional interface provided by the ``mopidy-mpd`` extension. Make sure that you have the extension installed and enabled on your instance to use this plugin if you want to use it with Mopidy instead of MPD. """ _client_lock = threading.RLock()
[docs] def __init__( self, host: str, port: int = 6600, poll_interval: Optional[float] = 20.0, **kwargs, ): """ :param host: MPD IP/hostname. :param port: MPD port (default: 6600). :param poll_interval: Polling interval in seconds. If set, the plugin will poll the MPD server for status updates and trigger change events when required. Default: 20 seconds. """ super().__init__(poll_interval=poll_interval, **kwargs) self.conf = MpdConfig(host=host, port=port) self.client = None
def _connect(self, n_tries: int = 2): import mpd with self._client_lock: if self.client: return self.client error = None while n_tries > 0: try: n_tries -= 1 self.client = mpd.MPDClient() self.client.connect(self.conf.host, self.conf.port) return self.client except Exception as e: error = e self.logger.warning( 'Connection exception: %s%s', e, (': Retrying' if n_tries > 0 else ''), ) self.wait_stop(0.5) self.client = None if error: raise error return self.client def _exec(self, method: str, *args, **kwargs): error = None n_tries = int(kwargs.pop('n_tries')) if 'n_tries' in kwargs else 2 return_status = ( kwargs.pop('return_status') if 'return_status' in kwargs else True ) while n_tries > 0: try: self._connect() n_tries -= 1 with self._client_lock: response = getattr(self.client, method)(*args, **kwargs) if return_status: return self._status() return response except Exception as e: error = str(e) self.logger.warning( 'Exception while executing MPD method %s: %s', method, error ) self.client = None return None, error @staticmethod def _dump_directory(item: dict): item['type'] = 'directory' item['uri'] = item['name'] = item['directory'] return item @staticmethod def _dump_playlist(item: dict): item['type'] = 'playlist' item['uri'] = item['name'] = item['playlist'] item['last_modified'] = item.pop('last-modified', None) return item @staticmethod def _dump_track(item: dict, pos: Optional[int] = None): item['type'] = 'track' item['uri'] = item['file'] for attr in ('time', 'track', 'disc'): item[attr] = int(item[attr]) if item.get(attr) is not None else None if 'pos' in item: item['playlist_pos'] = int(item.pop('pos')) elif pos is not None: item['playlist_pos'] = pos return item
[docs] @action def play(self, resource: Optional[str] = None, **__): """ Play a resource by path/URI. :param resource: Resource path/URI :type resource: str """ if resource: self.add(resource, position=0) return self.play_pos(0) return self._exec('play')
[docs] @action def play_pos(self, pos: int): """ Play a track in the current playlist by position number. :param pos: Position number. """ return self._exec('play', pos)
[docs] @action def pause(self, *_, **__): """Pause playback""" status = self._status()['state'] return self._exec('pause') if status == 'play' else self._exec('play')
[docs] @action def pause_if_playing(self): """Pause playback only if it's playing""" status = self._status()['state'] return self._exec('pause') if status == 'play' else None
[docs] @action def play_if_paused(self): """Play only if it's paused (resume)""" status = self._status()['state'] return self._exec('play') if status == 'pause' else None
[docs] @action def play_if_paused_or_stopped(self): """Play only if it's paused or stopped""" status = self._status()['state'] return self._exec('play') if status in ('pause', 'stop') else None
[docs] @action def stop(self, *_, **__): # type: ignore """Stop playback""" # Note: stop could be interpreted both as "stop the playback" and "stop # the plugin". If the stop event is set, we assume that the user wants # to stop the plugin. if self.should_stop(): if self.client: self.client.disconnect() return None return self._exec('stop')
[docs] @action def play_or_stop(self): """Play or stop (play state toggle)""" status = self._status()['state'] if status == 'play': return self._exec('stop') return self._exec('play')
[docs] @action def next(self, *_, **__): """Play the next track""" return self._exec('next')
[docs] @action def previous(self, **__): """Play the previous track""" return self._exec('previous')
[docs] @action def set_volume(self, volume: int, **__): """ Set the volume. :param volume: Volume value (range: 0-100). """ return self._exec('setvol', str(volume))
[docs] @action def volup(self, step: Optional[float] = None, **kwargs): """ Turn up the volume. :param step: Volume up step (default: 5%). """ step = step or kwargs.get('delta') or 5 volume = int(self._status()['volume']) new_volume = min(volume + step, 100) return self.set_volume(new_volume)
[docs] @action def voldown(self, step: Optional[float] = None, **kwargs): """ Turn down the volume. :param step: Volume down step (default: 5%). """ step = step or kwargs.get('delta') or 5 volume = int(self._status()['volume']) new_volume = max(volume - step, 0) return self.set_volume(new_volume)
def _toggle(self, key: str, value: Optional[bool] = None): if value is None: value = bool(self._status()[key]) return self._exec(key, int(value))
[docs] @action def random(self, value: Optional[bool] = None): """ Set random mode. :param value: If set, set the random state this value (true/false). Default: None (toggle current state). """ return self._toggle('random', value)
[docs] @action def consume(self, value: Optional[bool] = None): """ Set consume mode. :param value: If set, set the consume state this value (true/false). Default: None (toggle current state) """ return self._toggle('consume', value)
[docs] @action def single(self, value: Optional[bool] = None): """ Set single mode. :param value: If set, set the consume state this value (true/false). Default: None (toggle current state) """ return self._toggle('single', value)
[docs] @action def repeat(self, value: Optional[bool] = None): """ Set repeat mode. :param value: If set, set the repeat state this value (true/false). Default: None (toggle current state) """ return self._toggle('repeat', value)
[docs] @action def shuffle(self): """ Shuffles the current playlist. """ return self._exec('shuffle')
[docs] @action def save(self, name: str): """ Save the current tracklist to a new playlist with the specified name. :param name: Name of the playlist """ return self._exec('save', name)
[docs] @action def add(self, resource: str, *_, position: Optional[int] = None, **__): """ Add a resource (track, album, artist, folder etc.) to the current playlist. :param resource: Resource path or URI. :param position: Position where the track(s) will be inserted (default: end of the playlist). """ if isinstance(resource, list): for r in resource: r = self._parse_resource(r) try: if position is None: self._exec('add', r) else: self._exec('addid', r, position) except Exception as e: self.logger.warning('Could not add %s: %s', r, e) return self.status().output r = self._parse_resource(resource) if position is None: return self._exec('add', r) return self._exec('addid', r, position)
[docs] @action def delete(self, positions): """ Delete the playlist item(s) in the specified position(s). :param positions: Positions of the tracks to be removed :type positions: list[int] :return: The modified playlist """ for pos in sorted(positions, key=int, reverse=True): self._exec('delete', pos) return self.get_tracks()
[docs] @action def delete_playlist(self, playlist: Union[str, Collection[str]]): """ Permanently remove playlist(s) by name :param playlist: Name or list of playlist names to remove :type playlist: str or list[str] """ if isinstance(playlist, str): playlist = [playlist] elif not isinstance(playlist, list): raise RuntimeError(f'Invalid type for playlist: {type(playlist)}') for p in playlist: self._exec('rm', p)
[docs] @action def move( self, start: Optional[int] = None, end: Optional[int] = None, position: Optional[int] = None, from_pos: Optional[int] = None, to_pos: Optional[int] = None, ): """ Move the playlist items from the positions ``start`` to ``end`` to the new position ``position``. You can pass either: - ``start``, ``end`` and ``position`` to move a slice of tracks from ``start`` to ``end`` to the new position ``position``. - ``from_pos`` and ``to_pos`` to move a single track from ``from_pos`` to ``to_pos``. .. note: Positions are 0-based (i.e. the first track has position 0). :param start: Start position of the selection. :param end: End position of the selection. :param position: New position. :param from_pos: Alias for ``start`` - it only works with one track at the time. :param to_pos: Alias for ``position`` - it only works with one track at the time. """ assert (start is not None and end is not None and position is not None) or ( from_pos is not None and to_pos is not None ), 'Specify either (start, end, position) or (from_pos, to_pos)' if from_pos is not None and to_pos is not None: return self._exec('move', from_pos, to_pos) chunk = start if start == end else f'{start}:{end}' return self._exec('move', chunk, position)
@classmethod def _parse_resource(cls, resource): if not resource: return None m = re.search(r'^https?://open\.spotify\.com/([^?]+)', resource) if m: resource = 'spotify:' + m.group(1).replace('/', ':') if resource.startswith('spotify:'): resource = resource.split('?')[0] m = re.match(r'spotify:playlist:(.*)', resource) if m: # Old Spotify URI format, convert it to new resource = 'spotify:user:spotify:playlist:' + m.group(1) return resource
[docs] @action def load(self, playlist, play=True): """ Load and play a playlist by name :param playlist: Playlist name :type playlist: str :param play: Start playback after loading the playlist (default: True) :type play: bool """ ret = self._exec('load', playlist) if play: self.play() return ret
[docs] @action def clear(self, **__): """Clear the current playlist""" return self._exec('clear')
[docs] @action def seek(self, position: float, **__): """ Seek to the specified position :param position: Seek position in seconds, or delta string (e.g. '+15' or '-15') to indicate a seek relative to the current position """ return self._exec('seekcur', position)
[docs] @action def forward(self): """Go forward by 15 seconds""" return self._exec('seekcur', '+15')
[docs] @action def back(self): """Go backward by 15 seconds""" return self._exec('seekcur', '-15')
def _status(self) -> dict: n_tries = 2 error = None while n_tries > 0: try: n_tries -= 1 self._connect() if self.client: return self.client.status() # type: ignore except Exception as e: error = e self.logger.warning('Exception while getting MPD status: %s', e) self.client = None raise AssertionError(str(error))
[docs] @action def status(self, *_, **__): """ :returns: The current state. Example response:: output = { "volume": "9", "repeat": "0", "random": "0", "single": "0", "consume": "0", "playlist": "52", "playlistlength": "14", "xfade": "0", "state": "play", "song": "9", "songid": "3061", "nextsong": "10", "nextsongid": "3062", "time": "161:255", "elapsed": "161.967", "bitrate": "320" } """ return self._status()
def _current_track(self): track = self._exec('currentsong', return_status=False) if not isinstance(track, dict): return None if 'title' in track and ( 'artist' not in track or not track['artist'] or re.search('^https?://', track['file']) or re.search('^tunein:', track['file']) ): m = re.match(r'^\s*(.+?)\s+-\s+(.*)\s*$', track['title']) if m and m.group(1) and m.group(2): track['artist'] = m.group(1) track['title'] = m.group(2) return track
[docs] @action def currentsong(self): """ Legacy alias for :meth:`.current_track`. """ return self.current_track()
[docs] @action def current_track(self, *_, **__): """ :returns: The currently played track. Example response:: output = { "file": "spotify:track:7CO5ADlDN3DcR2pwlnB14P", "time": "255", "artist": "Elbow", "album": "Little Fictions", "title": "Kindling", "date": "2017", "track": "10", "pos": "9", "id": "3061", "albumartist": "Elbow", "x-albumuri": "spotify:album:6q5KhDhf9BZkoob7uAnq19" } """ return self._current_track()
[docs] @action def get_tracks(self, *_, **__): """ :returns: The tracks in the current playlist as a list of dicts. Example output:: output = [ { "file": "spotify:track:79VtgIoznishPUDWO7Kafu", "time": "355", "artist": "Elbow", "album": "Little Fictions", "title": "Trust the Sun", "date": "2017", "track": "3", "pos": "10", "id": "3062", "albumartist": "Elbow", "x-albumuri": "spotify:album:6q5KhDhf9BZkoob7uAnq19" }, { "file": "spotify:track:3EzTre0pxmoMYRuhJKMHj6", "time": "219", "artist": "Elbow", "album": "Little Fictions", "title": "Gentle Storm", "date": "2017", "track": "2", "pos": "11", "id": "3063", "albumartist": "Elbow", "x-albumuri": "spotify:album:6q5KhDhf9BZkoob7uAnq19" }, ] """ return [ self._dump_track(track, pos=i) # type: ignore for i, track in enumerate(self._exec('playlistinfo', return_status=False)) ]
[docs] @action def get_playlists(self, *_, **__): """ :returns: The playlists available on the server as a list of dicts. Example response:: output = [ { "playlist": "Rock", "last_modified": "2018-06-25T21:28:19Z" }, { "playlist": "Jazz", "last_modified": "2018-06-24T22:28:29Z" }, { # ... } ] """ playlists: list = self._exec( # type: ignore 'listplaylists', return_status=False ) return sorted( [self._dump_playlist(pl) for pl in playlists], key=lambda p: p['playlist'] )
[docs] @action def get_playlist(self, playlist: str, *_, **__): """ Get the tracks in a playlist. :param playlist: Name of the playlist """ return [ self._dump_track(track) # type: ignore for track in self._exec( 'listplaylistinfo', # if with_tracks else 'listplaylist', playlist, return_status=False, ) if track ]
[docs] @action def add_to_playlist( self, playlist: str, resources: Union[str, Collection[str]], **_ ): """ Add one or multiple resources to a playlist. :param playlist: Playlist name :param resources: URI or path of the resource(s) to be added """ if isinstance(resources, str): resources = [resources] for res in resources: self._exec('playlistadd', playlist, res)
[docs] @action def remove_from_playlist( self, playlist: str, resources: Union[int, Collection[int]], *_, **__ ): """ Remove one or multiple tracks from a playlist. :param playlist: Playlist name :param resources: Position or list of positions to remove """ if isinstance(resources, str): resources = int(resources) if isinstance(resources, int): resources = [resources] for p in sorted(resources, reverse=True): self._exec('playlistdelete', playlist, p)
[docs] @action def playlist_move(self, playlist: str, from_pos: int, to_pos: int, *_, **__): """ Change the position of a track in the specified playlist. :param playlist: Playlist name :param from_pos: Original track position :param to_pos: New track position """ self._exec('playlistmove', playlist, from_pos, to_pos)
[docs] @action def playlist_clear(self, name: str): """ Clears all the elements from the specified playlist. :param name: Playlist name. """ self._exec('playlistclear', name)
[docs] @action def rename_playlist(self, playlist: str, new_name: str): """ Rename a playlist. :param playlist: Original playlist name or URI :param new_name: New playlist name """ self._exec('rename', playlist, new_name)
[docs] @action def browse(self, uri: Optional[str] = None): """ Browse the items under the specified URI. :param uri: URI to browse (default: root directory). """ resp: dict = ( # type: ignore self._exec('lsinfo', uri, return_status=False) if uri else self._exec('lsinfo', return_status=False) ) ret = [] for item in resp: if item.get('directory'): item = self._dump_directory(item) elif item.get('playlist'): item = self._dump_playlist(item) elif item.get('file'): item = self._dump_track(item) else: continue ret.append(item) return ret
[docs] @action def plchanges(self, version: int): """ Show what has changed on the current playlist since a specified playlist version number. :param version: Version number :returns: A list of dicts representing the songs being added since the specified version """ return self._exec('plchanges', version, return_status=False)
[docs] @action def searchaddplaylist(self, name: str): """ Search and add a playlist by (partial or full) name. :param name: Playlist name, can be partial. """ resp: list = self._exec('listplaylists', return_status=False) # type: ignore playlists = [ pl['playlist'] for pl in resp if name.lower() in pl['playlist'].lower() ] if not playlists: return None self._exec('clear') self._exec('load', playlists[0]) self._exec('play') return {'playlist': playlists[0]}
@staticmethod def _make_filter(f: dict) -> list: ll = [] for k, v in f.items(): ll.extend([k, v]) return ll
[docs] @action def find(self, filter: dict, *args, **kwargs): # pylint: disable=redefined-builtin """ Find in the database/library by filter. :param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``) :returns: list[dict] """ filter_list = self._make_filter(filter) return self._exec('find', *filter_list, *args, return_status=False, **kwargs)
[docs] @action def findadd( self, filter: dict, *args, **kwargs # pylint: disable=redefined-builtin ): """ Find in the database/library by filter and add to the current playlist. :param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``) :returns: list[dict] """ filter_list = self._make_filter(filter) return self._exec('findadd', *filter_list, *args, return_status=False, **kwargs)
[docs] @action def search( self, *args, query: Optional[Union[str, dict]] = None, filter: Optional[dict] = None, # pylint: disable=redefined-builtin **kwargs, ): """ Free search by filter. :param query: Free-text query or search structured filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``). :param filter: Structured search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``) - same as ``query``, it's still here for back-compatibility reasons. :returns: list[dict] """ assert query or filter, 'Specify either `query` or `filter`' filt = filter if isinstance(query, str): filt = query elif isinstance(query, dict): filt = {**(filter or {}), **query} filter_list = self._make_filter(filt) if isinstance(filt, dict) else [query] items: list = self._exec( # type: ignore 'search', *filter_list, *args, return_status=False, **kwargs ) # Spotify results first return sorted( items, key=lambda item: 0 if item['file'].startswith('spotify:') else 1 )
[docs] @action def searchadd( self, filter: dict, *args, **kwargs # pylint: disable=redefined-builtin ): """ Free search by filter and add the results to the current playlist. :param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``) :returns: list[dict] """ filter_list = self._make_filter(filter) return self._exec( 'searchadd', *filter_list, *args, return_status=False, **kwargs )
[docs] def main(self): listener = None if self.poll_interval is not None: listener = MpdListener(self) listener.start() self.wait_stop() if listener: listener.join()
# vim:sw=4:ts=4:et: