Source code for platypush.plugins.media.kodi

import json
import threading
import time
from typing import Optional
from urllib.parse import urlparse

from platypush.context import get_bus
from platypush.plugins import action
from platypush.plugins.media import MediaPlugin, PlayerState
from platypush.message.event.media import (
    MediaPlayEvent,
    MediaPauseEvent,
    MediaStopEvent,
    MediaSeekEvent,
    MediaVolumeChangedEvent,
)


[docs] class MediaKodiPlugin(MediaPlugin): """ Plugin to interact with a Kodi media player instance """
[docs] def __init__( self, rpc_url: str = 'http://localhost:8080/jsonrpc', websocket_port: int = 9090, username: Optional[str] = None, password: Optional[str] = None, **kwargs, ): """ :param rpc_url: Base URL for the Kodi JSON RPC API (default: http://localhost:8080/jsonrpc). You need to make sure that the RPC API is enabled on your Kodi instance - you can enable it from the settings. :param websocket_port: Kodi JSON RPC websocket port, used to receive player events :param username: Kodi username (optional) :param password: Kodi password (optional) """ super().__init__(**kwargs) self.url = rpc_url host, port = kwargs.get('host'), kwargs.get('port', 8080) if host and port: self.logger.warning('host and port are deprecated, use rpc_url instead') self.url = f'http://{host}:{port}/jsonrpc' self.host = urlparse(self.url).hostname self.websocket_port = websocket_port self.websocket_url = f'ws://{self.host}:{websocket_port}/jsonrpc' self.username = username self.password = password self._ws = None threading.Thread(target=self._websocket_thread()).start()
def _get_kodi(self): from kodijson import Kodi args = [self.url] if self.username: args += [self.username] if self.password: args += [self.password] return Kodi(*args) def _get_player_id(self): kodi = self._get_kodi() players = kodi.Player.GetActivePlayers().get('result', []) if not players: return None return players.pop().get('playerid') def _websocket_thread(self): """ Initialize the websocket JSON RPC interface, if available, to receive player notifications """ def thread_hndl(): try: import websocket except ImportError: self.logger.warning( 'websocket-client is not installed, Kodi events will be disabled' ) return if not self._ws: self._ws = websocket.WebSocketApp( self.websocket_url, on_message=self._on_ws_msg(), on_error=self._on_ws_error(), on_close=self._on_ws_close(), ) self.logger.info('Kodi websocket interface for events started') self._ws.run_forever() return thread_hndl def _post_event(self, evt_type, **evt): bus = get_bus() bus.post(evt_type(player=self.host, plugin='media.kodi', **evt)) def _on_ws_msg(self): def hndl(*args): msg = args[1] if len(args) > 1 else args[0] self.logger.info("Received Kodi message: %s", msg) msg = json.loads(msg) method = msg.get('method') if method == 'Player.OnPlay': item = msg.get('params', {}).get('data', {}).get('item', {}) player = msg.get('params', {}).get('data', {}).get('player', {}) self._post_event( MediaPlayEvent, player_id=player.get('playerid'), title=item.get('title'), media_type=item.get('type'), ) elif method == 'Player.OnPause': item = msg.get('params', {}).get('data', {}).get('item', {}) player = msg.get('params', {}).get('data', {}).get('player', {}) self._post_event( MediaPauseEvent, player_id=player.get('playerid'), title=item.get('title'), media_type=item.get('type'), ) elif method == 'Player.OnStop': player = msg.get('params', {}).get('data', {}).get('player', {}) self._post_event(MediaStopEvent, player_id=player.get('playerid')) self._clear_resource() elif method == 'Player.OnSeek': player = msg.get('params', {}).get('data', {}).get('player', {}) position = self._time_obj_to_pos(player.get('seekoffset')) self._post_event( MediaSeekEvent, position=position, player_id=player.get('playerid') ) elif method == 'Application.OnVolumeChanged': volume = msg.get('params', {}).get('data', {}).get('volume') self._post_event(MediaVolumeChangedEvent, volume=volume) return hndl def _on_ws_error(self): def hndl(*args): error = args[1] if len(args) > 1 else args[0] self.logger.warning("Kodi websocket connection error: %s", error) return hndl def _on_ws_close(self): def hndl(*_): self._ws = None self.logger.warning("Kodi websocket connection closed") time.sleep(5) self._websocket_thread() return hndl def _build_result(self, result): status = self.status().output status['result'] = result.get('result') return status, result.get('error') def _clear_resource(self): if self._latest_resource: self._latest_resource.close() self._latest_resource = None
[docs] @action def play(self, resource: str, **kwargs): """ Open and play the specified file or URL :param resource: URL or path to the media to be played """ media = self._latest_resource = self._get_resource(resource, **kwargs) media.open(**kwargs) result = self._get_kodi().Player.Open(item={'file': media.resource}) if self.volume: self.set_volume(volume=int(self.volume)) return self._build_result(result)
[docs] @action def pause(self, player_id=None, **_): """ Play/pause the current media """ if player_id is None: player_id = self._get_player_id() if player_id is None: return None, 'No active players found' result = self._get_kodi().Player.PlayPause(playerid=player_id) return self._build_result(result)
[docs] @action def get_active_players(self): """ Get the list of active players """ result = self._get_kodi().Player.GetActivePlayers() return result.get('result'), result.get('error')
[docs] @action def get_movies(self, **_): """ Get the list of movies on the Kodi server """ result = self._get_kodi().VideoLibrary.GetMovies() return result.get('result'), result.get('error')
[docs] @action def stop(self, player_id=None, **_): """ Stop the current media """ if player_id is None: player_id = self._get_player_id() if player_id is None: return None, 'No active players found' result = self._get_kodi().Player.Stop(playerid=player_id) self._clear_resource() return self._build_result(result)
[docs] @action def notify(self, title, message, **_): """ Send a notification to the Kodi UI """ result = self._get_kodi().GUI.ShowNotification(title=title, message=message) return result.get('result'), result.get('error')
[docs] @action def left(self, **_): """ Simulate a left input event """ result = self._get_kodi().Input.Left() return result.get('result'), result.get('error')
[docs] @action def right(self, **_): """ Simulate a right input event """ result = self._get_kodi().Input.Right() return result.get('result'), result.get('error')
[docs] @action def up(self, **_): """ Simulate an up input event """ result = self._get_kodi().Input.Up() return result.get('result'), result.get('error')
[docs] @action def down(self, **_): """ Simulate a down input event """ result = self._get_kodi().Input.Down() return result.get('result'), result.get('error')
[docs] @action def back_btn(self, **_): """ Simulate a back input event """ result = self._get_kodi().Input.Back() return result.get('result'), result.get('error')
[docs] @action def select(self, **_): """ Simulate a select input event """ result = self._get_kodi().Input.Select() return result.get('result'), result.get('error')
[docs] @action def send_text(self, text, **_): """ Simulate a send_text input event :param text: Text to send :type text: str """ result = self._get_kodi().Input.SendText(text=text) return result.get('result'), result.get('error')
@action def get_volume(self, **_): result = self._get_kodi().Application.GetProperties(properties=['volume']) return result.get('result'), result.get('error')
[docs] @action def volup(self, step=10.0, **_): """Volume up (default: +10%)""" volume = ( self._get_kodi() .Application.GetProperties(properties=['volume']) .get('result', {}) .get('volume') ) result = self._get_kodi().Application.SetVolume( volume=int(min(volume + step, 100)) ) return self._build_result(result)
[docs] @action def voldown(self, step=10.0, **_): """Volume down (default: -10%)""" volume = ( self._get_kodi() .Application.GetProperties(properties=['volume']) .get('result', {}) .get('volume') ) result = self._get_kodi().Application.SetVolume( volume=int(max(volume - step, 0)) ) return self._build_result(result)
[docs] @action def set_volume(self, volume, **_): """ Set the application volume :param volume: Volume to set between 0 and 100 :type volume: int """ result = self._get_kodi().Application.SetVolume(volume=int(volume)) return self._build_result(result)
[docs] @action def mute(self, **_): """ Mute/unmute the application """ muted = ( self._get_kodi() .Application.GetProperties(properties=['muted']) .get('result', {}) .get('muted') ) result = self._get_kodi().Application.SetMute(mute=(not muted)) return self._build_result(result)
[docs] @action def is_muted(self, **_): """ Return the muted status of the application """ result = self._get_kodi().Application.GetProperties(properties=['muted']) return result.get('result')
[docs] @action def scan_video_library(self, **_): """ Scan the video library """ result = self._get_kodi().VideoLibrary.Scan() return result.get('result'), result.get('error')
[docs] @action def scan_audio_library(self, **_): """ Scan the audio library """ result = self._get_kodi().AudioLibrary.Scan() return result.get('result'), result.get('error')
[docs] @action def clean_video_library(self, **_): """ Clean the video library """ result = self._get_kodi().VideoLibrary.Clean() return result.get('result'), result.get('error')
[docs] @action def clean_audio_library(self, **_): """ Clean the audio library """ result = self._get_kodi().AudioLibrary.Clean() return result.get('result'), result.get('error')
[docs] @action def quit(self, **_): """ Quit the application """ result = self._get_kodi().Application.Quit() self._clear_resource() return result.get('result'), result.get('error')
[docs] @action def get_songs(self, **_): """ Get the list of songs in the audio library """ result = self._get_kodi().Application.GetSongs() return result.get('result'), result.get('error')
[docs] @action def get_artists(self, **_): """ Get the list of artists in the audio library """ result = self._get_kodi().Application.GetArtists() return result.get('result'), result.get('error')
[docs] @action def get_albums(self, **_): """ Get the list of albums in the audio library """ result = self._get_kodi().Application.GetAlbums() return result.get('result'), result.get('error')
[docs] @action def fullscreen(self, **_): """ Set/unset fullscreen mode """ fullscreen = ( self._get_kodi() .GUI.GetProperties(properties=['fullscreen']) .get('result', {}) .get('fullscreen') ) result = self._get_kodi().GUI.SetFullscreen(fullscreen=(not fullscreen)) return result.get('result'), result.get('error')
[docs] @action def shuffle(self, player_id=None, shuffle=None, **_): """ Set/unset shuffle mode """ if player_id is None: player_id = self._get_player_id() if player_id is None: return None, 'No active players found' if shuffle is None: shuffle = ( self._get_kodi() .Player.GetProperties(playerid=player_id, properties=['shuffled']) .get('result', {}) .get('shuffled') ) result = self._get_kodi().Player.SetShuffle( playerid=player_id, shuffle=(not shuffle) ) return result.get('result'), result.get('error')
[docs] @action def repeat(self, player_id=None, repeat=None, **_): """ Set/unset repeat mode """ if player_id is None: player_id = self._get_player_id() if player_id is None: return None, 'No active players found' if repeat is None: repeat = ( self._get_kodi() .Player.GetProperties(playerid=player_id, properties=['repeat']) .get('result', {}) .get('repeat') ) result = self._get_kodi().Player.SetRepeat( playerid=player_id, repeat='off' if repeat in ('one', 'all') else 'off' ) return result.get('result'), result.get('error')
@staticmethod def _time_pos_to_obj(t): hours = int(t / 3600) minutes = int((t - hours * 3600) / 60) seconds = t - hours * 3600 - minutes * 60 milliseconds = t - int(t) return { 'hours': hours, 'minutes': minutes, 'seconds': seconds, 'milliseconds': milliseconds, } @staticmethod def _time_obj_to_pos(t): return ( t.get('hours', 0) * 3600 + t.get('minutes', 0) * 60 + t.get('seconds', 0) + t.get('milliseconds', 0) / 1000 )
[docs] @action def seek(self, position, player_id=None, **_): """ Move to the specified time position in seconds :param position: Seek time in seconds :type position: float :param player_id: ID of the target player (default: configured/current player). """ if player_id is None: player_id = self._get_player_id() if player_id is None: return None, 'No active players found' position = self._time_pos_to_obj(position) result = self._get_kodi().Player.Seek(playerid=player_id, value=position) return self._build_result(result)
[docs] @action def set_position(self, position, player_id=None, *args, **kwargs): """ Move to the specified time position in seconds :param position: Seek time in seconds :type position: float :param player_id: ID of the target player (default: configured/current player). """ return self.seek(*args, position=position, player_id=player_id, **kwargs)
[docs] @action def back(self, offset=30, player_id=None, **_): """ Move the player execution backward by delta_seconds :param offset: Backward seek duration (default: 30 seconds) :type offset: float :param player_id: ID of the target player (default: configured/current player). """ if player_id is None: player_id = self._get_player_id() if player_id is None: return None, 'No active players found' position = ( self._get_kodi() .Player.GetProperties(playerid=player_id, properties=['time']) .get('result', {}) .get('time', {}) ) position = self._time_obj_to_pos(position) - offset return self.seek(player_id=player_id, position=position)
[docs] @action def forward(self, offset=30, player_id=None, **_): """ Move the player execution forward by delta_seconds :param offset: Forward seek duration (default: 30 seconds) :type offset: float :param player_id: ID of the target player (default: configured/current player). """ if player_id is None: player_id = self._get_player_id() if player_id is None: return None, 'No active players found' position = ( self._get_kodi() .Player.GetProperties(playerid=player_id, properties=['time']) .get('result', {}) .get('time', {}) ) position = self._time_obj_to_pos(position) + offset return self.seek(player_id=player_id, position=position)
@action def status(self, player_id=None): media_props = { 'album': 'album', 'artist': 'artist', 'duration': 'duration', 'fanart': 'fanart', 'file': 'file', 'season': 'season', 'showtitle': 'showtitle', 'streamdetails': 'streamdetails', 'thumbnail': 'thumbnail', 'title': 'title', 'tvshowid': 'tvshowid', 'url': 'file', } app_props = { 'volume': 'volume', 'mute': 'muted', } player_props = { "duration": "totaltime", "position": "time", "repeat": "repeat", "seekable": "canseek", 'speed': 'speed', "subtitles": "subtitles", } ret = {'state': PlayerState.IDLE.value} try: kodi = self._get_kodi() players = kodi.Player.GetActivePlayers().get('result', []) except Exception as e: self.logger.debug(f'Could not get active players: {str(e)}') return ret ret['state'] = PlayerState.STOP.value app = kodi.Application.GetProperties( properties=list(set(app_props.values())) ).get('result', {}) for status_prop, kodi_prop in app_props.items(): ret[status_prop] = app.get(kodi_prop) if not players: return ret if player_id is None: player_id = players.pop().get('playerid') else: for p in players: if p['player_id'] == player_id: player_id = p break if player_id is None: return ret media = ( kodi.Player.GetItem( playerid=player_id, properties=list(set(media_props.values())) ) .get('result', {}) .get('item', {}) ) for status_prop, kodi_prop in media_props.items(): ret[status_prop] = media.get(kodi_prop) player_info = kodi.Player.GetProperties( playerid=player_id, properties=list(set(player_props.values())) ).get('result', {}) for status_prop, kodi_prop in player_props.items(): ret[status_prop] = player_info.get(kodi_prop) if ret['duration']: ret['duration'] = self._time_obj_to_pos(ret['duration']) if ret['position']: ret['position'] = self._time_obj_to_pos(ret['position']) ret['state'] = ( PlayerState.PAUSE.value if player_info.get('speed', 0) == 0 else PlayerState.PLAY.value ) return ret def toggle_subtitles(self, *_, **__): raise NotImplementedError def set_subtitles(self, *_, **__): raise NotImplementedError def remove_subtitles(self, *_, **__): raise NotImplementedError def is_playing(self, *_, **__): raise NotImplementedError def load(self, *_, **__): raise NotImplementedError @property def supports_local_media(self) -> bool: return False @property def supports_local_pipe(self) -> bool: return False
# vim:sw=4:ts=4:et: