Source code for platypush.plugins.media.subtitles
import gzip
import os
import requests
import tempfile
import threading
from platypush.plugins import Plugin, action
from platypush.utils import find_files_by_ext
[docs]
class MediaSubtitlesPlugin(Plugin):
"""
Plugin to get video subtitles from OpenSubtitles.
"""
[docs]
def __init__(self, username, password, language=None, **kwargs):
    """
    Log in to OpenSubtitles and store the preferred subtitle languages.

    :param username: Your OpenSubtitles username
    :type username: str
    :param password: Your OpenSubtitles password
    :type password: str
    :param language: Preferred language name, ISO639 code or OpenSubtitles
        language ID to be used for the subtitles. Also supports an (ordered)
        list (or tuple) of preferred languages
    :type language: str or list[str]
    :raises AttributeError: If ``language`` is neither a string nor a
        list/tuple of strings
    """
    from pythonopensubtitles.opensubtitles import OpenSubtitles

    super().__init__(**kwargs)

    # Serializes the subtitle conversions performed by to_vtt.
    self._file_lock = threading.RLock()
    self.languages = []

    # Validate the configuration *before* logging in, so that an invalid
    # ``language`` fails fast without sending the credentials over the
    # network first.
    if language:
        if isinstance(language, str):
            self.languages.append(language.lower())
        elif isinstance(language, (list, tuple)):
            self.languages.extend(lang.lower() for lang in language)
        else:
            raise AttributeError(
                '{} is neither a string nor a list'.format(language)
            )

    self._ost = OpenSubtitles()
    self._token = self._ost.login(username, password)
    if not self._token:
        # Best effort: keep the plugin usable for local subtitles, but make
        # the failed login visible instead of silently ignoring it.
        self.logger.warning(
            'OpenSubtitles login failed: no session token was returned'
        )
[docs]
@action
def get_subtitles(self, resource, language=None):
    """
    Get the subtitles data for a video resource.

    Only local media files are supported: the resource is resolved to an
    absolute path and anything that is not an existing file is rejected.

    :param resource: Path (or ``file://`` URL) of a local media file
    :type resource: str
    :param language: Language name or code (default: configured preferred language).
        Choose 'all' for all the languages
    :type language: str
    :returns: The list of the subtitles found next to the media file
        (``IsLocal=True``) followed by the matching OpenSubtitles items
        (``IsLocal=False``), or the tuple ``(None, error_message)`` if the
        resource is not an existing file.
    """
    if resource.startswith('file://'):
        resource = resource[len('file://'):]

    resource = os.path.abspath(os.path.expanduser(resource))
    if not os.path.isfile(resource):
        return None, '{} is not a valid file'.format(resource)

    # Imported only once the resource has been validated.
    from pythonopensubtitles.utils import File

    media_dir, filename = os.path.split(resource)
    languages = [language.lower()] if language else self.languages
    wanted = set(languages)
    # No preference, or an explicit 'all' anywhere in the list, disables
    # the language filter.
    match_all = not wanted or 'all' in wanted
    language_keys = ('LanguageName', 'SubLanguageID', 'ISO639')

    cwd = os.getcwd()
    os.chdir(media_dir)
    try:
        # Everything that runs inside the media directory is covered by the
        # ``finally`` below, so the working directory is always restored.
        local_subs = [
            {
                'IsLocal': True,
                'MovieName': '[Local subtitle]',
                'SubFileName': sub.split(os.sep)[-1],
                'SubDownloadLink': 'file://' + os.path.join(media_dir, sub),
            }
            for sub in find_files_by_ext(media_dir, '.srt', '.vtt')
        ]

        self.logger.info(
            'Found {} local subtitles for {}'.format(len(local_subs), filename)
        )

        file_hash = File(filename).get_hash()
        # A failed search yields no data: treat it as "no remote results"
        # so that the local subtitles are still returned.
        found = self._ost.search_subtitles(
            [
                {
                    'sublanguageid': 'all',
                    'moviehash': file_hash,
                }
            ]
        ) or []

        subs = [
            sub
            for sub in found
            if match_all
            or any((sub.get(key) or '').lower() in wanted for key in language_keys)
        ]

        for sub in subs:
            sub['IsLocal'] = False

        self.logger.info(
            'Found {} OpenSubtitles items for {}'.format(len(subs), filename)
        )

        return local_subs + subs
    finally:
        os.chdir(cwd)
