Implement My List (#71)

* Fetch my list from amazon cognito sync
* Allow updating My List
This commit is contained in:
Michaël Arnauts 2021-02-09 20:54:40 +01:00 committed by GitHub
parent 497cdd6b14
commit 1140b9b07a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 544 additions and 33 deletions

View File

@ -34,6 +34,14 @@ msgctxt "#30010"
msgid "Search trough the catalogue" msgid "Search trough the catalogue"
msgstr "" msgstr ""
msgctxt "#30011"
msgid "My List"
msgstr ""
msgctxt "#30012"
msgid "Browse My List"
msgstr ""
msgctxt "#30013" msgctxt "#30013"
msgid "TV guide" msgid "TV guide"
msgstr "" msgstr ""
@ -74,6 +82,14 @@ msgstr ""
### CONTEXT MENU ### CONTEXT MENU
msgctxt "#30100"
msgid "Add to My List"
msgstr ""
msgctxt "#30101"
msgid "Remove from My List"
msgstr ""
msgctxt "#30102" msgctxt "#30102"
msgid "Go to Program" msgid "Go to Program"
msgstr "" msgstr ""

View File

@ -35,6 +35,14 @@ msgctxt "#30010"
msgid "Search trough the catalogue" msgid "Search trough the catalogue"
msgstr "Doorzoek de catalogus" msgstr "Doorzoek de catalogus"
msgctxt "#30011"
msgid "My List"
msgstr "Mijn lijst"
msgctxt "#30012"
msgid "Browse My List"
msgstr "Bekijk mijn lijst"
msgctxt "#30013" msgctxt "#30013"
msgid "TV guide" msgid "TV guide"
msgstr "Tv-gids" msgstr "Tv-gids"
@ -75,6 +83,14 @@ msgstr "Bekijk korte videoclips van [B]{program}[/B]"
### CONTEXT MENU ### CONTEXT MENU
msgctxt "#30100"
msgid "Add to My List"
msgstr "Toevoegen aan mijn lijst"
msgctxt "#30101"
msgid "Remove from My List"
msgstr "Verwijderen uit mijn lijst"
msgctxt "#30102" msgctxt "#30102"
msgid "Go to Program" msgid "Go to Program"
msgstr "Ga naar programma" msgstr "Ga naar programma"

View File

@ -98,6 +98,27 @@ def show_catalog_program_season(program, season):
Catalog().show_program_season(program, season) Catalog().show_program_season(program, season)
@routing.route('/mylist')
def show_mylist():
""" Show my list """
from resources.lib.modules.catalog import Catalog
Catalog().show_mylist()
@routing.route('/mylist/add/<uuid>')
def mylist_add(uuid):
""" Add a program to My List """
from resources.lib.modules.catalog import Catalog
Catalog().mylist_add(uuid)
@routing.route('/mylist/del/<uuid>')
def mylist_del(uuid):
""" Remove a program from My List """
from resources.lib.modules.catalog import Catalog
Catalog().mylist_del(uuid)
@routing.route('/search') @routing.route('/search')
@routing.route('/search/<query>') @routing.route('/search/<query>')
def show_search(query=None): def show_search(query=None):

View File

@ -4,6 +4,9 @@
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
import logging import logging
from datetime import datetime
import dateutil.tz
from resources.lib import kodiutils from resources.lib import kodiutils
from resources.lib.kodiutils import TitleItem from resources.lib.kodiutils import TitleItem
@ -19,8 +22,8 @@ class Catalog:
def __init__(self): def __init__(self):
""" Initialise object """ """ Initialise object """
auth = AuthApi(kodiutils.get_setting('username'), kodiutils.get_setting('password'), kodiutils.get_tokens_path()) self._auth = AuthApi(kodiutils.get_setting('username'), kodiutils.get_setting('password'), kodiutils.get_tokens_path())
self._api = ContentApi(auth, cache_path=kodiutils.get_cache_path()) self._api = ContentApi(self._auth, cache_path=kodiutils.get_cache_path())
def show_catalog(self): def show_catalog(self):
""" Show all the programs of all channels """ """ Show all the programs of all channels """
@ -174,3 +177,59 @@ class Catalog:
# Sort like we get our results back. # Sort like we get our results back.
kodiutils.show_listing(listing, 30003, content='episodes') kodiutils.show_listing(listing, 30003, content='episodes')
def show_mylist(self):
""" Show all the programs of all channels """
try:
mylist, _ = self._auth.get_dataset('myList')
except Exception as ex:
kodiutils.notification(message=str(ex))
raise
items = []
for item in mylist:
program = self._api.get_program_by_uuid(item.get('id'))
if program:
program.my_list = True
items.append(program)
listing = [Menu.generate_titleitem(item) for item in items]
# Sort items by title
# Used for A-Z listing or when movies and episodes are mixed.
kodiutils.show_listing(listing, 30011, content='tvshows', sort='title')
def mylist_add(self, uuid):
""" Add a program to My List """
if not uuid:
kodiutils.end_of_directory()
return
mylist, sync_info = self._auth.get_dataset('myList')
if uuid not in [item.get('id') for item in mylist]:
# Python 2.7 doesn't support .timestamp(), and windows doesn't do '%s', so we need to calculate it ourself
epoch = datetime(1970, 1, 1, tzinfo=dateutil.tz.gettz('UTC'))
now = datetime.now(tz=dateutil.tz.gettz('UTC'))
timestamp = str(int((now - epoch).total_seconds())) + '000'
mylist.append({
'id': uuid,
'timestamp': timestamp,
})
self._auth.put_dataset('myList', mylist, sync_info)
kodiutils.end_of_directory()
def mylist_del(self, uuid):
""" Remove a program from My List """
if not uuid:
kodiutils.end_of_directory()
return
mylist, sync_info = self._auth.get_dataset('myList')
new_mylist = [item for item in mylist if item.get('id') != uuid]
self._auth.put_dataset('myList', new_mylist, sync_info)
kodiutils.end_of_directory()

View File

@ -41,6 +41,17 @@ class Menu:
plot=kodiutils.localize(30008), plot=kodiutils.localize(30008),
) )
), ),
TitleItem(
title=kodiutils.localize(30011), # My List
path=kodiutils.url_for('show_mylist'),
art_dict=dict(
icon='DefaultPlaylist.png',
fanart=kodiutils.get_addon_info('fanart'),
),
info_dict=dict(
plot=kodiutils.localize(30012),
)
),
TitleItem( TitleItem(
title=kodiutils.localize(30009), # Search title=kodiutils.localize(30009), # Search
path=kodiutils.url_for('show_search'), path=kodiutils.url_for('show_search'),
@ -90,8 +101,30 @@ class Menu:
# We have episodes, or we don't know it # We have episodes, or we don't know it
title = item.title title = item.title
context_menu = []
if item.uuid:
if item.my_list:
context_menu.append((
kodiutils.localize(30101), # Remove from My List
'Container.Update(%s)' %
kodiutils.url_for('mylist_del', uuid=item.uuid)
))
else:
context_menu.append((
kodiutils.localize(30100), # Add to My List
'Container.Update(%s)' %
kodiutils.url_for('mylist_add', uuid=item.uuid)
))
context_menu.append((
kodiutils.localize(30102), # Go to Program
'Container.Update(%s)' %
kodiutils.url_for('show_catalog_program', program=item.path)
))
return TitleItem(title=title, return TitleItem(title=title,
path=kodiutils.url_for('show_catalog_program', program=item.path), path=kodiutils.url_for('show_catalog_program', program=item.path),
context_menu=context_menu,
art_dict=art_dict, art_dict=art_dict,
info_dict=info_dict) info_dict=info_dict)

View File

@ -9,7 +9,7 @@ from resources.lib import kodiutils
from resources.lib.modules.menu import Menu from resources.lib.modules.menu import Menu
from resources.lib.viervijfzes import CHANNELS, ResolvedStream from resources.lib.viervijfzes import CHANNELS, ResolvedStream
from resources.lib.viervijfzes.auth import AuthApi from resources.lib.viervijfzes.auth import AuthApi
from resources.lib.viervijfzes.auth_awsidp import AuthenticationException, InvalidLoginException from resources.lib.viervijfzes.aws.cognito_idp import AuthenticationException, InvalidLoginException
from resources.lib.viervijfzes.content import ContentApi, GeoblockedException, UnavailableException from resources.lib.viervijfzes.content import ContentApi, GeoblockedException, UnavailableException
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)

View File

@ -9,7 +9,9 @@ import os
import time import time
from resources.lib import kodiutils from resources.lib import kodiutils
from resources.lib.viervijfzes.auth_awsidp import AuthenticationException, AwsIdp, InvalidLoginException from resources.lib.viervijfzes.aws.cognito_identity import CognitoIdentity
from resources.lib.viervijfzes.aws.cognito_idp import AuthenticationException, CognitoIdp, InvalidLoginException
from resources.lib.viervijfzes.aws.cognito_sync import CognitoSync
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -19,6 +21,7 @@ class AuthApi:
COGNITO_REGION = 'eu-west-1' COGNITO_REGION = 'eu-west-1'
COGNITO_POOL_ID = 'eu-west-1_dViSsKM5Y' COGNITO_POOL_ID = 'eu-west-1_dViSsKM5Y'
COGNITO_CLIENT_ID = '6s1h851s8uplco5h6mqh1jac8m' COGNITO_CLIENT_ID = '6s1h851s8uplco5h6mqh1jac8m'
COGNITO_IDENTITY_POOL_ID = 'eu-west-1:8b7eb22c-cf61-43d5-a624-04b494867234'
TOKEN_FILE = 'auth-tokens.json' TOKEN_FILE = 'auth-tokens.json'
@ -93,11 +96,36 @@ class AuthApi:
@staticmethod @staticmethod
def _authenticate(username, password): def _authenticate(username, password):
""" Authenticate with Amazon Cognito and fetch a refresh token and id token. """ """ Authenticate with Amazon Cognito and fetch a refresh token and id token. """
client = AwsIdp(AuthApi.COGNITO_POOL_ID, AuthApi.COGNITO_CLIENT_ID) idp_client = CognitoIdp(AuthApi.COGNITO_POOL_ID, AuthApi.COGNITO_CLIENT_ID)
return client.authenticate(username, password) return idp_client.authenticate(username, password)
@staticmethod @staticmethod
def _refresh(refresh_token): def _refresh(refresh_token):
""" Use the refresh token to fetch a new id token. """ """ Use the refresh token to fetch a new id token. """
client = AwsIdp(AuthApi.COGNITO_POOL_ID, AuthApi.COGNITO_CLIENT_ID) idp_client = CognitoIdp(AuthApi.COGNITO_POOL_ID, AuthApi.COGNITO_CLIENT_ID)
return client.renew_token(refresh_token) return idp_client.renew_token(refresh_token)
def get_dataset(self, dataset):
""" Fetch the value from the specified dataset. """
identity_client = CognitoIdentity(AuthApi.COGNITO_POOL_ID, AuthApi.COGNITO_IDENTITY_POOL_ID)
id_token = self.get_token()
identity_id = identity_client.get_id(id_token)
credentials = identity_client.get_credentials_for_identity(id_token, identity_id)
sync_client = CognitoSync(AuthApi.COGNITO_IDENTITY_POOL_ID, identity_id, credentials)
data, session_token, sync_count = sync_client.list_records(dataset)
sync_info = {
'identity_id': identity_id,
'credentials': credentials,
'session_token': session_token,
'sync_count': sync_count,
}
return data, sync_info
@staticmethod
def put_dataset(dataset, value, sync_info):
""" Store the value from the specified dataset. """
sync_client = CognitoSync(AuthApi.COGNITO_IDENTITY_POOL_ID, sync_info.get('identity_id'), sync_info.get('credentials'))
sync_client.update_records(dataset, value, sync_info.get('session_token'), sync_info.get('sync_count'))

View File

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
""" Amazon Cognito Identity implementation without external dependencies """
from __future__ import absolute_import, division, unicode_literals
import json
import logging
import requests
_LOGGER = logging.getLogger(__name__)
class CognitoIdentity:
""" Cognito Identity """
def __init__(self, pool_id, identity_pool_id):
"""
See https://docs.aws.amazon.com/cognitoidentity/latest/APIReference/Welcome.html.
:param str pool_id:
:param str identity_pool_id:
"""
self.pool_id = pool_id
if "_" not in self.pool_id:
raise ValueError("Invalid pool_id format. Should be <region>_<poolid>.")
self.identity_pool_id = identity_pool_id
self.region = self.pool_id.split("_")[0]
self.url = "https://cognito-identity.%s.amazonaws.com/" % self.region
self._session = requests.session()
def get_id(self, id_token):
""" Get the Identity ID based on the id_token. """
provider = 'cognito-idp.%s.amazonaws.com/%s' % (self.region, self.pool_id)
data = {
"IdentityPoolId": self.identity_pool_id,
"Logins": {
provider: id_token,
}
}
response = self._session.post(self.url, json=data, headers={
'x-amz-target': 'AWSCognitoIdentityService.GetId',
'content-type': 'application/x-amz-json-1.1',
})
_LOGGER.debug(response.text)
result = json.loads(response.text)
return result.get('IdentityId')
def get_credentials_for_identity(self, id_token, identity_id):
""" Get credentials based on the id_token and identity_id. """
provider = 'cognito-idp.%s.amazonaws.com/%s' % (self.region, self.pool_id)
data = {
"IdentityId": identity_id,
"Logins": {
provider: id_token,
}
}
response = self._session.post(self.url, json=data, headers={
'x-amz-target': 'AWSCognitoIdentityService.GetCredentialsForIdentity',
'content-type': 'application/x-amz-json-1.1',
})
_LOGGER.debug(response.text)
result = json.loads(response.text)
return result.get('Credentials')

View File

@ -27,11 +27,14 @@ class AuthenticationException(Exception):
""" Something went wrong while logging in """ """ Something went wrong while logging in """
class AwsIdp: class CognitoIdp:
""" AWS Identity Provider """ """ Cognito IDP """
def __init__(self, pool_id, client_id): def __init__(self, pool_id, client_id):
""" """
See https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/Welcome.html.
:param str pool_id: The AWS user pool to connect to (format: <region>_<poolid>). :param str pool_id: The AWS user pool to connect to (format: <region>_<poolid>).
E.g.: eu-west-1_aLkOfYN3T E.g.: eu-west-1_aLkOfYN3T
:param str client_id: The client application ID (the ID of the application connecting) :param str client_id: The client application ID (the ID of the application connecting)
@ -291,7 +294,7 @@ class AwsIdp:
@staticmethod @staticmethod
def __hex_hash(hex_string): def __hex_hash(hex_string):
return AwsIdp.__hash_sha256(bytearray.fromhex(hex_string)) return CognitoIdp.__hash_sha256(bytearray.fromhex(hex_string))
@staticmethod @staticmethod
def __hash_sha256(buf): def __hash_sha256(buf):
@ -311,7 +314,7 @@ class AwsIdp:
# noinspection PyTypeChecker # noinspection PyTypeChecker
if not isinstance(long_int, six.string_types): if not isinstance(long_int, six.string_types):
hash_str = AwsIdp.__long_to_hex(long_int) hash_str = CognitoIdp.__long_to_hex(long_int)
else: else:
hash_str = long_int hash_str = long_int
if len(hash_str) % 2 == 1: if len(hash_str) % 2 == 1:
@ -323,7 +326,7 @@ class AwsIdp:
@staticmethod @staticmethod
def __get_random(nbytes): def __get_random(nbytes):
random_hex = binascii.hexlify(os.urandom(nbytes)) random_hex = binascii.hexlify(os.urandom(nbytes))
return AwsIdp.__hex_to_long(random_hex) return CognitoIdp.__hex_to_long(random_hex)
@staticmethod @staticmethod
def __get_current_timestamp(): def __get_current_timestamp():

View File

@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-
""" Amazon Cognito Sync implementation without external dependencies """
from __future__ import absolute_import, division, unicode_literals
import datetime
import hashlib
import hmac
import json
import logging
import requests
try: # Python 3
from urllib.parse import quote, urlparse
except ImportError: # Python 2
from urllib import quote
from urlparse import urlparse
_LOGGER = logging.getLogger(__name__)
class CognitoSync:
""" Amazon Cognito Sync """
def __init__(self, identity_pool_id, identity_id, credentials):
"""
See https://docs.aws.amazon.com/cognitosync/latest/APIReference/Welcome.html.
:param str identity_pool_id:
:param str identity_id:
:param dict credentials:
"""
self.identity_pool_id = identity_pool_id
self.identity_id = identity_id
self.credentials = credentials
self.region = self.identity_pool_id.split(":")[0]
self.url = "https://cognito-sync.%s.amazonaws.com" % self.region
self._session = requests.session()
def _sign(self, request, service='cognito-sync'):
""" Sign the request.
More info at https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html.
:param requests.PreparedRequest request: A prepared request that should be signed.
:param str service: The service where this request is going to.
"""
def sign(key, msg):
""" Sign this message. """
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def get_signature_key(key, date_stamp, region_name, service_name):
""" Generate a signature key. """
k_date = sign(('AWS4' + key).encode('utf-8'), date_stamp)
k_region = sign(k_date, region_name)
k_service = sign(k_region, service_name)
k_signing = sign(k_service, 'aws4_request')
return k_signing
# Parse the URL
url_parsed = urlparse(request.url)
# Create a date for headers and the credential string
now = datetime.datetime.utcnow()
amzdate = now.strftime('%Y%m%dT%H%M%SZ')
datestamp = now.strftime('%Y%m%d') # Date w/o time, used in credential scope
# Step 1. Create a canonical request
canonical_uri = quote(url_parsed.path)
canonical_querystring = url_parsed.query # TODO: sort when using multiple values
canonical_headers = ('host:' + url_parsed.netloc + '\n' +
'x-amz-date:' + amzdate + '\n')
signed_headers = 'host;x-amz-date'
if request.body:
payload_hash = hashlib.sha256(request.body).hexdigest()
else:
# SHA256 of empty string
payload_hash = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
canonical_request = (request.method + '\n' +
canonical_uri + '\n' +
canonical_querystring + '\n' +
canonical_headers + '\n' +
signed_headers + '\n' +
payload_hash)
_LOGGER.warning(canonical_request)
# Step 2. Create a string to sign
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = '%s/%s/%s/%s' % (datestamp, self.region, service, 'aws4_request')
string_to_sign = (algorithm + '\n' +
amzdate + '\n' +
credential_scope + '\n' +
hashlib.sha256(canonical_request.encode('utf-8')).hexdigest())
signing_key = get_signature_key(self.credentials.get('SecretKey'), datestamp, self.region, service)
# Step 3. Calculate the signature
signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
authorization_header = '%s Credential=%s/%s, SignedHeaders=%s, Signature=%s' % (
algorithm, self.credentials.get('AccessKeyId'), credential_scope, signed_headers, signature)
# Step 4. Add the signature to the request
request.headers.update({
'x-amz-date': amzdate,
'Authorization': authorization_header
})
def list_records(self, dataset):
""" Return the values of this dataset.
:param str dataset: The name of the dataset to request.
:return The requested dataset
:rtype: dict
"""
# Prepare the request
request = requests.Request(
method='GET',
params={
'maxResults': 1024,
},
url=self.url + '/identitypools/{identity_pool_id}/identities/{identity_id}/datasets/{dataset}/records'.format(
identity_pool_id=self.identity_pool_id,
identity_id=self.identity_id,
dataset=dataset
),
headers={
'x-amz-security-token': self.credentials.get('SessionToken'),
}).prepare()
# Sign the request
self._sign(request)
# Send the request
reply = self._session.send(request)
reply.raise_for_status()
result = json.loads(reply.text)
# Return the records
record = next(record for record in result.get('Records', []) if record.get('Key') == dataset)
value = json.loads(record.get('Value'))
return value, result.get('SyncSessionToken'), record.get('SyncCount')
def update_records(self, dataset, value, session_token, sync_count):
""" Return the values of this dataset.
:param str dataset: The name of the dataset to request.
:param any value: The value.
:param str session_token: The session token from the list_records call.
:param int sync_count: The last SyncCount value, so we refuse race conditions.
"""
# Prepare the request
request = requests.Request(
method='POST',
url=self.url + '/identitypools/{identity_pool_id}/identities/{identity_id}/datasets/{dataset}'.format(
identity_pool_id=self.identity_pool_id,
identity_id=self.identity_id,
dataset=dataset
),
headers={
'x-amz-security-token': self.credentials.get('SessionToken'),
},
json={
"SyncSessionToken": session_token,
"RecordPatches": [
{
"Key": dataset,
"Op": "replace",
"SyncCount": sync_count,
"Value": json.dumps(value),
}
]
}).prepare()
# Sign the request
self._sign(request)
# Send the request
reply = self._session.send(request)
reply.raise_for_status()

View File

@ -45,7 +45,7 @@ class Program:
""" Defines a Program. """ """ Defines a Program. """
def __init__(self, uuid=None, path=None, channel=None, title=None, description=None, aired=None, cover=None, background=None, seasons=None, episodes=None, def __init__(self, uuid=None, path=None, channel=None, title=None, description=None, aired=None, cover=None, background=None, seasons=None, episodes=None,
clips=None): clips=None, my_list=False):
""" """
:type uuid: str :type uuid: str
:type path: str :type path: str
@ -58,6 +58,7 @@ class Program:
:type seasons: list[Season] :type seasons: list[Season]
:type episodes: list[Episode] :type episodes: list[Episode]
:type clips: list[Episode] :type clips: list[Episode]
:type my_list: bool
""" """
self.uuid = uuid self.uuid = uuid
self.path = path self.path = path
@ -70,6 +71,7 @@ class Program:
self.seasons = seasons self.seasons = seasons
self.episodes = episodes self.episodes = episodes
self.clips = clips self.clips = clips
self.my_list = my_list
def __repr__(self): def __repr__(self):
return "%r" % self.__dict__ return "%r" % self.__dict__
@ -182,6 +184,7 @@ class ContentApi:
def get_programs(self, channel=None, cache=CACHE_AUTO): def get_programs(self, channel=None, cache=CACHE_AUTO):
""" Get a list of all programs of the specified channel. """ Get a list of all programs of the specified channel.
:type channel: str
:type cache: str :type cache: str
:rtype list[Program] :rtype list[Program]
""" """
@ -260,6 +263,31 @@ class ContentApi:
return program return program
def get_program_by_uuid(self, uuid, cache=CACHE_AUTO):
""" Get a Program object with the specified uuid.
:type uuid: str
:type cache: str
:rtype Program
"""
if not uuid:
return None
def update():
""" Fetch the program metadata """
# Fetch webpage
result = self._get_url(self.SITE_URL + '/api/program/%s' % uuid)
data = json.loads(result)
return data
# Fetch listing from cache or update if needed
data = self._handle_cache(key=['program', uuid], cache_mode=cache, update=update)
if not data:
return None
program = self._parse_program_data(data)
return program
def get_episode(self, path, cache=CACHE_AUTO): def get_episode(self, path, cache=CACHE_AUTO):
""" Get a Episode object from the specified page. """ Get a Episode object from the specified page.
:type path: str :type path: str

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" AUTH API """ """ Search API """
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
@ -8,17 +8,19 @@ import logging
import requests import requests
from resources.lib.viervijfzes.content import Program from resources.lib import kodiutils
from resources.lib.viervijfzes.content import Program, ContentApi, CACHE_ONLY
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class SearchApi: class SearchApi:
""" GoPlay Search API """ """ GoPlay Search API """
API_ENDPOINT = 'https://api.viervijfzes.be/search' API_ENDPOINT = 'https://api.goplay.be/search'
def __init__(self): def __init__(self):
""" Initialise object """ """ Initialise object """
self._api = ContentApi(None, cache_path=kodiutils.get_cache_path())
self._session = requests.session() self._session = requests.session()
def search(self, query): def search(self, query):
@ -33,26 +35,28 @@ class SearchApi:
self.API_ENDPOINT, self.API_ENDPOINT,
json={ json={
"query": query, "query": query,
"sites": ["vier", "vijf", "zes"],
"page": 0, "page": 0,
"mode": "byDate" "mode": "programs"
} }
) )
_LOGGER.debug(response.content)
if response.status_code != 200: response.raise_for_status()
raise Exception('Could not search')
data = json.loads(response.text) data = json.loads(response.text)
results = [] results = []
for hit in data['hits']['hits']: for hit in data['hits']['hits']:
if hit['_source']['bundle'] == 'program': if hit['_source']['bundle'] == 'program':
results.append(Program( path = hit['_source']['url'].split('/')[-1]
channel=hit['_source']['site'], program = self._api.get_program(path, cache=CACHE_ONLY)
path=hit['_source']['url'].split('/')[-1], if program:
title=hit['_source']['title'], results.append(program)
description=hit['_source']['intro'], else:
cover=hit['_source']['img'], results.append(Program(
)) path=path,
title=hit['_source']['title'],
description=hit['_source']['intro'],
cover=hit['_source']['img'],
))
return results return results

View File

@ -26,12 +26,12 @@ class TestAuth(unittest.TestCase):
auth.clear_tokens() auth.clear_tokens()
# We should get a token by logging in # We should get a token by logging in
token = auth.get_token() id_token = auth.get_token()
self.assertTrue(token) self.assertTrue(id_token)
# Test it a second time, it should go from memory now # Test it a second time, it should go from memory now
token = auth.get_token() id_token = auth.get_token()
self.assertTrue(token) self.assertTrue(id_token)
if __name__ == '__main__': if __name__ == '__main__':

45
tests/test_mylist.py Normal file
View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
""" Tests for My List """
# pylint: disable=missing-docstring,no-self-use
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import unittest
from resources.lib import kodiutils
from resources.lib.viervijfzes.auth import AuthApi
_LOGGER = logging.getLogger(__name__)
class TestMyList(unittest.TestCase):
def __init__(self, *args, **kwargs):
super(TestMyList, self).__init__(*args, **kwargs)
@unittest.skipUnless(kodiutils.get_setting('username') and kodiutils.get_setting('password'), 'Skipping since we have no credentials.')
def test_mylist(self):
auth = AuthApi(kodiutils.get_setting('username'), kodiutils.get_setting('password'), kodiutils.get_tokens_path())
id_token = auth.get_token()
self.assertTrue(id_token)
dataset, _ = auth.get_dataset('myList')
self.assertTrue(dataset)
# Test disabled since it would cause locks due to all the CI tests changing this at the same time.
# # Python 2.7 doesn't support .timestamp(), and windows doesn't do '%s', so we need to calculate it ourself
# epoch = datetime(1970, 1, 1, tzinfo=dateutil.tz.gettz('UTC'))
# now = datetime.now(tz=dateutil.tz.gettz('UTC'))
# timestamp = str(int((now - epoch).total_seconds())) + '000'
# new_dataset = [
# {'id': '06e209f9-092e-421e-9499-58c62c292b98', 'timestamp': timestamp},
# {'id': 'da584be3-dea6-49c7-bfbd-c480d8096937', 'timestamp': timestamp}
# ]
#
# auth.put_dataset('myList', new_dataset, sync_info)
if __name__ == '__main__':
unittest.main()