Source code for skpy.conn

import os
import re
import functools
from datetime import datetime, timedelta
import time
from types import MethodType
import hashlib
import base64

from bs4 import BeautifulSoup
import requests

from .core import SkypeObj, SkypeEnum, SkypeApiException, SkypeAuthException

if os.getenv("SKPY_DEBUG_HTTP"):
    from pprint import pformat


[docs]class SkypeConnection(SkypeObj): """ The main connection class -- handles all requests to API resources. To authenticate with a username and password, use :meth:`setUserPwd` to store the credentials. Token files can be specified with :meth:`setTokenFile`. Attributes: tokens (dict): Token strings used to connect to various Skype APIs. Uses keys ``skype`` and ``reg``. tokenExpiry (dict): Map from token key to :class:`datetime <datetime.datetime>` of expiry. tokenFile (str): Path to file holding token data for the current session. msgsHost (str): Derived API base URL during registration token retrieval. sess (requests.Session): Shared session used for all API requests. endpoints (dict): Container of :class:`SkypeEndpoint` instances for the current session. connected (bool): Whether the connection instance is ready to make API calls. guest (bool): Whether the connected account only has guest privileges. """ Auth = SkypeEnum("SkypeConnection.Auth", ("SkypeToken", "Authorize", "RegToken")) """ :class:`.SkypeEnum`: Authentication types for different API calls. Attributes: Auth.SkypeToken: Add an ``X-SkypeToken`` header with the Skype token. Auth.Authorize: Add an ``Authorization`` header with the Skype token. Auth.RegToken: Add a ``RegistrationToken`` header with the registration token. """
[docs] @staticmethod def handle(*codes, **kwargs): """ Method decorator: if a given status code is received, re-authenticate and try again. Args: codes (int list): status codes to respond to regToken (bool): whether to try retrieving a new token on error Returns: method: decorator function, ready to apply to other methods """ regToken = kwargs.get("regToken", False) subscribe = kwargs.get("subscribe") def decorator(fn): @functools.wraps(fn) def wrapper(self, *args, **kwargs): try: return fn(self, *args, **kwargs) except SkypeApiException as e: if isinstance(e.args[1], requests.Response) and e.args[1].status_code in codes: conn = self if isinstance(self, SkypeConnection) else self.conn if regToken: conn.getRegToken() if subscribe: conn.endpoints[subscribe].subscribe() return fn(self, *args, **kwargs) raise return wrapper return decorator
[docs] @classmethod def externalCall(cls, method, url, codes=(200, 201, 204, 207), **kwargs): """ Make a public API call without a connected :class:`.Skype` instance. The obvious implications are that no authenticated calls are possible, though this allows accessing some public APIs such as join URL lookups. Args: method (str): HTTP request method url (str): full URL to connect to codes (int list): expected HTTP response codes for success kwargs (dict): any extra parameters to pass to :func:`requests.request` Returns: requests.Response: response object provided by :mod:`requests` Raises: .SkypeAuthException: if an authentication rate limit is reached .SkypeApiException: if a successful status code is not received """ if os.getenv("SKPY_DEBUG_HTTP"): print("<= [{0}] {1} {2}".format(datetime.now().strftime("%d/%m %H:%M:%S"), method, url)) print(pformat(kwargs)) resp = cls.extSess.request(method, url, **kwargs) if os.getenv("SKPY_DEBUG_HTTP"): print("=> [{0}] {1}".format(datetime.now().strftime("%d/%m %H:%M:%S"), resp.status_code)) print(pformat(dict(resp.headers))) try: print(pformat(resp.json())) except ValueError: print(resp.text) if resp.status_code not in codes: raise SkypeApiException("{0} response from {1} {2}".format(resp.status_code, method, url), resp) return resp
API_LOGIN = "https://login.skype.com/login" API_MSACC = "https://login.live.com" API_USER = "https://api.skype.com" API_PROFILE = "https://profile.skype.com/profile/v1" API_OPTIONS = "https://options.skype.com/options/v1/users/self/options" API_JOIN = "https://join.skype.com" API_JOIN_CREATE = "https://api.join.skype.com/v1" API_BOT = "https://api.aps.skype.com/v1" API_FLAGS = "https://flagsapi.skype.com/flags/v1" API_ENTITLEMENT = "https://consumer.entitlement.skype.com" API_TRANSLATE = "https://dev.microsofttranslator.com/api" API_ASM = "https://api.asm.skype.com/v1/objects" API_ASM_LOCAL = "https://{0}1-api.asm.skype.com/v1/objects" API_URL = "https://urlp.asm.skype.com/v1/url/info" API_CONTACTS = "https://contacts.skype.com/contacts/v2" API_MSGSHOST = "https://client-s.gateway.messenger.live.com/v1" API_DIRECTORY = "https://skypegraph.skype.com/search/v1.1/namesearch/swx/" # Version doesn't seem to be important, at least not for what we need. API_CONFIG = "https://a.config.skype.com/config/v1" attrs = ("userId", "tokenFile", "connected", "guest") extSess = requests.session()
[docs] def __init__(self): """ Create a new, unconnected instance. """ self.userId = None self.tokens = {} self.tokenExpiry = {} self.tokenFile = None self.msgsHost = self.API_MSGSHOST self.sess = requests.Session() self.endpoints = {"self": SkypeEndpoint(self, "SELF")} self.syncStates = {}
@property def connected(self): return "skype" in self.tokenExpiry and datetime.now() <= self.tokenExpiry["skype"] \ and "reg" in self.tokenExpiry and datetime.now() <= self.tokenExpiry["reg"] @property def guest(self): return self.userId.startswith("guest:") if self.userId else None
[docs] def __call__(self, method, url, codes=(200, 201, 202, 204, 207), auth=None, headers=None, **kwargs): """ Make an API call. Most parameters are passed directly to :mod:`requests`. Set codes to a list of valid HTTP response codes -- an exception is raised if the response does not match. If authentication is required, set ``auth`` to one of the :class:`Auth` constants. Args: method (str): HTTP request method url (str): full URL to connect to codes (int list): expected HTTP response codes for success auth (Auth): authentication type to be included headers (dict): additional headers to be included kwargs (dict): any extra parameters to pass to :func:`requests.request` Returns: requests.Response: response object provided by :mod:`requests` Raises: .SkypeAuthException: if an authentication rate limit is reached .SkypeApiException: if a successful status code is not received """ self.verifyToken(auth) if not headers: headers = {} if auth == self.Auth.SkypeToken: headers["X-SkypeToken"] = self.tokens["skype"] elif auth == self.Auth.Authorize: headers["Authorization"] = "skype_token {0}".format(self.tokens["skype"]) elif auth == self.Auth.RegToken: headers["RegistrationToken"] = self.tokens["reg"] if os.getenv("SKPY_DEBUG_HTTP"): print("<= [{0}] {1} {2}".format(datetime.now().strftime("%d/%m %H:%M:%S"), method, url)) print(pformat(kwargs)) resp = self.sess.request(method, url, headers=headers, **kwargs) if os.getenv("SKPY_DEBUG_HTTP"): print("=> [{0}] {1}".format(datetime.now().strftime("%d/%m %H:%M:%S"), resp.status_code)) print(pformat(dict(resp.headers))) try: print(pformat(resp.json())) except ValueError: print(resp.text) if resp.status_code not in codes: if resp.status_code == 429: raise SkypeAuthException("Auth rate limit exceeded", resp) raise SkypeApiException("{0} response from {1} {2}".format(resp.status_code, method, url), resp) return resp
[docs] def syncStateCall(self, method, url, params={}, **kwargs): """ Follow and track sync state URLs provided by an API endpoint, in order to implicitly handle pagination. In the first call, ``url`` and ``params`` are used as-is. If a ``syncState`` endpoint is provided in the response, subsequent calls go to the latest URL instead. Args: method (str): HTTP request method url (str): full URL to connect to params (dict): query parameters to include in the URL kwargs (dict): any extra parameters to pass to :meth:`__call__` """ try: states = self.syncStates[(method, url)] except KeyError: states = self.syncStates[(method, url)] = [] if states: # We have a state link, use it to replace the URL and query string. url = states[-1] params = {} resp = self(method, url, params=params, **kwargs) try: json = resp.json() except ValueError: # Don't do anything if not a JSON response. pass else: # If a state link exists in the response, store it for later. state = json.get("_metadata", {}).get("syncState") if state: states.append(state) return resp
[docs] def setUserPwd(self, user, pwd): """ Replace the stub :meth:`getSkypeToken` method with one that connects via the Microsoft account flow using the given credentials. Avoids storing the account password in an accessible way. Args: user (str): username or email address of the connecting account pwd (str): password of the connecting account """ def getSkypeToken(self): self.liveLogin(user, pwd) self.getSkypeToken = MethodType(getSkypeToken, self)
[docs] def setTokenFile(self, path): """ Enable reading and writing session tokens to a file at the given location. Args: path (str): path to file used for token storage """ self.tokenFile = path
[docs] def readToken(self): """ Attempt to re-establish a connection using previously acquired tokens. If the Skype token is valid but the registration token is invalid, a new endpoint will be registered. Raises: .SkypeAuthException: if the token file cannot be used to authenticate """ if not self.tokenFile: raise SkypeAuthException("No token file specified") try: with open(self.tokenFile, "r") as f: lines = f.read().splitlines() except OSError: raise SkypeAuthException("Token file doesn't exist or not readable") try: user, skypeToken, skypeExpiry, regToken, regExpiry, msgsHost = lines skypeExpiry = datetime.fromtimestamp(int(skypeExpiry)) regExpiry = datetime.fromtimestamp(int(regExpiry)) except ValueError: raise SkypeAuthException("Token file is malformed") if datetime.now() >= skypeExpiry: raise SkypeAuthException("Token file has expired") self.userId = user self.tokens["skype"] = skypeToken self.tokenExpiry["skype"] = skypeExpiry if datetime.now() < regExpiry: self.tokens["reg"] = regToken self.tokenExpiry["reg"] = regExpiry self.msgsHost = msgsHost else: self.getRegToken()
[docs] def writeToken(self): """ Store details of the current connection in the named file. This can be used by :meth:`readToken` to re-authenticate at a later time. """ # Write token file privately. with os.fdopen(os.open(self.tokenFile, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f: # When opening files via os, truncation must be done manually. f.truncate() f.write(self.userId + "\n") f.write(self.tokens["skype"] + "\n") f.write(str(int(time.mktime(self.tokenExpiry["skype"].timetuple()))) + "\n") f.write(self.tokens["reg"] + "\n") f.write(str(int(time.mktime(self.tokenExpiry["reg"].timetuple()))) + "\n") f.write(self.msgsHost + "\n")
[docs] def verifyToken(self, auth): """ Ensure the authentication token for the given auth method is still valid. Args: auth (Auth): authentication type to check Raises: .SkypeAuthException: if Skype auth is required, and the current token has expired and can't be renewed """ if auth in (self.Auth.SkypeToken, self.Auth.Authorize): if "skype" not in self.tokenExpiry or datetime.now() >= self.tokenExpiry["skype"]: if not hasattr(self, "getSkypeToken"): raise SkypeAuthException("Skype token expired, and no password specified") self.getSkypeToken() elif auth == self.Auth.RegToken: if "reg" not in self.tokenExpiry or datetime.now() >= self.tokenExpiry["reg"]: self.getRegToken()
[docs] def liveLogin(self, user, pwd): """ Obtain connection parameters from the Microsoft account login page, and perform a login with the given email address or Skype username, and its password. This emulates a login to Skype for Web on ``login.live.com``. .. note:: Microsoft accounts with two-factor authentication enabled are not supported, and will cause a :class:`.SkypeAuthException` to be raised. See the exception definitions for other possible causes. Args: user (str): username or email address of the connecting account pwd (str): password of the connecting account Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ self.tokens["skype"], self.tokenExpiry["skype"] = SkypeLiveAuthProvider(self).auth(user, pwd) self.getUserId() self.getRegToken()
[docs] def guestLogin(self, url, name): """ Connect to Skype as a guest, joining a given conversation. In this state, some APIs (such as contacts) will return 401 status codes. A guest can only communicate with the conversation they originally joined. Args: url (str): public join URL for conversation, or identifier from it name (str): display name as shown to other participants Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ self.tokens["skype"], self.tokenExpiry["skype"] = SkypeGuestAuthProvider(self).auth(url, name) self.getUserId() self.getRegToken()
[docs] def getSkypeToken(self): """ A wrapper for :meth:`liveLogin` that applies the previously given username and password. Raises: .SkypeAuthException: if credentials were never provided """ raise SkypeAuthException("No username or password provided, and no valid token file")
[docs] def refreshSkypeToken(self): """ Take the existing Skype token and refresh it, to extend the expiry time without other credentials. Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ self.tokens["skype"], self.tokenExpiry["skype"] = SkypeRefreshAuthProvider(self).auth(self.tokens["skype"]) self.getRegToken()
[docs] def getUserId(self): """ Ask Skype for the authenticated user's identifier, and store it on the connection object. """ self.userId = self("GET", "{0}/users/self/profile".format(self.API_USER), auth=self.Auth.SkypeToken).json().get("username")
[docs] def getRegToken(self): """ Acquire a new registration token. Once successful, all tokens and expiry times are written to the token file (if specified on initialisation). """ self.verifyToken(self.Auth.SkypeToken) token, expiry, msgsHost, endpoint = SkypeRegistrationTokenProvider(self).auth(self.tokens["skype"]) self.tokens["reg"] = token self.tokenExpiry["reg"] = expiry self.msgsHost = msgsHost if endpoint: endpoint.config() self.endpoints["main"] = endpoint self.syncEndpoints() if self.tokenFile: self.writeToken()
[docs] def syncEndpoints(self): """ Retrieve all current endpoints for the connected user. """ self.endpoints["all"] = [] for json in self("GET", "{0}/users/ME/presenceDocs/messagingService".format(self.msgsHost), params={"view": "expanded"}, auth=self.Auth.RegToken).json().get("endpointPresenceDocs", []): id = json.get("link", "").split("/")[7] self.endpoints["all"].append(SkypeEndpoint(self, id))
[docs]class SkypeAuthProvider(SkypeObj): """ A base class for authentication providers. Subclasses should implement the :meth:`auth` method. """ def __init__(self, conn): self.conn = conn
[docs] def auth(self, *args, **kwargs): """ Authenticate a user, given some form of identification. Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login forms can't be processed """ raise NotImplementedError
[docs]class SkypeAPIAuthProvider(SkypeAuthProvider): """ An authentication provider that connects via the Skype API. Only compatible with Skype usernames. """
[docs] def auth(self, user, pwd): """ Perform a login with the given Skype username and its password. This emulates a login to Skype for Web on ``api.skype.com``. Args: user (str): username of the connecting account pwd (str): password of the connecting account Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ # Wrap up the credentials ready to send. pwdHash = base64.b64encode(hashlib.md5((user + "\nskyper\n" + pwd).encode("utf-8")).digest()).decode("utf-8") json = self.conn("POST", "{0}/login/skypetoken".format(SkypeConnection.API_USER), json={"username": user, "passwordHash": pwdHash, "scopes": "client"}).json() if "skypetoken" not in json: raise SkypeAuthException("Couldn't retrieve Skype token from response") expiry = None if "expiresIn" in json: expiry = datetime.fromtimestamp(int(time.time()) + int(json["expiresIn"])) return json["skypetoken"], expiry
[docs]class SkypeLiveAuthProvider(SkypeAuthProvider): """ An authentication provider that connects via Microsoft account authentication. """
[docs] def checkUser(self, user): """ Query a username or email address to see if a corresponding Microsoft account exists. Args: user (str): username or email address of an account Returns: bool: whether the account exists """ return not self.conn("POST", "{0}/GetCredentialType.srf".format(SkypeConnection.API_MSACC), json={"username": user}).json().get("IfExistsResult")
[docs] def auth(self, user, pwd): """ Obtain connection parameters from the Microsoft account login page, and perform a login with the given email address or Skype username, and its password. This emulates a login to Skype for Web on ``login.live.com``. .. note:: Microsoft accounts with two-factor authentication enabled are not supported, and will cause a :class:`.SkypeAuthException` to be raised. See the exception definitions for other possible causes. Args: user (str): username or email address of the connecting account pwd (str): password of the connecting account Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ # Do the authentication dance. params = self.getParams() t = self.sendCreds(user, pwd, params) return self.getToken(t)
def getParams(self): # First, start a Microsoft account login from Skype, which will redirect to login.live.com. loginResp = self.conn("GET", "{0}/oauth/microsoft".format(SkypeConnection.API_LOGIN), params={"client_id": "578134", "redirect_uri": "https://web.skype.com"}) # This is inside some embedded JavaScript, so can't easily parse with BeautifulSoup. ppftReg = re.search(r"""<input.*?name="PPFT".*?value="(.*?)""" + "\"", loginResp.text) if not ppftReg: raise SkypeApiException("Couldn't retrieve PPFT from login form", loginResp) if "MSPRequ" not in loginResp.cookies or "MSPOK" not in loginResp.cookies: raise SkypeApiException("Couldn't retrieve MSPRequ/MSPOK cookies", loginResp) return {"MSPRequ": loginResp.cookies.get("MSPRequ"), "MSPOK": loginResp.cookies.get("MSPOK"), "PPFT": ppftReg.group(1)} def sendCreds(self, user, pwd, params): # Now pass the login credentials over. loginResp = self.conn("POST", "{0}/ppsecure/post.srf".format(SkypeConnection.API_MSACC), params={"wa": "wsignin1.0", "wp": "MBI_SSL", "wreply": "https://lw.skype.com/login/oauth/proxy?client_id=578134&site_name=" "lw.skype.com&redirect_uri=https%3A%2F%2Fweb.skype.com%2F"}, cookies={"MSPRequ": params["MSPRequ"], "MSPOK": params["MSPOK"], "CkTst": str(int(time.time() * 1000))}, data={"login": user, "passwd": pwd, "PPFT": params["PPFT"]}) tField = BeautifulSoup(loginResp.text, "html.parser").find(id="t") if tField is None: err = re.search(r"sErrTxt:'([^'\\]*(\\.[^'\\]*)*)'", loginResp.text) errMsg = "Couldn't retrieve t field from login response" if err: errMsg = re.sub(r"<.*?>", "", err.group(1)).replace("\\'", "'").replace("\\\\", "\\") raise SkypeAuthException(errMsg, loginResp) return tField.get("value") def getToken(self, t): # Now exchange the 't' value for a Skype token. loginResp = self.conn("POST", "{0}/microsoft".format(SkypeConnection.API_LOGIN), params={"client_id": "578134", "redirect_uri": "https://web.skype.com"}, data={"t": t, "client_id": "578134", "oauthPartner": "999", "site_name": "lw.skype.com", "redirect_uri": "https://web.skype.com"}) loginPage = BeautifulSoup(loginResp.text, "html.parser") # Collect the Skype token, and expiry if present. tokenField = loginPage.find("input", {"name": "skypetoken"}) if not tokenField: raise SkypeApiException("Couldn't retrieve Skype token from login response", loginResp) token = tokenField.get("value") expiryField = loginPage.find("input", {"name": "expires_in"}) expiry = None if expiryField: expiry = datetime.fromtimestamp(int(time.time()) + int(expiryField.get("value"))) return (token, expiry)
[docs]class SkypeGuestAuthProvider(SkypeAuthProvider): """ An authentication provider that connects and joins a public conversation via a join URL. """
[docs] def auth(self, url, name): """ Connect to Skype as a guest, joining a given conversation. In this state, some APIs (such as contacts) will return 401 status codes. A guest can only communicate with the conversation they originally joined. Args: url (str): public join URL for conversation, or identifier from it name (str): display name as shown to other participants Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ urlId = url.split("/")[-1] # Pretend to be Chrome on Windows (required to avoid "unsupported device" messages). agent = "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) " \ "Chrome/33.0.1750.117 Safari/537.36" cookies = self.conn("GET", "{0}/{1}".format(SkypeConnection.API_JOIN, urlId), headers={"User-Agent": agent}).cookies ids = self.conn("POST", "{0}/api/v2/conversation/".format(SkypeConnection.API_JOIN), json={"shortId": urlId, "type": "wl"}).json() token = self.conn("POST", "{0}/api/v1/users/guests".format(SkypeConnection.API_JOIN), headers={"csrf_token": cookies.get("csrf_token"), "X-Skype-Request-Id": cookies.get("launcher_session_id")}, json={"flowId": cookies.get("launcher_session_id"), "shortId": urlId, "longId": ids.get("Long"), "threadId": ids.get("Resource"), "name": name}).json().get("skypetoken") # Assume the token lasts 24 hours, as a guest account only lasts that long anyway. expiry = datetime.now() + timedelta(days=1) return token, expiry
[docs]class SkypeRefreshAuthProvider(SkypeAuthProvider): """ An authentication provider that connects via the Skype API. Only compatible with Skype usernames. """
[docs] def auth(self, token): """ Take an existing Skype token and refresh it, to extend the expiry time without other credentials. Args: token (str): existing Skype token Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ t = self.sendToken(token) return self.getToken(t)
def sendToken(self, token): # Send the existing token over. loginResp = self.conn("GET", "{0}/login".format(SkypeConnection.API_LOGIN), params={"client_id": "578134", "redirect_uri": "https://web.skype.com"}, cookies={"refresh-token": token}) tField = BeautifulSoup(loginResp.text, "html.parser").find(id="t") if tField is None: err = re.search(r"sErrTxt:'([^'\\]*(\\.[^'\\]*)*)'", loginResp.text) errMsg = "Couldn't retrieve t field from login response" if err: errMsg = re.sub(r"<.*?>", "", err.group(1)).replace("\\'", "'").replace("\\\\", "\\") raise SkypeAuthException(errMsg, loginResp) return tField.get("value") def getToken(self, t): # Now exchange the 't' value for a Skype token. loginResp = self.conn("POST", "{0}/microsoft".format(SkypeConnection.API_LOGIN), params={"client_id": "578134", "redirect_uri": "https://web.skype.com"}, data={"t": t, "client_id": "578134", "oauthPartner": "999", "site_name": "lw.skype.com", "redirect_uri": "https://web.skype.com"}) loginPage = BeautifulSoup(loginResp.text, "html.parser") # Collect the Skype token, and expiry if present. tokenField = loginPage.find("input", {"name": "skypetoken"}) if not tokenField: raise SkypeApiException("Couldn't retrieve Skype token from login response", loginResp) token = tokenField.get("value") expiryField = loginPage.find("input", {"name": "expires_in"}) expiry = None if expiryField: expiry = datetime.fromtimestamp(int(time.time()) + int(expiryField.get("value"))) return (token, expiry)
[docs]class SkypeRegistrationTokenProvider(SkypeAuthProvider): """ An authentication provider that handles the handshake for a registration token. """
[docs] def auth(self, skypeToken): """ Request a new registration token using a current Skype token. Args: skypeToken (str): existing Skype token Returns: (str, datetime.datetime, str, SkypeEndpoint) tuple: registration token, associated expiry if known, resulting endpoint hostname, endpoint if provided Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ token = expiry = endpoint = None msgsHost = SkypeConnection.API_MSGSHOST while not token: secs = int(time.time()) hash = self.getMac256Hash(str(secs)) headers = {"LockAndKey": "appId=msmsgs@msnmsgr.com; time={0}; lockAndKeyResponse={1}".format(secs, hash), "Authentication": "skypetoken=" + skypeToken, "BehaviorOverride": "redirectAs404"} endpointResp = self.conn("POST", "{0}/users/ME/endpoints".format(msgsHost), codes=(200, 201, 404), headers=headers, json={"endpointFeatures": "Agent"}) regTokenHead = endpointResp.headers.get("Set-RegistrationToken") locHead = endpointResp.headers.get("Location") if locHead: locParts = re.search(r"(https://[^/]+/v1)/users/ME/endpoints(/(%7B[a-z0-9\-]+%7D))?", locHead).groups() if locParts[2]: endpoint = SkypeEndpoint(self.conn, locParts[2].replace("%7B", "{").replace("%7D", "}")) if not locParts[0] == msgsHost: # Skype is requiring the use of a different hostname. msgsHost = locHead.rsplit("/", 4 if locParts[2] else 3)[0] # Don't accept the token if present, we need to re-register first. continue if regTokenHead: token = re.search(r"(registrationToken=[a-z0-9\+/=]+)", regTokenHead, re.I).group(1) regExpiry = re.search(r"expires=(\d+)", regTokenHead).group(1) expiry = datetime.fromtimestamp(int(regExpiry)) regEndMatch = re.search(r"endpointId=({[a-z0-9\-]+})", regTokenHead) if regEndMatch: endpoint = SkypeEndpoint(self.conn, regEndMatch.group(1)) if not endpoint and endpointResp.status_code == 200 and endpointResp.json(): # Use the most recent endpoint listed in the JSON response. endpoint = SkypeEndpoint(self.conn, endpointResp.json()[0]["id"]) return token, expiry, msgsHost, endpoint
[docs] @staticmethod def getMac256Hash(challenge, appId="msmsgs@msnmsgr.com", key="Q1P7W2E4J9R8U3S5"): """ Generate the lock-and-key response, needed to acquire registration tokens. """ clearText = challenge + appId clearText += "0" * (8 - len(clearText) % 8) def int32ToHexString(n): hexChars = "0123456789abcdef" hexString = "" for i in range(4): hexString += hexChars[(n >> (i * 8 + 4)) & 15] hexString += hexChars[(n >> (i * 8)) & 15] return hexString def int64Xor(a, b): sA = "{0:b}".format(a) sB = "{0:b}".format(b) sC = "" sD = "" diff = abs(len(sA) - len(sB)) for i in range(diff): sD += "0" if len(sA) < len(sB): sD += sA sA = sD elif len(sB) < len(sA): sD += sB sB = sD for i in range(len(sA)): sC += "0" if sA[i] == sB[i] else "1" return int(sC, 2) def cS64(pdwData, pInHash): MODULUS = 2147483647 CS64_a = pInHash[0] & MODULUS CS64_b = pInHash[1] & MODULUS CS64_c = pInHash[2] & MODULUS CS64_d = pInHash[3] & MODULUS CS64_e = 242854337 pos = 0 qwDatum = 0 qwMAC = 0 qwSum = 0 for i in range(len(pdwData) // 2): qwDatum = int(pdwData[pos]) pos += 1 qwDatum *= CS64_e qwDatum = qwDatum % MODULUS qwMAC += qwDatum qwMAC *= CS64_a qwMAC += CS64_b qwMAC = qwMAC % MODULUS qwSum += qwMAC qwMAC += int(pdwData[pos]) pos += 1 qwMAC *= CS64_c qwMAC += CS64_d qwMAC = qwMAC % MODULUS qwSum += qwMAC qwMAC += CS64_b qwMAC = qwMAC % MODULUS qwSum += CS64_d qwSum = qwSum % MODULUS return [qwMAC, qwSum] cchClearText = len(clearText) // 4 pClearText = [] for i in range(cchClearText): pClearText = pClearText[:i] + [0] + pClearText[i:] for pos in range(4): pClearText[i] += ord(clearText[4 * i + pos]) * (256 ** pos) sha256Hash = [0, 0, 0, 0] hash = hashlib.sha256((challenge + key).encode("utf-8")).hexdigest().upper() for i in range(len(sha256Hash)): sha256Hash[i] = 0 for pos in range(4): dpos = 8 * i + pos * 2 sha256Hash[i] += int(hash[dpos:dpos + 2], 16) * (256 ** pos) macHash = cS64(pClearText, sha256Hash) macParts = [macHash[0], macHash[1], macHash[0], macHash[1]] return "".join(map(int32ToHexString, map(int64Xor, sha256Hash, macParts)))
[docs]class SkypeEndpoint(SkypeObj): """ An endpoint represents a single point of presence within Skype. Typically, a user with multiple devices would have one endpoint per device (desktop, laptop, mobile and so on). Endpoints are time-sensitive -- they lapse after a short time unless kept alive (by :meth:`ping` or otherwise). """ attrs = ("id",)
[docs] def __init__(self, conn, id): """ Create a new instance based on a newly-created endpoint identifier. Args: conn (SkypeConnection): parent connection instance id (str): endpoint identifier as generated by the API """ super(SkypeEndpoint, self).__init__() self.conn = conn self.id = id self.subscribed = False
[docs] def config(self, name="skype"): """ Configure this endpoint to allow setting presence. Args: name (str): display name for this endpoint """ self.conn("PUT", "{0}/users/ME/endpoints/{1}/presenceDocs/messagingService" .format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken, json={"id": "messagingService", "type": "EndpointPresenceDoc", "selfLink": "uri", "privateInfo": {"epname": name}, "publicInfo": {"capabilities": "", "type": 1, "skypeNameVersion": "skype.com", "nodeInfo": "xx", "version": "908/1.30.0.128"}})
[docs] def ping(self, timeout=12): """ Send a keep-alive request for the endpoint. Args: timeout (int): maximum amount of time for the endpoint to stay active """ self.conn("POST", "{0}/users/ME/endpoints/{1}/active".format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken, json={"timeout": timeout})
[docs] def subscribe(self): """ Subscribe to contact and conversation events. These are accessible through :meth:`getEvents`. """ self.conn("POST", "{0}/users/ME/endpoints/{1}/subscriptions".format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken, json={"interestedResources": ["/v1/threads/ALL", "/v1/users/ME/contacts/ALL", "/v1/users/ME/conversations/ALL/messages", "/v1/users/ME/conversations/ALL/properties"], "template": "raw", "channelType": "httpLongPoll"}) self.subscribed = True
[docs] def getEvents(self): """ Retrieve a list of events since the last poll. Multiple calls may be needed to retrieve all events. If no events occur, the API will block for up to 30 seconds, after which an empty list is returned. If any event occurs whilst blocked, it is returned immediately. Returns: :class:`.SkypeEvent` list: list of events, possibly empty """ if not self.subscribed: self.subscribe() return self.conn("POST", "{0}/users/ME/endpoints/{1}/subscriptions/0/poll".format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken).json().get("eventMessages", [])