[docs]
@action
def search(self, resource, language=None):
    """
    Search the subtitles for a media resource.

    Thin alias that forwards both arguments, by keyword, to
    :meth:`.get_subtitles` and returns its result unchanged.

    :param resource: Media resource, as accepted by :meth:`.get_subtitles`
    :type resource: str
    :param language: Language name or code (default: configured preferred language).
        Choose 'all' for all the languages
    :type language: str
    """
    query = {'resource': resource, 'language': language}
    return self.get_subtitles(**query)
[docs]
@action
def download(
    self, link, media_resource=None, path=None, convert_to_vtt=False, timeout=30
):
    """
    Downloads a subtitle link (local .srt/.vtt file or gzip-compressed
    OpenSubtitles download link) to the specified directory

    :param link: Local subtitles file or OpenSubtitles gzip download link
    :type link: str
    :param path: Path where the subtitle file will be downloaded (default:
        a new temporary file named ``media_subs_*.srt``)
    :type path: str
    :param media_resource: Name of the media resource. If set and if it's a
        media local file then the subtitles will be saved in the same folder
    :type media_resource: str
    :param convert_to_vtt: If set to True, then the downloaded subtitles
        will be converted to VTT format (default: no conversion)
    :type convert_to_vtt: bool
    :param timeout: Timeout in seconds for the HTTP request (default: 30)
    :type timeout: float
    :returns: dict.
        Format::
            {
                "filename": "/path/to/subtitle/file.srt"
            }
    """
    if link.startswith('file://'):
        link = link[len('file://'):]

    # Local subtitles: nothing to download, optionally convert in place.
    if os.path.isfile(link):
        if convert_to_vtt:
            link = self.to_vtt(link).output
        return {'filename': link}

    response = requests.get(link, timeout=timeout)
    # Fail on HTTP errors instead of feeding an error page to gzip.
    response.raise_for_status()
    # Decompress *before* touching the file system, so that a corrupted
    # payload can never truncate or delete an existing subtitle file.
    content = gzip.decompress(response.content)

    if not path and media_resource:
        if media_resource.startswith('file://'):
            media_resource = media_resource[len('file://'):]
        if os.path.isfile(media_resource):
            media_resource = os.path.abspath(media_resource)
            # splitext keeps the whole name when the media file has no
            # extension (a manual split on '.' would drop it entirely).
            path = os.path.splitext(media_resource)[0] + '.srt'

    if path:
        f = open(path, 'wb')  # noqa
    else:
        f = tempfile.NamedTemporaryFile(
            prefix='media_subs_', suffix='.srt', delete=False
        )
        path = f.name

    try:
        with f:
            f.write(content)
        if convert_to_vtt:
            path = self.to_vtt(path).output
    except Exception:
        # Best-effort cleanup of the partially written file; a failure here
        # must not hide the original error.
        try:
            os.unlink(path)
        except OSError:
            pass
        raise

    return {'filename': path}
[docs]
@action
def to_vtt(self, filename):
    """
    Get the path of the VTT version of a subtitles file, converting it from
    SRT if required.

    :param filename: Path of the subtitles file
    :type filename: str
    :returns: ``filename`` itself if it has a ``.vtt`` extension or can be
        parsed as VTT, otherwise the path of the ``.vtt`` file generated
        from the SRT content.
    """
    if filename.lower().endswith('.vtt'):
        return filename

    import webvtt

    with self._file_lock:
        try:
            webvtt.read(filename)
        except Exception:
            # Not parseable as VTT: treat it as SRT and convert it.
            webvtt.from_srt(filename).save()
            # Report the converted file as the source path with its
            # extension swapped for .vtt. splitext only looks at the last
            # path component, so dotted directories and extension-less
            # files are handled correctly.
            return os.path.splitext(filename)[0] + '.vtt'

        return filename
# vim:sw=4:ts=4:et: