Source code for platypush.plugins.media.gstreamer

import os
from typing import Optional

from platypush.plugins.media import PlayerState, MediaPlugin
from platypush.message.event.media import MediaPlayRequestEvent, MediaVolumeChangedEvent

from platypush.plugins import action
from platypush.plugins.media import MediaResource
from platypush.plugins.media.gstreamer.model import MediaPipeline


[docs] class MediaGstreamerPlugin(MediaPlugin): """ Plugin to play media over GStreamer. """
[docs] def __init__(self, sink: Optional[str] = None, *args, **kwargs): """ :param sink: GStreamer audio sink (default: ``None``, automatic). """ super().__init__(*args, **kwargs) self.sink = sink self._player: Optional[MediaPipeline] = None
def _allocate_pipeline(self, resource: MediaResource) -> MediaPipeline: pipeline = MediaPipeline(resource) if self.sink: sink = pipeline.add_sink(self.sink, sync=False) pipeline.link(pipeline.get_source(), sink) self._player = pipeline self._latest_resource = resource return pipeline def _status(self) -> dict: if not self._player: return {'state': PlayerState.STOP.value} pos = self._player.get_position() length = self._player.get_duration() status = { 'duration': length, 'mute': self._player.is_muted(), 'pause': self._player.is_paused(), 'percent_pos': ( pos / length if pos is not None and length is not None and pos >= 0 and length > 0 else 0 ), 'position': pos, 'seekable': length is not None and length > 0, 'state': self._gst_to_player_state(self._player.get_state()).value, 'volume': self._player.get_volume() * 100, } if self._latest_resource: status.update( { k: v for k, v in self._latest_resource.to_dict().items() if v is not None } ) return status def _get_volume(self) -> float: assert self._player, 'No instance is running' return self._player.get_volume() * 100.0 def _set_position(self, position: float) -> dict: assert self._player, 'No instance is running' self._player.seek(position) return self._status()
[docs] @action def play(self, resource: Optional[str] = None, **kwargs): """ Play a resource. :param resource: Resource to play - can be a local file or a remote URL """ if not resource: if self._player: self._player.play() return self._status() self._bus.post( MediaPlayRequestEvent( player='local', plugin='media.gstreamer', resource=resource ) ) media = self._latest_resource = self._get_resource(resource, **kwargs) media.open(**kwargs) if media.resource and os.path.isfile(os.path.abspath(media.resource)): media.resource = 'file://' + media.resource pipeline = self._allocate_pipeline(media) pipeline.play() if self.volume: pipeline.set_volume(self.volume / 100.0) return self._status()
[docs] @action def pause(self, *_, **__): """Toggle the paused state""" assert self._player, 'No instance is running' self._player.pause() return self._status()
[docs] @action def quit(self, *_, **__): """Stop and quit the player (alias for :meth:`.stop`)""" if self._latest_resource: self._latest_resource.close() self._latest_resource = None if self._player: self._player.stop() self._player = None else: self.logger.info('No instance is running') return {'state': PlayerState.STOP.value}
[docs] @action def stop(self): """Stop and quit the player (alias for :meth:`.quit`)""" return self.quit()
[docs] @action def voldown(self, step=10.0): """Volume down by (default: 10)%""" return self.set_volume(self._get_volume() - step)
[docs] @action def volup(self, step=10.0): """Volume up by (default: 10)%""" return self.set_volume(self._get_volume() + step)
[docs] @action def get_volume(self) -> float: """ Get the volume. :return: Volume value between 0 and 100. """ assert self._player, 'No instance is running' return self._player.get_volume() * 100.0
[docs] @action def set_volume(self, volume): """ Set the volume. :param volume: Volume value between 0 and 100. """ assert self._player, 'Player not running' volume = max(0, min(1, volume / 100.0)) self._player.set_volume(volume) self._player.post_event(MediaVolumeChangedEvent, volume=volume * 100) return self._status()
[docs] @action def seek(self, position: float) -> dict: """ Seek backward/forward by the specified number of seconds. :param position: Number of seconds relative to the current cursor. """ assert self._player, 'No instance is running' cur_pos = self._player.get_position() # return self._set_position(cur_pos + position) return self._set_position((cur_pos or 0) + float(position))
[docs] @action def back(self, offset=60.0): """Back by (default: 60) seconds""" return self.seek(-offset)
[docs] @action def forward(self, offset=60.0): """Forward by (default: 60) seconds""" return self.seek(offset)
[docs] @action def is_playing(self): """ :returns: True if it's playing, False otherwise """ return self._player and self._player.is_playing()
[docs] @action def load(self, resource, **_): """ Load/queue a resource/video to the player (alias for :meth:`.play`). """ return self.play(resource)
[docs] @action def mute(self): """Toggle mute state""" assert self._player, 'No instance is running' muted = self._player.is_muted() if muted: self._player.unmute() else: self._player.mute() return {'muted': self._player.is_muted()}
[docs] @action def set_position(self, position: float) -> dict: """ Seek backward/forward to the specified absolute position. :param position: Stream position in seconds. :return: Player state. """ assert self._player, 'No instance is running' self._player.seek(position) return self._status()
[docs] @action def status(self) -> dict: """ Get the current player state. """ return self._status()
@staticmethod def _gst_to_player_state(state) -> PlayerState: from gi.repository import Gst # type: ignore if state == Gst.State.PAUSED: return PlayerState.PAUSE if state == Gst.State.PLAYING: return PlayerState.PLAY return PlayerState.STOP def toggle_subtitles(self, *_, **__): raise NotImplementedError def set_subtitles(self, *_, **__): raise NotImplementedError def remove_subtitles(self, *_, **__): raise NotImplementedError
# vim:sw=4:ts=4:et: