545 lines
16 KiB
Python
545 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""All functionality that requires Kodi imports"""
|
|
|
|
from __future__ import absolute_import, division, unicode_literals
|
|
|
|
import logging
|
|
from contextlib import contextmanager
|
|
|
|
import xbmc
|
|
import xbmcaddon
|
|
import xbmcgui
|
|
import xbmcplugin
|
|
|
|
# Handle to the running add-on; used for settings, localisation and add-on info.
ADDON = xbmcaddon.Addon()

# Maps the short sort names accepted by show_listing() to xbmcplugin sort constants.
SORT_METHODS = {
    'unsorted': xbmcplugin.SORT_METHOD_UNSORTED,
    'label': xbmcplugin.SORT_METHOD_LABEL_IGNORE_FOLDERS,
    'episode': xbmcplugin.SORT_METHOD_EPISODE,
    'duration': xbmcplugin.SORT_METHOD_DURATION,
    'year': xbmcplugin.SORT_METHOD_VIDEO_YEAR,
    'date': xbmcplugin.SORT_METHOD_DATE,
}

# Sort methods that show_listing() uses when none is given, and appends after a single preferred one.
DEFAULT_SORT_METHODS = ['unsorted', 'label']

_LOGGER = logging.getLogger('kodiutils')
|
|
|
|
|
|
class TitleItem:
    """ Plain value holder with everything needed to build a Kodi xbmcgui ListItem """

    def __init__(self, title, path=None, art_dict=None, info_dict=None, prop_dict=None, stream_dict=None, context_menu=None, subtitles_path=None,
                 is_playable=False):
        """ Store the given values unchanged as attributes of the same name

        :type title: str
        :type path: str
        :type art_dict: dict
        :type info_dict: dict
        :type prop_dict: dict
        :type stream_dict: dict
        :type context_menu: list[tuple[str, str]]
        :type subtitles_path: list[str]
        :type is_playable: bool
        """
        # The assignment order determines the key order shown by __repr__.
        self.title, self.path = title, path
        self.art_dict, self.info_dict = art_dict, info_dict
        self.stream_dict, self.prop_dict = stream_dict, prop_dict
        self.context_menu, self.subtitles_path = context_menu, subtitles_path
        self.is_playable = is_playable

    def __repr__(self):
        """ Represent the item as the repr of its attribute dictionary """
        return "{0!r}".format(self.__dict__)
|
|
|
|
|
|
class SafeDict(dict):
    """A dict that hands back a placeholder instead of raising KeyError on missing keys"""

    def __missing__(self, key):
        """Return the missing key wrapped in braces, i.e. the original format placeholder"""
        return ''.join(('{', key, '}'))
|
|
|
|
|
|
def to_unicode(text, encoding='utf-8', errors='strict'):
    """Decode byte strings to unicode text; any other value is returned untouched"""
    if not isinstance(text, bytes):
        return text
    return text.decode(encoding, errors=errors)
|
|
|
|
|
|
def from_unicode(text, encoding='utf-8', errors='strict'):
    """Encode unicode objects to byte strings on Python 2; return the input unchanged otherwise"""
    import sys
    if sys.version_info.major != 2:
        return text
    if isinstance(text, unicode):  # noqa: F821; pylint: disable=undefined-variable
        return text.encode(encoding, errors)
    return text
|
|
|
|
|
|
def addon_icon():
    """Return the add-on icon, as reported by get_addon_info('icon')"""
    icon = get_addon_info('icon')
    return icon
|
|
|
|
|
|
def addon_id():
    """Return the add-on ID, as reported by get_addon_info('id')"""
    identifier = get_addon_info('id')
    return identifier
|
|
|
|
|
|
def addon_fanart():
    """Return the add-on fanart, as reported by get_addon_info('fanart')"""
    fanart = get_addon_info('fanart')
    return fanart
|
|
|
|
|
|
def addon_name():
    """Return the add-on name, as reported by get_addon_info('name')"""
    name = get_addon_info('name')
    return name
|
|
|
|
|
|
def addon_path():
    """Return the add-on path, as reported by get_addon_info('path')"""
    path = get_addon_info('path')
    return path
|
|
|
|
|
|
def addon_profile():
    """Return the add-on profile path, run through xbmc.translatePath() and converted to unicode"""
    profile = ADDON.getAddonInfo('profile')
    translated = xbmc.translatePath(profile)
    return to_unicode(translated)
|
|
|
|
|
|
def url_for(name, *args, **kwargs):
    """Build a plugin URL via routing.url_for(), looking the route function up by its name"""
    from resources.lib import addon
    build_url = addon.routing.url_for
    # The route is the attribute of the addon module that carries the given name.
    route = getattr(addon, name)
    return build_url(route, *args, **kwargs)
|
|
|
|
|
|
def show_listing(title_items, category=None, sort=None, content=None, cache=True):
    """ Show a virtual directory in Kodi

    :param title_items: iterable of TitleItem-like objects to list
    :param category: breadcrumb label; an int is resolved through localize()
    :param sort: a key of SORT_METHODS or a list of such keys; None selects DEFAULT_SORT_METHODS
    :param content: Kodi content type passed to xbmcplugin.setContent(); skipped when falsy
    :param cache: forwarded as ``cacheToDisc`` to xbmcplugin.endOfDirectory()
    """
    from resources.lib import addon

    handle = addon.routing.handle

    if content:
        # content is one of: files, songs, artists, albums, movies, tvshows, episodes, musicvideos, videos, images, games
        xbmcplugin.setContent(handle, content=content)

    # Jump through hoops to get a stable breadcrumbs implementation
    category_label = ''
    if category:
        if not content:
            category_label = addon_name() + ' / '
        if isinstance(category, int):
            category_label += localize(category)
        else:
            category_label += category
    elif not content:
        category_label = addon_name()

    xbmcplugin.setPluginCategory(handle=handle, category=category_label)

    # Add all sort methods to GUI (start with preferred)
    if sort is None:
        sort = DEFAULT_SORT_METHODS
    elif not isinstance(sort, list):
        sort = [sort] + DEFAULT_SORT_METHODS

    for key in sort:
        xbmcplugin.addSortMethod(handle=handle, sortMethod=SORT_METHODS[key])

    # Add the listings
    listing = []
    for title_item in title_items:
        # Three options:
        #  - item is a virtual directory/folder (not playable, path)
        #  - item is a playable file (playable, path)
        #  - item is non-actionable item (not playable, no path)
        is_folder = bool(not title_item.is_playable and title_item.path)
        is_playable = bool(title_item.is_playable and title_item.path)

        list_item = xbmcgui.ListItem(label=title_item.title, path=title_item.path)

        if title_item.prop_dict:
            list_item.setProperties(title_item.prop_dict)
        # Always set IsPlayable, after the custom properties have been applied
        list_item.setProperty(key='IsPlayable', value='true' if is_playable else 'false')

        list_item.setIsFolder(is_folder)

        if title_item.art_dict:
            list_item.setArt(title_item.art_dict)

        if title_item.info_dict:
            # type is one of: video, music, pictures, game
            list_item.setInfo(type='video', infoLabels=title_item.info_dict)

        if title_item.stream_dict:
            # type is one of: video, audio, subtitle
            list_item.addStreamInfo('video', title_item.stream_dict)

        if title_item.context_menu:
            list_item.addContextMenuItems(title_item.context_menu)

        # Items without a path are listed with a None url
        url = title_item.path or None
        listing.append((url, list_item, is_folder))

    succeeded = xbmcplugin.addDirectoryItems(handle, listing, len(listing))
    xbmcplugin.endOfDirectory(handle, succeeded, cacheToDisc=cache)
|
|
|
|
|
|
def play(stream, title=None, art_dict=None, info_dict=None, prop_dict=None):
    """Resolve the current plugin call to the given stream so that Kodi plays it"""
    from resources.lib.addon import routing

    resolved = xbmcgui.ListItem(label=title, path=stream)

    # Only apply the optional metadata that was actually supplied.
    if art_dict:
        resolved.setArt(art_dict)
    if info_dict:
        resolved.setInfo(type='video', infoLabels=info_dict)
    if prop_dict:
        resolved.setProperties(prop_dict)

    xbmcplugin.setResolvedUrl(routing.handle, True, listitem=resolved)
|
|
|
|
|
|
def get_search_string(heading='', message=''):
    """ Prompt with Kodi's keyboard; return the entered text, or None when not confirmed """
    keyboard = xbmc.Keyboard(message, heading)
    keyboard.doModal()
    if not keyboard.isConfirmed():
        return None
    return to_unicode(keyboard.getText())
|
|
|
|
|
|
def ok_dialog(heading='', message=''):
    """Show Kodi's OK dialog; the add-on name is used when no heading is given"""
    from xbmcgui import Dialog
    title = heading or addon_name()
    return Dialog().ok(heading=title, line1=message)
|
|
|
|
|
|
def notification(heading='', message='', icon='info', time=4000):
    """Show a Kodi notification; empty heading/icon fall back to the add-on name/icon"""
    from xbmcgui import Dialog
    title = heading or addon_name()
    image = icon or addon_icon()
    Dialog().notification(heading=title, message=message, icon=image, time=time)
|
|
|
|
|
|
def multiselect(heading='', options=None, autoclose=0, preselect=None, use_details=False):
    """Show a Kodi multi-select dialog and return its result; the add-on name is the default heading"""
    from xbmcgui import Dialog
    title = heading or addon_name()
    dialog = Dialog()
    return dialog.multiselect(heading=title, options=options, autoclose=autoclose, preselect=preselect, useDetails=use_details)
|
|
|
|
|
|
def progress(heading='', message=''):
    """ Create and return a Kodi progress dialog; the add-on name is the default heading """
    from xbmcgui import DialogProgress
    title = heading or ADDON.getAddonInfo('name')
    dialog = DialogProgress()
    dialog.create(heading=title, line1=message)
    return dialog
|
|
|
|
|
|
def set_locale():
    """Load the proper locale for date strings, only once

    The outcome is stored on the function object, so later calls return it
    without touching the locale again.

    :return: True when the locale is considered set, False when it could not be set
    """
    if hasattr(set_locale, 'cached'):
        return getattr(set_locale, 'cached')
    from locale import Error, LC_ALL, setlocale
    # Keep the part after the last dot and upper-case its final two characters
    locale_lang = get_global_setting('locale.language').split('.')[-1]
    locale_lang = locale_lang[:-2] + locale_lang[-2:].upper()
    # NOTE: setlocale() only works if the platform supports the Kodi configured locale
    try:
        setlocale(LC_ALL, locale_lang)
    except (Error, ValueError) as exc:
        # NOTE(review): indentation was lost in the reviewed copy; a failing 'en_GB' is
        # assumed to fall through and count as success - confirm against the project.
        if locale_lang != 'en_GB':
            # Logger methods take %-style positional args, not arbitrary keyword arguments
            _LOGGER.debug("Your system does not support locale '%s': %s", locale_lang, exc)
            set_locale.cached = False
            return False
    set_locale.cached = True
    return True
|
|
|
|
|
|
def localize(string_id, **kwargs):
    """Return the add-on's localized string; keyword arguments fill its {placeholders}

    Placeholders without a matching keyword argument are left in place (see SafeDict).
    """
    if not kwargs:
        return ADDON.getLocalizedString(string_id)
    from string import Formatter
    formatter = Formatter()
    return formatter.vformat(ADDON.getLocalizedString(string_id), (), SafeDict(**kwargs))
|
|
|
|
|
|
def get_setting(key, default=None):
    """Get an add-on setting as unicode string, or the default when empty or unreadable"""
    try:
        raw = ADDON.getSetting(key)
        value = to_unicode(raw)
    except RuntimeError:  # Occurs when the add-on is disabled
        return default
    # An empty value only yields the default when a default was actually supplied.
    use_default = value == '' and default is not None
    return default if use_default else value
|
|
|
|
|
|
def get_setting_bool(key, default=None):
    """Get an add-on setting as boolean, falling back to parsing the string setting"""
    try:
        return ADDON.getSettingBool(key)
    except (AttributeError, TypeError):  # On Krypton or older, or when not a boolean
        text = get_setting(key, default)
        if text in ('false', 'true'):
            return bool(text == 'true')
        return default
    except RuntimeError:  # Occurs when the add-on is disabled
        return default
|
|
|
|
|
|
def get_setting_int(key, default=None):
    """Get an add-on setting as integer

    :param key: id of the setting
    :param default: returned when the setting cannot be read or converted to int
    """
    try:
        return ADDON.getSettingInt(key)
    except (AttributeError, TypeError):  # On Krypton or older, or when not an integer
        value = get_setting(key, default)
        try:
            return int(value)
        except (TypeError, ValueError):
            # ValueError: not a number; TypeError: get_setting() returned None
            return default
    except RuntimeError:  # Occurs when the add-on is disabled
        return default
|
|
|
|
|
|
def get_setting_float(key, default=None):
    """Get an add-on setting as float

    :param key: id of the setting
    :param default: returned when the setting cannot be read or converted to float
    """
    try:
        return ADDON.getSettingNumber(key)
    except (AttributeError, TypeError):  # On Krypton or older, or when not a float
        value = get_setting(key, default)
        try:
            return float(value)
        except (TypeError, ValueError):
            # ValueError: not a number; TypeError: get_setting() returned None
            return default
    except RuntimeError:  # Occurs when the add-on is disabled
        return default
|
|
|
|
|
|
def set_setting(key, value):
    """Store an add-on setting; the value is stringified with str() first"""
    text = from_unicode(str(value))
    return ADDON.setSetting(key, text)
|
|
|
|
|
|
def set_setting_bool(key, value):
    """Store an add-on setting as boolean, falling back to the strings 'true'/'false'"""
    try:
        return ADDON.setSettingBool(key, value)
    except (AttributeError, TypeError):  # On Krypton or older, or when not a boolean
        if value in ['false', 'true']:
            # Already one of the two string forms; store as-is
            literal = value
        elif value:
            literal = 'true'
        else:
            literal = 'false'
        return set_setting(key, literal)
|
|
|
|
|
|
def set_setting_int(key, value):
    """Store an add-on setting as integer, falling back to the string setter"""
    try:
        outcome = ADDON.setSettingInt(key, value)
    except (AttributeError, TypeError):  # On Krypton or older, or when not an integer
        outcome = set_setting(key, value)
    return outcome
|
|
|
|
|
|
def set_setting_float(key, value):
    """Store an add-on setting as float, falling back to the string setter"""
    try:
        outcome = ADDON.setSettingNumber(key, value)
    except (AttributeError, TypeError):  # On Krypton or older, or when not a float
        outcome = set_setting(key, value)
    return outcome
|
|
|
|
|
|
def open_settings():
    """Open the settings window of this add-on"""
    ADDON.openSettings()
|
|
|
|
|
|
def get_global_setting(key):
    """Fetch a Kodi setting value through the Settings.GetSettingValue JSON-RPC method"""
    response = jsonrpc(method='Settings.GetSettingValue', params=dict(setting=key))
    payload = response.get('result', {})
    return payload.get('value')
|
|
|
|
|
|
def get_cond_visibility(condition):
    """Evaluate a visibility condition with xbmc.getCondVisibility() and return its result"""
    outcome = xbmc.getCondVisibility(condition)
    return outcome
|
|
|
|
|
|
def has_addon(name):
    """Tell whether the System.HasAddon() condition evaluates to 1 for the given add-on"""
    condition = 'System.HasAddon(%s)' % name
    return xbmc.getCondVisibility(condition) == 1
|
|
|
|
|
|
def kodi_version():
    """Return the major Kodi version: the integer before the first dot of System.BuildVersion"""
    build = xbmc.getInfoLabel('System.BuildVersion')
    major = build.split('.')[0]
    return int(major)
|
|
|
|
|
|
def get_tokens_path():
    """Return the userdata tokens path, computed once and kept on the function object"""
    try:
        return get_tokens_path.cached
    except AttributeError:
        # First call: derive the path from the add-on profile and remember it.
        get_tokens_path.cached = addon_profile() + 'tokens/'
        return get_tokens_path.cached
|
|
|
|
|
|
def get_cache_path():
    """Return the userdata cache path, computed once and kept on the function object"""
    try:
        return get_cache_path.cached
    except AttributeError:
        # First call: derive the path from the add-on profile and remember it.
        get_cache_path.cached = addon_profile() + 'cache/'
        return get_cache_path.cached
|
|
|
|
|
|
def get_addon_info(key):
    """Return the requested add-on information field as unicode"""
    info = ADDON.getAddonInfo(key)
    return to_unicode(info)
|
|
|
|
|
|
def listdir(path):
    """List a directory through xbmcvfs.listdir() and return its result"""
    import xbmcvfs
    return xbmcvfs.listdir(path)
|
|
|
|
|
|
def mkdir(path):
    """Create a directory (using xbmcvfs)

    :param path: directory to create
    :return: the result of xbmcvfs.mkdir()
    """
    from xbmcvfs import mkdir as vfsmkdir
    # Logger methods take %-style positional args, not arbitrary keyword arguments
    _LOGGER.debug("Create directory '%s'.", path)
    return vfsmkdir(path)
|
|
|
|
|
|
def mkdirs(path):
    """Create directory including parents (using xbmcvfs)

    :param path: directory to create
    :return: the result of xbmcvfs.mkdirs()
    """
    from xbmcvfs import mkdirs as vfsmkdirs
    # Logger methods take %-style positional args, not arbitrary keyword arguments
    _LOGGER.debug("Recursively create directory '%s'.", path)
    return vfsmkdirs(path)
|
|
|
|
|
|
def exists(path):
    """Check a path through xbmcvfs.exists() and return its result"""
    import xbmcvfs
    return xbmcvfs.exists(path)
|
|
|
|
|
|
@contextmanager
def open_file(path, flags='r'):
    """Open a file (using xbmcvfs) as a context manager

    :param path: file to open
    :param flags: mode string handed to xbmcvfs.File
    :return: yields the xbmcvfs.File object; it is closed when the with-block ends
    """
    from xbmcvfs import File
    fdesc = File(path, flags)
    try:
        yield fdesc
    finally:
        # Close the file even when the with-body raises
        fdesc.close()
|
|
|
|
|
|
def stat_file(path):
    """Return an xbmcvfs.Stat object for the given path"""
    import xbmcvfs
    return xbmcvfs.Stat(path)
|
|
|
|
|
|
def delete(path):
    """Remove a file (using xbmcvfs)

    :param path: file to remove
    :return: the result of xbmcvfs.delete()
    """
    from xbmcvfs import delete as vfsdelete
    # Logger methods take %-style positional args, not arbitrary keyword arguments
    _LOGGER.debug("Delete file '%s'.", path)
    return vfsdelete(path)
|
|
|
|
|
|
def container_refresh(url=None):
    """Refresh the current container or (re)load a container by URL

    :param url: optional URL passed to the Container.Refresh builtin
    """
    if url:
        # Logger methods take %-style positional args, not arbitrary keyword arguments
        _LOGGER.debug('Execute: Container.Refresh(%s)', url)
        xbmc.executebuiltin('Container.Refresh({url})'.format(url=url))
    else:
        _LOGGER.debug('Execute: Container.Refresh')
        xbmc.executebuiltin('Container.Refresh')
|
|
|
|
|
|
def container_update(url):
    """Update the current container while respecting the path history.

    :param url: URL passed to the Container.Update builtin; when falsy the
        current container is refreshed instead
    """
    if url:
        # Logger methods take %-style positional args, not arbitrary keyword arguments
        _LOGGER.debug('Execute: Container.Update(%s)', url)
        xbmc.executebuiltin('Container.Update({url})'.format(url=url))
    else:
        # URL is a mandatory argument for Container.Update, use Container.Refresh instead
        container_refresh()
|
|
|
|
|
|
def end_of_directory():
    """Close the virtual directory as failed, so Kodi does not keep waiting for it"""
    from resources.lib.addon import routing
    handle = routing.handle
    xbmcplugin.endOfDirectory(handle=handle, succeeded=False, updateListing=False, cacheToDisc=False)
|
|
|
|
|
|
def jsonrpc(*args, **kwargs):
    """Run one JSON-RPC call (keyword arguments) or a batch of calls (positional dicts)

    Missing 'id' and 'jsonrpc' members are filled in before sending. Returns the
    decoded response, or None when positional and keyword arguments are mixed.
    """
    from json import dumps, loads

    # We do not accept both args and kwargs
    if args and kwargs:
        _LOGGER.error('Wrong use of jsonrpc()')
        return None

    if args:
        # Batch: every command gets its position as id unless it already has one
        for position, command in enumerate(args):
            if command.get('id') is None:
                command['id'] = position
            if command.get('jsonrpc') is None:
                command['jsonrpc'] = '2.0'
        request = dumps(args)
    else:
        # Single call described by the keyword arguments
        if kwargs.get('id') is None:
            kwargs['id'] = 0
        if kwargs.get('jsonrpc') is None:
            kwargs['jsonrpc'] = '2.0'
        request = dumps(kwargs)

    return loads(xbmc.executeJSONRPC(request))
|
|
|
|
|
|
def get_cache(key, ttl=None):
    """ Get an item from the cache

    :param key: sequence of strings, joined with '.' to form the cache file name
    :param ttl: maximum age in seconds, compared against the file's st_mtime(); a falsy value skips the check
    :return: the decoded JSON content, or None when the file is missing, too old or not valid JSON
    """
    import json
    import time

    filename = '.'.join(key)
    fullpath = get_cache_path() + filename

    if not exists(fullpath):
        return None

    if ttl and time.mktime(time.localtime()) - stat_file(fullpath).st_mtime() > ttl:
        return None

    with open_file(fullpath, 'r') as fdesc:
        try:
            # Logger methods take %-style positional args, not arbitrary keyword arguments
            _LOGGER.info('Fetching %s from cache', filename)
            return json.load(fdesc)
        except (ValueError, TypeError):
            return None
|
|
|
|
|
|
def set_cache(key, data):
    """ Store an item in the cache

    :param key: sequence of strings, joined with '.' to form the cache file name
    :param data: JSON-serialisable value written to the cache file
    """
    import json

    path = get_cache_path()
    filename = '.'.join(key)
    fullpath = path + filename

    # Create the cache directory on first use
    if not exists(path):
        mkdirs(path)

    with open_file(fullpath, 'w') as fdesc:
        # Logger methods take %-style positional args, not arbitrary keyword arguments
        _LOGGER.info('Storing to cache as %s', filename)
        json.dump(data, fdesc)
|