plugin.video.viervijfzes/resources/lib/viervijfzes/aws/cognito_sync.py

194 lines
7.1 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
""" Amazon Cognito Sync implementation without external dependencies """
from __future__ import absolute_import, division, unicode_literals
import datetime
import hashlib
import hmac
import json
import logging
import requests
try: # Python 3
from urllib.parse import quote, urlparse
except ImportError: # Python 2
from urllib import quote
from urlparse import urlparse
_LOGGER = logging.getLogger(__name__)
class CognitoSync:
""" Amazon Cognito Sync """
def __init__(self, identity_pool_id, identity_id, credentials):
"""
See https://docs.aws.amazon.com/cognitosync/latest/APIReference/Welcome.html.
:param str identity_pool_id:
:param str identity_id:
:param dict credentials:
"""
self.identity_pool_id = identity_pool_id
self.identity_id = identity_id
self.credentials = credentials
self.region = self.identity_pool_id.split(":")[0]
self.url = "https://cognito-sync.%s.amazonaws.com" % self.region
self._session = requests.session()
def _sign(self, request, service='cognito-sync'):
""" Sign the request.
More info at https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html.
:param requests.PreparedRequest request: A prepared request that should be signed.
:param str service: The service where this request is going to.
"""
def sign(key, msg):
""" Sign this message. """
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def get_signature_key(key, date_stamp, region_name, service_name):
""" Generate a signature key. """
k_date = sign(('AWS4' + key).encode('utf-8'), date_stamp)
k_region = sign(k_date, region_name)
k_service = sign(k_region, service_name)
k_signing = sign(k_service, 'aws4_request')
return k_signing
# Parse the URL
url_parsed = urlparse(request.url)
# Create a date for headers and the credential string
now = datetime.datetime.utcnow()
amzdate = now.strftime('%Y%m%dT%H%M%SZ')
datestamp = now.strftime('%Y%m%d') # Date w/o time, used in credential scope
# Step 1. Create a canonical request
canonical_uri = quote(url_parsed.path)
canonical_querystring = url_parsed.query # TODO: sort when using multiple values
canonical_headers = ('host:' + url_parsed.netloc + '\n' +
'x-amz-date:' + amzdate + '\n')
signed_headers = 'host;x-amz-date'
if request.body:
payload_hash = hashlib.sha256(request.body).hexdigest()
else:
# SHA256 of empty string
payload_hash = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
canonical_request = (request.method + '\n' +
canonical_uri + '\n' +
canonical_querystring + '\n' +
canonical_headers + '\n' +
signed_headers + '\n' +
payload_hash)
# Step 2. Create a string to sign
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = '%s/%s/%s/%s' % (datestamp, self.region, service, 'aws4_request')
string_to_sign = (algorithm + '\n' +
amzdate + '\n' +
credential_scope + '\n' +
hashlib.sha256(canonical_request.encode('utf-8')).hexdigest())
signing_key = get_signature_key(self.credentials.get('SecretKey'), datestamp, self.region, service)
# Step 3. Calculate the signature
signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
authorization_header = '%s Credential=%s/%s, SignedHeaders=%s, Signature=%s' % (
algorithm, self.credentials.get('AccessKeyId'), credential_scope, signed_headers, signature)
# Step 4. Add the signature to the request
request.headers.update({
'x-amz-date': amzdate,
'Authorization': authorization_header
})
def list_records(self, dataset, key):
""" Return the values of this dataset.
:param str dataset: The name of the dataset to request.
:param str key: The name of the key to request.
:return The requested dataset
:rtype: dict
"""
# Prepare the request
request = requests.Request(
method='GET',
params={
'maxResults': 1024,
},
url=self.url + '/identitypools/{identity_pool_id}/identities/{identity_id}/datasets/{dataset}/records'.format(
identity_pool_id=self.identity_pool_id,
identity_id=self.identity_id,
dataset=dataset
),
headers={
'x-amz-security-token': self.credentials.get('SessionToken'),
}).prepare()
# Sign the request
self._sign(request)
# Send the request
reply = self._session.send(request)
reply.raise_for_status()
result = json.loads(reply.text)
_LOGGER.debug("Got results: %s", result.get('Records'))
# Return the records
try:
record = next(record for record in result.get('Records', []) if record.get('Key') == key)
value = json.loads(record.get('Value'))
sync_count = record.get('SyncCount')
except StopIteration:
value = None
sync_count = 0
return value, result.get('SyncSessionToken'), sync_count
def update_records(self, dataset, key, value, session_token, sync_count):
""" Return the values of this dataset.
:param str dataset: The name of the dataset to request.
:param any value: The value.
:param str session_token: The session token from the list_records call.
:param int sync_count: The last SyncCount value, so we refuse race conditions.
"""
# Prepare the request
request = requests.Request(
method='POST',
url=self.url + '/identitypools/{identity_pool_id}/identities/{identity_id}/datasets/{dataset}'.format(
identity_pool_id=self.identity_pool_id,
identity_id=self.identity_id,
dataset=dataset
),
headers={
'x-amz-security-token': self.credentials.get('SessionToken'),
},
json={
"SyncSessionToken": session_token,
"RecordPatches": [
{
"Key": key,
"Op": "replace",
"SyncCount": sync_count,
"Value": json.dumps(value),
}
]
}).prepare()
# Sign the request
self._sign(request)
# Send the request
reply = self._session.send(request)
reply.raise_for_status()