import os
import pathlib
from urllib.parse import quote_plus
from datetime import datetime
from threading import Event
from typing import Iterable, Optional, Union
from platypush.backend.http import HttpBackend
from platypush.config import Config
from platypush.context import Variable, get_backend, get_bus
from platypush.message import Message
from platypush.message.event.custom import CustomEvent
from platypush.message.event.music.tidal import TidalPlaylistUpdatedEvent
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.music.tidal.workers import get_items
from platypush.schemas.tidal import (
TidalAlbumSchema,
TidalPlaylistSchema,
TidalArtistSchema,
TidalSearchResultsSchema,
TidalTrackSchema,
)
[docs]
class MusicTidalPlugin(RunnablePlugin):
"""
Plugin to interact with the user's Tidal account and library.
Upon the first login, the application will prompt you with a link to
connect to your Tidal account. Once authorized, you should no longer be
required to explicitly login.
"""
_base_url = 'https://api.tidalhifi.com/v1/'
_default_credentials_file = os.path.join(
str(Config.get('workdir')), 'tidal', 'credentials.json'
)
[docs]
def __init__(
self,
quality: str = 'high',
credentials_file: str = _default_credentials_file,
**kwargs,
):
"""
:param quality: Default audio quality. Default: ``high``.
Supported: [``loseless``, ``master``, ``high``, ``low``].
:param credentials_file: Path to the file where the OAuth session
parameters will be stored (default:
``<WORKDIR>/tidal/credentials.json``).
"""
super().__init__(**kwargs)
self._credentials_file = os.path.expanduser(credentials_file)
self._user_playlists = {}
self._quality = self._get_quality(quality)
self._session = None
self._login_event = Event()
@staticmethod
def _get_quality(quality: str):
from tidalapi import Quality
try:
return getattr(Quality, quality.lower())
except AttributeError:
try:
return Quality(quality.upper())
except ValueError as e:
raise AssertionError(
f'Invalid quality: {quality}. Supported values: '
f'{[q.value for q in Quality]}'
) from e
def _open_saved_session(self):
if not self._session:
return
try:
# Attempt to reload an existing session from file
self._session.load_session_from_file(pathlib.Path(self._credentials_file))
except Exception as e:
self.logger.warning(
"Could not load PKCE session from %s: %s", self._credentials_file, e
)
def _create_new_session(self):
if not self._session:
return
http: HttpBackend = get_backend('http') # type: ignore
assert http, 'The HTTP backend is required to perform the TIDAL PKCE login flow'
self._login_event.clear()
login_url = self._session.pkce_login_url()
self.logger.info(
'\n\nPlease open the following URL in your browser to login to TIDAL:\n%s\n\n',
http.local_base_url + f'/tidal/login?url={quote_plus(login_url)}',
)
assert self.bus, 'Event bus is not available'
self.bus.register_handler(
handler=self._custom_event_callback,
type=CustomEvent,
)
try:
self.logger.info('Waiting for TIDAL login callback...')
self._login_event.wait(timeout=300)
finally:
self.bus.unregister_handler(
handler=self._custom_event_callback,
type=CustomEvent,
)
assert self._session.check_login(), 'TIDAL PKCE login failed'
self.logger.info("PKCE login successful.")
pathlib.Path(self._credentials_file).parent.mkdir(parents=True, exist_ok=True)
self._session.save_session_to_file(pathlib.Path(self._credentials_file))
def _custom_event_callback(self, event: Message):
if not (
isinstance(event, CustomEvent)
and event.args.get('subtype') == 'tidal/login_callback'
):
return
url = event.args.get('url')
assert url, 'Missing "url" in TIDAL login callback event'
assert self._session, 'TIDAL session is not initialized'
token_data = self._session.pkce_get_auth_token(url)
self._session.process_auth_token(token_data)
self._login_event.set()
@staticmethod
def _ensure_user_playlist(playlist):
from tidalapi import UserPlaylist
assert isinstance(
playlist, UserPlaylist
), 'This operation is only possible on user playlists'
return playlist
@property
def session(self):
from tidalapi import Config, Session
if self._session and self._session.check_login():
return self._session
# Attempt to reload the existing session from file
self._session = Session(config=Config(quality=self._quality))
self._open_saved_session()
if not self._session.check_login():
# Create a new session if we couldn't load an existing one
self._create_new_session()
assert (
self._session.user and self._session.check_login()
), 'Could not connect to TIDAL'
return self._session
@property
def user(self):
user = self.session.user
assert user, 'Not logged in'
return user
[docs]
@action
def create_playlist(self, name: str, description: Optional[str] = None):
"""
Create a new playlist.
:param name: Playlist name.
:param description: Optional playlist description.
:return: .. schema:: tidal.TidalPlaylistSchema
"""
ret = self.user.create_playlist(name, description)
return TidalPlaylistSchema().dump(ret)
[docs]
@action
def delete_playlist(self, playlist_id: str):
"""
Delete a playlist by ID.
:param playlist_id: ID of the playlist to delete.
"""
pl = self.session.playlist(playlist_id)
pl = self._ensure_user_playlist(pl)
pl.delete()
[docs]
@action
def edit_playlist(self, playlist_id: str, title=None, description=None):
"""
Edit a playlist's metadata.
:param name: New name.
:param description: New description.
"""
pl = self.session.playlist(playlist_id)
pl = self._ensure_user_playlist(pl)
pl.edit(title=title, description=description)
[docs]
@action
def get_playlists(self):
"""
Get the user's playlists (track lists are excluded).
:return: .. schema:: tidal.TidalPlaylistSchema(many=True)
"""
ret = self.user.playlists() + self.user.favorites.playlists()
return TidalPlaylistSchema().dump(ret, many=True)
[docs]
@action
def get_playlist(self, playlist_id: str):
"""
Get the details of a playlist (including tracks).
:param playlist_id: Playlist ID.
:return: .. schema:: tidal.TidalPlaylistSchema
"""
pl = self.session.playlist(playlist_id)
pl._tracks = get_items(pl.tracks) # type: ignore
return TidalPlaylistSchema().dump(pl)
[docs]
@action
def get_artist(self, artist_id: Union[str, int]):
"""
Get the details of an artist.
:param artist_id: Artist ID.
:return: .. schema:: tidal.TidalArtistSchema
"""
ret = self.session.artist(artist_id)
ret.albums = get_items(ret.get_albums) # type: ignore
return TidalArtistSchema().dump(ret)
[docs]
@action
def get_album(self, album_id: Union[str, int]):
"""
Get the details of an album.
:param artist_id: Album ID.
:return: .. schema:: tidal.TidalAlbumSchema
"""
ret = self.session.album(album_id)
return TidalAlbumSchema(with_tracks=True).dump(ret)
[docs]
@action
def get_track(self, track_id: Union[str, int]):
"""
Get the details of an track.
:param artist_id: Track ID.
:return: .. schema:: tidal.TidalTrackSchema
"""
ret = self.session.album(track_id)
return TidalTrackSchema().dump(ret)
[docs]
@action
def search(
self,
query: str,
limit: int = 50,
offset: int = 0,
type: Optional[str] = None,
):
"""
Perform a search.
:param query: Query string.
:param limit: Maximum results that should be returned (default: 50).
:param offset: Search offset (default: 0).
:param type: Type of results that should be returned. Default: None
(return all the results that match the query). Supported:
``artist``, ``album``, ``track`` and ``playlist``.
:return: .. schema:: tidal.TidalSearchResultsSchema
"""
from tidalapi.artist import Artist
from tidalapi.album import Album
from tidalapi.media import Track
from tidalapi.playlist import Playlist
models = None
if type is not None:
if type == 'artist':
models = [Artist]
elif type == 'album':
models = [Album]
elif type == 'track':
models = [Track]
elif type == 'playlist':
models = [Playlist]
else:
raise AssertionError(f'Unsupported search type: {type}')
ret = self.session.search(query, models=models, limit=limit, offset=offset)
return TidalSearchResultsSchema().dump(ret)
[docs]
@action
def get_download_url(self, track_id: str) -> str:
"""
Get the direct download URL of a track.
:param artist_id: Track ID.
"""
return self.session.track(track_id).get_url()
[docs]
@action
def add_to_playlist(self, playlist_id: str, track_ids: Iterable[Union[str, int]]):
"""
Append one or more tracks to a playlist.
:param playlist_id: Target playlist ID.
:param track_ids: List of track IDs to append.
"""
pl = self.session.playlist(playlist_id)
pl = self._ensure_user_playlist(pl)
pl.add(track_ids)
[docs]
@action
def remove_from_playlist(
self,
playlist_id: str,
track_id: Optional[Union[str, int]] = None,
index: Optional[int] = None,
):
"""
Remove a track from a playlist.
Specify either the ``track_id`` or the ``index``.
:param playlist_id: Target playlist ID.
:param track_id: ID of the track to remove.
:param index: Index of the track to remove.
"""
assert not (
track_id is None and index is None
), 'Please specify either track_id or index'
pl = self.session.playlist(playlist_id)
pl = self._ensure_user_playlist(pl)
if index:
pl.remove_by_index(index)
if track_id:
pl.remove_by_id(track_id)
[docs]
@action
def add_track(self, track_id: Union[str, int]):
"""
Add a track to the user's collection.
:param track_id: Track ID.
"""
self.user.favorites.add_track(track_id)
[docs]
@action
def add_album(self, album_id: Union[str, int]):
"""
Add an album to the user's collection.
:param album_id: Album ID.
"""
self.user.favorites.add_album(album_id)
[docs]
@action
def add_artist(self, artist_id: Union[str, int]):
"""
Add an artist to the user's collection.
:param artist_id: Artist ID.
"""
self.user.favorites.add_artist(artist_id)
[docs]
@action
def add_playlist(self, playlist_id: str):
"""
Add a playlist to the user's collection.
:param playlist_id: Playlist ID.
"""
self.user.favorites.add_playlist(playlist_id)
[docs]
@action
def remove_track(self, track_id: Union[str, int]):
"""
Remove a track from the user's collection.
:param track_id: Track ID.
"""
self.user.favorites.remove_track(track_id)
[docs]
@action
def remove_album(self, album_id: Union[str, int]):
"""
Remove an album from the user's collection.
:param album_id: Album ID.
"""
self.user.favorites.remove_album(album_id)
[docs]
@action
def remove_artist(self, artist_id: Union[str, int]):
"""
Remove an artist from the user's collection.
:param artist_id: Artist ID.
"""
self.user.favorites.remove_artist(artist_id)
[docs]
@action
def remove_playlist(self, playlist_id: str):
"""
Remove a playlist from the user's collection.
:param playlist_id: Playlist ID.
"""
self.user.favorites.remove_playlist(playlist_id)
def main(self):
while not self.should_stop():
playlists = self.session.user.playlists() # type: ignore
for pl in playlists:
last_updated_var = Variable(f'TIDAL_PLAYLIST_LAST_UPDATE[{pl.id}]')
prev_last_updated = last_updated_var.get()
if prev_last_updated:
prev_last_updated = datetime.fromisoformat(prev_last_updated)
if pl.last_updated > prev_last_updated:
get_bus().post(TidalPlaylistUpdatedEvent(playlist_id=pl.id))
if not prev_last_updated or pl.last_updated > prev_last_updated:
last_updated_var.set(pl.last_updated.isoformat())
self.wait_stop(self.poll_interval)