Source code for platypush.plugins.media.gstreamer
import os
from typing import Optional
from platypush.plugins.media import PlayerState, MediaPlugin
from platypush.message.event.media import MediaPlayRequestEvent, MediaVolumeChangedEvent
from platypush.plugins import action
from platypush.plugins.media import MediaResource
from platypush.plugins.media.gstreamer.model import MediaPipeline
[docs]
class MediaGstreamerPlugin(MediaPlugin):
    """
    Plugin to play media over GStreamer.
    """
[docs]
    def __init__(self, sink: Optional[str] = None, *args, **kwargs):
        """
        :param sink: GStreamer audio sink (default: ``None``, automatic).
        """
        super().__init__(*args, **kwargs)
        self.sink = sink
        self._player: Optional[MediaPipeline] = None
    def _allocate_pipeline(self, resource: MediaResource) -> MediaPipeline:
        pipeline = MediaPipeline(resource)
        if self.sink:
            sink = pipeline.add_sink(self.sink, sync=False)
            pipeline.link(pipeline.get_source(), sink)
        self._player = pipeline
        self._latest_resource = resource
        return pipeline
    def _status(self) -> dict:
        if not self._player:
            return {'state': PlayerState.STOP.value}
        pos = self._player.get_position()
        length = self._player.get_duration()
        status = {
            'duration': length,
            'mute': self._player.is_muted(),
            'pause': self._player.is_paused(),
            'percent_pos': (
                pos / length
                if pos is not None and length is not None and pos >= 0 and length > 0
                else 0
            ),
            'position': pos,
            'seekable': length is not None and length > 0,
            'state': self._gst_to_player_state(self._player.get_state()).value,
            'volume': self._player.get_volume() * 100,
        }
        if self._latest_resource:
            status.update(
                {
                    k: v
                    for k, v in self._latest_resource.to_dict().items()
                    if v is not None
                }
            )
        return status
    def _get_volume(self) -> float:
        assert self._player, 'No instance is running'
        return self._player.get_volume() * 100.0
    def _set_position(self, position: float) -> dict:
        assert self._player, 'No instance is running'
        self._player.seek(position)
        return self._status()
[docs]
    @action
    def play(self, resource: Optional[str] = None, **kwargs):
        """
        Play a resource.
        :param resource: Resource to play - can be a local file or a remote URL
        """
        if not resource:
            if self._player:
                self._player.play()
            return self._status()
        self._bus.post(
            MediaPlayRequestEvent(
                player='local', plugin='media.gstreamer', resource=resource
            )
        )
        media = self._latest_resource = self._get_resource(resource, **kwargs)
        media.open(**kwargs)
        if media.resource and os.path.isfile(os.path.abspath(media.resource)):
            media.resource = 'file://' + media.resource
        pipeline = self._allocate_pipeline(media)
        pipeline.play()
        if self.volume:
            pipeline.set_volume(self.volume / 100.0)
        return self._status()
[docs]
    @action
    def pause(self, *_, **__):
        """Toggle the paused state"""
        assert self._player, 'No instance is running'
        self._player.pause()
        return self._status()
[docs]
    @action
    def quit(self, *_, **__):
        """Stop and quit the player (alias for :meth:`.stop`)"""
        if self._latest_resource:
            self._latest_resource.close()
            self._latest_resource = None
        if self._player:
            self._player.stop()
            self._player = None
        else:
            self.logger.info('No instance is running')
        return {'state': PlayerState.STOP.value}
[docs]
    @action
    def stop(self):
        """Stop and quit the player (alias for :meth:`.quit`)"""
        return self.quit()
[docs]
    @action
    def voldown(self, step=10.0):
        """Volume down by (default: 10)%"""
        return self.set_volume(self._get_volume() - step)
[docs]
    @action
    def volup(self, step=10.0):
        """Volume up by (default: 10)%"""
        return self.set_volume(self._get_volume() + step)
[docs]
    @action
    def get_volume(self) -> float:
        """
        Get the volume.
        :return: Volume value between 0 and 100.
        """
        assert self._player, 'No instance is running'
        return self._player.get_volume() * 100.0
[docs]
    @action
    def set_volume(self, volume):
        """
        Set the volume.
        :param volume: Volume value between 0 and 100.
        """
        assert self._player, 'Player not running'
        volume = max(0, min(1, volume / 100.0))
        self._player.set_volume(volume)
        self._player.post_event(MediaVolumeChangedEvent, volume=volume * 100)
        return self._status()
[docs]
    @action
    def seek(self, position: float) -> dict:
        """
        Seek backward/forward by the specified number of seconds.
        :param position: Number of seconds relative to the current cursor.
        """
        assert self._player, 'No instance is running'
        cur_pos = self._player.get_position()
        # return self._set_position(cur_pos + position)
        return self._set_position((cur_pos or 0) + float(position))
[docs]
    @action
    def back(self, offset=60.0):
        """Back by (default: 60) seconds"""
        return self.seek(-offset)
[docs]
    @action
    def forward(self, offset=60.0):
        """Forward by (default: 60) seconds"""
        return self.seek(offset)
[docs]
    @action
    def is_playing(self):
        """
        :returns: True if it's playing, False otherwise
        """
        return self._player and self._player.is_playing()
[docs]
    @action
    def load(self, resource, **_):
        """
        Load/queue a resource/video to the player (alias for :meth:`.play`).
        """
        return self.play(resource)
[docs]
    @action
    def mute(self):
        """Toggle mute state"""
        assert self._player, 'No instance is running'
        muted = self._player.is_muted()
        if muted:
            self._player.unmute()
        else:
            self._player.mute()
        return {'muted': self._player.is_muted()}
[docs]
    @action
    def set_position(self, position: float) -> dict:
        """
        Seek backward/forward to the specified absolute position.
        :param position: Stream position in seconds.
        :return: Player state.
        """
        assert self._player, 'No instance is running'
        self._player.seek(position)
        return self._status()
[docs]
    @action
    def status(self) -> dict:
        """
        Get the current player state.
        """
        return self._status()
    @staticmethod
    def _gst_to_player_state(state) -> PlayerState:
        from gi.repository import Gst  # type: ignore
        if state == Gst.State.PAUSED:
            return PlayerState.PAUSE
        if state == Gst.State.PLAYING:
            return PlayerState.PLAY
        return PlayerState.STOP
    def toggle_subtitles(self, *_, **__):
        raise NotImplementedError
    def set_subtitles(self, *_, **__):
        raise NotImplementedError
    def remove_subtitles(self, *_, **__):
        raise NotImplementedError
# vim:sw=4:ts=4:et:
