Source code for skpy.conn

import base64
import functools
import hashlib
import os
import re
import time
from datetime import datetime, timedelta
from pprint import pformat
from types import MethodType
from xml.etree import ElementTree

import requests
from bs4 import BeautifulSoup

from .core import SkypeApiException, SkypeAuthException, SkypeEnum, SkypeObj, SkypeRateLimitException, \
                  SkypeTokenException


[docs]class SkypeConnection(SkypeObj): """ The main connection class -- handles all requests to API resources. To authenticate with a username and password, use :meth:`setUserPwd` to store the credentials. Token files can be specified with :meth:`setTokenFile`. Attributes: tokens (dict): Token strings used to connect to various Skype APIs. Uses keys ``skype`` and ``reg``. tokenExpiry (dict): Map from token key to :class:`datetime <datetime.datetime>` of expiry. tokenFile (str): Path to file holding token data for the current session. msgsHost (str): Derived API base URL during registration token retrieval. sess (requests.Session): Shared session used for all API requests. endpoints (dict): Container of :class:`SkypeEndpoint` instances for the current session. connected (bool): Whether the connection instance is ready to make API calls. guest (bool): Whether the connected account only has guest privileges. """ Auth = SkypeEnum("SkypeConnection.Auth", ("SkypeToken", "Authorize", "RegToken")) """ :class:`.SkypeEnum`: Authentication types for different API calls. Attributes: Auth.SkypeToken: Add an ``X-SkypeToken`` header with the Skype token. Auth.Authorize: Add an ``Authorization`` header with the Skype token. Auth.RegToken: Add a ``RegistrationToken`` header with the registration token. """
[docs] @staticmethod def handle(*codes, **kwargs): """ Method decorator: if a given status code is received, re-authenticate and try again. Args: codes (int list): status codes to respond to regToken (bool): whether to try retrieving a new token on error Returns: method: decorator function, ready to apply to other methods """ regToken = kwargs.get("regToken", False) subscribe = kwargs.get("subscribe") def decorator(fn): @functools.wraps(fn) def wrapper(self, *args, **kwargs): try: return fn(self, *args, **kwargs) except SkypeApiException as e: if isinstance(e.args[1], requests.Response) and e.args[1].status_code in codes: conn = self if isinstance(self, SkypeConnection) else self.conn if regToken: conn.getRegToken() if subscribe: conn.endpoints[subscribe].subscribe() return fn(self, *args, **kwargs) raise return wrapper return decorator
[docs] @classmethod def externalCall(cls, method, url, codes=(200, 201, 204, 207), **kwargs): """ Make a public API call without a connected :class:`.Skype` instance. The obvious implications are that no authenticated calls are possible, though this allows accessing some public APIs such as join URL lookups. Args: method (str): HTTP request method url (str): full URL to connect to codes (int list): expected HTTP response codes for success kwargs (dict): any extra parameters to pass to :func:`requests.request` Returns: requests.Response: response object provided by :mod:`requests` Raises: .SkypeAuthException: if an authentication rate limit is reached .SkypeApiException: if a successful status code is not received """ if os.getenv("SKPY_DEBUG_HTTP"): print("<= [{0}] {1} {2}".format(datetime.now().strftime("%d/%m %H:%M:%S"), method, url)) print(pformat(kwargs)) resp = cls.extSess.request(method, url, **kwargs) if os.getenv("SKPY_DEBUG_HTTP"): print("=> [{0}] {1}".format(datetime.now().strftime("%d/%m %H:%M:%S"), resp.status_code)) print(pformat(dict(resp.headers))) try: print(pformat(resp.json())) except ValueError: print(resp.text) if resp.status_code not in codes: raise SkypeApiException("{0} response from {1} {2}".format(resp.status_code, method, url), resp) return resp
API_LOGIN = "https://login.skype.com/login" API_MSACC = "https://login.live.com" API_EDGE = "https://edge.skype.com/rps/v1/rps/skypetoken" API_USER = "https://api.skype.com" API_AVATAR = "https://avatar.skype.com" API_PROFILE = "https://profile.skype.com/profile/v1" API_OPTIONS = "https://options.skype.com/options/v1/users/self/options" API_JOIN = "https://join.skype.com" API_JOIN_CREATE = "https://api.join.skype.com/v1" API_BOT = "https://api.aps.skype.com/v1" API_FLAGS = "https://flagsapi.skype.com/flags/v1" API_ENTITLEMENT = "https://consumer.entitlement.skype.com" API_TRANSLATE = "https://dev.microsofttranslator.com/api" API_ASM = "https://api.asm.skype.com/v1/objects" API_ASM_LOCAL = "https://{0}1-api.asm.skype.com/v1/objects" API_URL = "https://urlp.asm.skype.com/v1/url/info" API_CONTACTS = "https://contacts.skype.com/contacts/v2" API_MSGSHOST = "https://client-s.gateway.messenger.live.com/v1" API_DIRECTORY = "https://skypegraph.skype.com/v2.0/search/" # Version doesn't seem to be important, at least not for what we need. API_CONFIG = "https://a.config.skype.com/config/v1" USER_AGENT = "SkPy" USER_AGENT_BROWSER = ("Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/33.0.1750.117 Safari/537.36") SKYPE_CLIENT = "1418/9.99.0.999" attrs = ("userId", "tokenFile", "connected", "guest") extSess = requests.Session() extSess.headers["User-Agent"] = USER_AGENT
[docs] def __init__(self): """ Create a new, unconnected instance. """ self.userId = None self.tokens = {} self.tokenExpiry = {} self.tokenFile = None self.hasUserPwd = False self.msgsHost = self.API_MSGSHOST self.sess = requests.Session() self.sess.headers["User-Agent"] = self.USER_AGENT self.endpoints = {"self": SkypeEndpoint(self, "SELF")} self.syncStates = {}
@property def connected(self): return "skype" in self.tokenExpiry and datetime.now() <= self.tokenExpiry["skype"] \ and "reg" in self.tokenExpiry and datetime.now() <= self.tokenExpiry["reg"] @property def guest(self): return self.userId.startswith("guest:") if self.userId else None
[docs] def closure(self, method, *args, **kwargs): """ Create a generic closure to call a method with fixed arguments. Args: method (MethodType): bound method of the class args (list): positional arguments for the method kwargs (dict): keyword arguments for the method Returns: MethodType: bound method closure """ @functools.wraps(method) def inner(self): return method(*args, **kwargs) return MethodType(inner, self)
[docs] def __call__(self, method, url, codes=(200, 201, 202, 204, 207), auth=None, headers=None, **kwargs): """ Make an API call. Most parameters are passed directly to :mod:`requests`. Set codes to a list of valid HTTP response codes -- an exception is raised if the response does not match. If authentication is required, set ``auth`` to one of the :class:`Auth` constants. Args: method (str): HTTP request method url (str): full URL to connect to codes (int list): expected HTTP response codes for success auth (Auth): authentication type to be included headers (dict): additional headers to be included kwargs (dict): any extra parameters to pass to :func:`requests.request` Returns: requests.Response: response object provided by :mod:`requests` Raises: .SkypeAuthException: if an authentication rate limit is reached .SkypeApiException: if a successful status code is not received """ self.verifyToken(auth) if not headers: headers = {} debugHeaders = dict(headers) if auth == self.Auth.SkypeToken: headers["X-SkypeToken"] = self.tokens["skype"] debugHeaders["X-SkypeToken"] = "***" elif auth == self.Auth.Authorize: headers["Authorization"] = "skype_token {0}".format(self.tokens["skype"]) debugHeaders["Authorization"] = "***" elif auth == self.Auth.RegToken: headers["RegistrationToken"] = self.tokens["reg"] debugHeaders["RegistrationToken"] = "***" if os.getenv("SKPY_DEBUG_HTTP"): print("<= [{0}] {1} {2}".format(datetime.now().strftime("%d/%m %H:%M:%S"), method, url)) print(pformat(dict(kwargs, headers=debugHeaders))) resp = self.sess.request(method, url, headers=headers, **kwargs) if os.getenv("SKPY_DEBUG_HTTP"): print("=> [{0}] {1}".format(datetime.now().strftime("%d/%m %H:%M:%S"), resp.status_code)) print(pformat(dict(resp.headers))) try: print(pformat(resp.json())) except ValueError: print(resp.text) if resp.status_code not in codes: if resp.status_code == 429: raise SkypeRateLimitException("Rate limit exceeded", resp) raise SkypeApiException("{0} response from {1} {2}".format(resp.status_code, method, url), resp) return resp
[docs] def syncStateCall(self, method, url, params={}, **kwargs): """ Follow and track sync state URLs provided by an API endpoint, in order to implicitly handle pagination. In the first call, ``url`` and ``params`` are used as-is. If a ``syncState`` endpoint is provided in the response, subsequent calls go to the latest URL instead. Args: method (str): HTTP request method url (str): full URL to connect to params (dict): query parameters to include in the URL kwargs (dict): any extra parameters to pass to :meth:`__call__` """ try: states = self.syncStates[(method, url)] except KeyError: states = self.syncStates[(method, url)] = [] if states: # We have a state link, use it to replace the URL and query string. url = states[-1] params = {} resp = self(method, url, params=params, **kwargs) try: json = resp.json() except ValueError: # Don't do anything if not a JSON response. pass else: # If a state link exists in the response, store it for later. state = json.get("_metadata", {}).get("syncState") if state: states.append(state) return resp
[docs] def setTokenFile(self, path): """ Enable reading and writing session tokens to a file at the given location. Args: path (str): path to file used for token storage """ self.tokenFile = path
[docs] def readTokenFromStr(self, tokens): """ Attempt to re-establish a connection using previously acquired tokens from a string. If the Skype token is valid but the registration token is invalid, a new endpoint will be registered. Args: tokens (str): string containing tokens Raises: .SkypeAuthException: if the token string cannot be used to authenticate """ lines = tokens.splitlines() try: user, skypeToken, skypeExpiry, regToken, regExpiry, msgsHost = lines skypeExpiry = datetime.fromtimestamp(int(skypeExpiry)) regExpiry = datetime.fromtimestamp(int(regExpiry)) except ValueError: raise SkypeTokenException("Token file is malformed") if datetime.now() >= skypeExpiry: raise SkypeTokenException("Token file has expired") self.userId = user self.tokens["skype"] = skypeToken self.tokenExpiry["skype"] = skypeExpiry if datetime.now() < regExpiry: self.tokens["reg"] = regToken self.tokenExpiry["reg"] = regExpiry self.msgsHost = msgsHost else: self.getRegToken()
[docs] def readToken(self): """ Attempt to re-establish a connection using previously acquired tokens. If the Skype token is valid but the registration token is invalid, a new endpoint will be registered. Raises: .SkypeAuthException: if the token file cannot be used to authenticate """ if not self.tokenFile: raise SkypeTokenException("No token file specified") try: with open(self.tokenFile, "r") as f: tokens = f.read() except OSError: raise SkypeTokenException("Token file doesn't exist or not readable") self.readTokenFromStr(tokens)
[docs] def writeTokenToStr(self): """ Return details of the current connection into a string. This can be used by :meth:`readTokenFromStr` to re-authenticate at a later time. Returns: str: A token string that can be used by :meth:`readTokenFromStr` to re-authenticate. """ return "\n".join([ self.userId, self.tokens["skype"], str(int(time.mktime(self.tokenExpiry["skype"].timetuple()))), self.tokens["reg"], str(int(time.mktime(self.tokenExpiry["reg"].timetuple()))), self.msgsHost ]) + "\n"
[docs] def writeToken(self): """ Store details of the current connection in the named file. This can be used by :meth:`readToken` to re-authenticate at a later time. """ # Write token file privately. with os.fdopen(os.open(self.tokenFile, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f: # When opening files via os, truncation must be done manually. f.truncate() f.write(self.writeTokenToStr())
[docs] def verifyToken(self, auth): """ Ensure the authentication token for the given auth method is still valid. Args: auth (Auth): authentication type to check Raises: .SkypeAuthException: if Skype auth is required, and the current token has expired and can't be renewed """ if auth in (self.Auth.SkypeToken, self.Auth.Authorize): if "skype" not in self.tokenExpiry or datetime.now() >= self.tokenExpiry["skype"]: if not hasattr(self, "getSkypeToken"): raise SkypeTokenException("Skype token expired, and no password specified") self.getSkypeToken() elif auth == self.Auth.RegToken: if "reg" not in self.tokenExpiry or datetime.now() >= self.tokenExpiry["reg"]: self.getRegToken()
[docs] def skypeTokenClosure(self, method, *args, **kwargs): """ Replace the stub :meth:`getSkypeToken` method with one that connects using the given credentials. Avoids storing the account password in an accessible way. """ self.getSkypeToken = self.closure(method, *args, **kwargs) self.hasUserPwd = True
[docs] def setUserPwd(self, user, pwd): """ Replace the stub :meth:`getSkypeToken` method with one that connects via SOAP login using the given credentials. Avoids storing the account password in an accessible way. Args: user (str): username or email address of the connecting account pwd (str): password of the connecting account """ login = self.soapLogin if "@" in user else self.liveLogin self.skypeTokenClosure(login, user, pwd)
[docs] def liveLogin(self, user, pwd): """ Obtain connection parameters from the Microsoft account login page, and perform a login with the given email address or Skype username, and its password. This emulates a login to Skype for Web on ``login.live.com``. .. note:: Microsoft accounts with two-factor authentication enabled are not supported, and will cause a :class:`.SkypeAuthException` to be raised. See the exception definitions for other possible causes. Args: user (str): username or email address of the connecting account pwd (str): password of the connecting account Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ if not self.hasUserPwd: self.skypeTokenClosure(self.liveLogin, user, pwd) self.tokens["skype"], self.tokenExpiry["skype"] = SkypeLiveAuthProvider(self).auth(user, pwd) self.getUserId() self.getRegToken()
[docs] def soapLogin(self, user, pwd): """ Perform a login with the given email address or Skype username, and its password, using the Microsoft account SOAP login APIs. .. note:: Microsoft accounts with two-factor authentication enabled are supported if an application-specific password is provided. Skype accounts must be linked to a Microsoft account with an email address, otherwise :class:`.SkypeAuthException` will be raised. See the exception definitions for other possible causes. Args: user (str): username or email address of the connecting account pwd (str): password of the connecting account Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ if not self.hasUserPwd: self.skypeTokenClosure(self.soapLogin, user, pwd) self.tokens["skype"], self.tokenExpiry["skype"] = SkypeSOAPAuthProvider(self).auth(user, pwd) self.getUserId() self.getRegToken()
[docs] def guestLogin(self, url, name): """ Connect to Skype as a guest, joining a given conversation. In this state, some APIs (such as contacts) will return 401 status codes. A guest can only communicate with the conversation they originally joined. Args: url (str): public join URL for conversation, or identifier from it name (str): display name as shown to other participants Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ self.tokens["skype"], self.tokenExpiry["skype"] = SkypeGuestAuthProvider(self).auth(url, name) self.getUserId() self.getRegToken()
[docs] def getSkypeToken(self): """ A wrapper for the default login provider that applies the previously given username and password. Raises: .SkypeAuthException: if credentials were never provided """ raise SkypeTokenException("No username or password provided, and no valid token file")
[docs] def refreshSkypeToken(self): """ Take the existing Skype token and refresh it, to extend the expiry time without other credentials. Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ self.tokens["skype"], self.tokenExpiry["skype"] = SkypeRefreshAuthProvider(self).auth(self.tokens["skype"]) self.getRegToken()
[docs] def getUserId(self): """ Ask Skype for the authenticated user's identifier, and store it on the connection object. """ self.userId = self("GET", "{0}/users/self/profile".format(self.API_USER), auth=self.Auth.SkypeToken).json().get("username")
[docs] def getRegToken(self): """ Acquire a new registration token. Once successful, all tokens and expiry times are written to the token file (if specified on initialisation). """ self.verifyToken(self.Auth.SkypeToken) token, expiry, msgsHost, endpoint = SkypeRegistrationTokenProvider(self).auth(self.tokens["skype"]) self.tokens["reg"] = token self.tokenExpiry["reg"] = expiry self.msgsHost = msgsHost if endpoint: endpoint.config() self.endpoints["main"] = endpoint self.syncEndpoints() if self.tokenFile: self.writeToken()
[docs] def syncEndpoints(self): """ Retrieve all current endpoints for the connected user. """ self.endpoints["all"] = [] for json in self("GET", "{0}/users/ME/presenceDocs/messagingService".format(self.msgsHost), params={"view": "expanded"}, auth=self.Auth.RegToken).json().get("endpointPresenceDocs", []): id = json.get("link", "").split("/")[7] self.endpoints["all"].append(SkypeEndpoint(self, id))
[docs]class SkypeAuthProvider(SkypeObj): """ A base class for authentication providers. Subclasses should implement the :meth:`auth` method. """ def __init__(self, conn): self.conn = conn
[docs] def auth(self, *args, **kwargs): """ Authenticate a user, given some form of identification. Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login forms can't be processed """ raise NotImplementedError
[docs]class SkypeAPIAuthProvider(SkypeAuthProvider): """ An authentication provider that connects via the Skype API. Only compatible with Skype usernames. """
[docs] def auth(self, user, pwd): """ Perform a login with the given Skype username and its password. This emulates a login to Skype for Web on ``api.skype.com``. Args: user (str): username of the connecting account pwd (str): password of the connecting account Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ # Wrap up the credentials ready to send. pwdHash = base64.b64encode(hashlib.md5((user + "\nskyper\n" + pwd).encode("utf-8")).digest()).decode("utf-8") resp = self.conn("POST", "{0}/login/skypetoken".format(SkypeConnection.API_USER), json={"username": user, "passwordHash": pwdHash, "scopes": "client"}) json = resp.json() if "skypetoken" not in json: raise SkypeAuthException("Couldn't retrieve Skype token from response", resp) expiry = None if "expiresIn" in json: expiry = datetime.fromtimestamp(int(time.time()) + int(json["expiresIn"])) return json["skypetoken"], expiry
[docs]class LiveAuthSuccess(Exception): """ An exception used to capture the 't' value needed during Microsoft account authentication. """ def __init__(self, t): super(LiveAuthSuccess, self).__init__(t) self.t = t
[docs]class SkypeLiveAuthProvider(SkypeAuthProvider): """ An authentication provider that connects via Microsoft account authentication. """
[docs] def checkUser(self, user): """ Query a username or email address to see if a corresponding Microsoft account exists. Args: user (str): username or email address of an account Returns: bool: whether the account exists """ return not self.conn("POST", "{0}/GetCredentialType.srf".format(SkypeConnection.API_MSACC), json={"username": user}).json().get("IfExistsResult")
[docs] def auth(self, user, pwd): """ Obtain connection parameters from the Microsoft account login page, and perform a login with the given email address or Skype username, and its password. This emulates a login to Skype for Web on ``login.live.com``. .. note:: Microsoft accounts with two-factor authentication enabled are not supported, and will cause a :class:`.SkypeAuthException` to be raised. See the exception definitions for other possible causes. Args: user (str): username or email address of the connecting account pwd (str): password of the connecting account Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ try: self.getT(user, pwd) except LiveAuthSuccess as ex: return self.getToken(ex.t)
def check(self, resp): page = BeautifulSoup(resp.text, "html.parser") # Look for the 't' value we need to exchange for a Skype token, which might turn up at any stage. tField = page.find(id="t") if tField is not None: raise LiveAuthSuccess(tField.get("value")) # Look for an error message within the response. errReg = re.search(r"sErrTxt:'([^'\\]*(\\.[^'\\]*)+)'", resp.text) if errReg: errMsg = re.sub(r"<.*?>", "", errReg.group(1)).replace("\\'", "'").replace("\\\\", "\\") raise SkypeApiException(errMsg, resp) # Look for two-factor authentication device information (a non-empty array of factors) that we can't handle. if re.search(r"\bV:\s*\[\s*{", resp.text): raise SkypeAuthException("Two-factor authentication unsupported", resp) # Look for a user consent form, meaning the user needs to accept terms or follow account security steps. for form in page.findAll("form"): if form["name"] == "fmHF": url = form["action"].split("?", 1)[0] raise SkypeAuthException("Account action required ({0}), login with a web browser first" .format(url), resp) # No common elements, return the response for further processing. return resp def getT(self, user, pwd): # Stage 1: Start a Microsoft account login from Skype, which will redirect to login.live.com. stage1Resp = self.check(self.conn("GET", "{0}/oauth/microsoft".format(SkypeConnection.API_LOGIN), params={"client_id": "578134", "redirect_uri": "https://web.skype.com"})) # This is inside some embedded JavaScript, so can't easily parse with BeautifulSoup. ppftReg = re.search(r"""<input.*?name="PPFT".*?value="(.*?)""" + "\"", stage1Resp.text) if not ppftReg: raise SkypeApiException("Couldn't retrieve PPFT from login form", stage1Resp) ppft = ppftReg.group(1) if "MSPRequ" not in stage1Resp.cookies or "MSPOK" not in stage1Resp.cookies: raise SkypeApiException("Couldn't retrieve MSPRequ/MSPOK cookies", stage1Resp) # Prepare the Live login page request parameters. params = {"wa": "wsignin1.0", "wp": "MBI_SSL", "wreply": "https://lw.skype.com/login/oauth/proxy?client_id=578134&site_name=" "lw.skype.com&redirect_uri=https%3A%2F%2Fweb.skype.com%2F"} cookies = {"MSPRequ": stage1Resp.cookies.get("MSPRequ"), "MSPOK": stage1Resp.cookies.get("MSPOK")} # Stage 2: Submit the user's credentials. stage2Resp = self.check(self.conn("POST", "{0}/ppsecure/post.srf".format(SkypeConnection.API_MSACC), params=params, cookies=dict(cookies, CkTst="G{0}".format(int(time.time() * 1000))), data={"login": user, "passwd": pwd, "PPFT": ppft, "loginoptions": "3"})) opidReg = re.search(r"""opid=([A-Z0-9]+)""", stage2Resp.text, re.I) if not opidReg: raise SkypeApiException("Couldn't retrieve opid field from login response", stage2Resp) # Stage 3: Repeat with the 'opid' parameter. stage3Resp = self.check(self.conn("POST", "{0}/ppsecure/post.srf".format(SkypeConnection.API_MSACC), params=params, cookies=dict(cookies, CkTst="G{0}".format(int(time.time() * 1000))), data={"opid": opidReg.group(1), "PPFT": ppft, "site_name": "lw.skype.com", "oauthPartner": "999", "client_id": "578134", "redirect_uri": "https://web.skype.com", "type": "28"})) # No check matches, and no further actions we can take. raise SkypeApiException("Couldn't retrieve t field from login response", stage3Resp) def getToken(self, t): # Now exchange the 't' value for a Skype token. loginResp = self.conn("POST", "{0}/microsoft".format(SkypeConnection.API_LOGIN), params={"client_id": "578134", "redirect_uri": "https://web.skype.com"}, data={"t": t, "client_id": "578134", "oauthPartner": "999", "site_name": "lw.skype.com", "redirect_uri": "https://web.skype.com"}) loginPage = BeautifulSoup(loginResp.text, "html.parser") # Collect the Skype token, and expiry if present. tokenField = loginPage.find("input", {"name": "skypetoken"}) if not tokenField: raise SkypeApiException("Couldn't retrieve Skype token from login response", loginResp) token = tokenField.get("value") expiryField = loginPage.find("input", {"name": "expires_in"}) expiry = None if expiryField: expiry = datetime.fromtimestamp(int(time.time()) + int(expiryField.get("value"))) return (token, expiry)
[docs]class SkypeSOAPAuthProvider(SkypeAuthProvider): """ An authentication provider that connects via Microsoft account SOAP authentication. """ template = """ <Envelope xmlns='http://schemas.xmlsoap.org/soap/envelope/' xmlns:wsse='http://schemas.xmlsoap.org/ws/2003/06/secext' xmlns:wsp='http://schemas.xmlsoap.org/ws/2002/12/policy' xmlns:wsa='http://schemas.xmlsoap.org/ws/2004/03/addressing' xmlns:wst='http://schemas.xmlsoap.org/ws/2004/04/trust' xmlns:ps='http://schemas.microsoft.com/Passport/SoapServices/PPCRL'> <Header> <wsse:Security> <wsse:UsernameToken Id='user'> <wsse:Username>{}</wsse:Username> <wsse:Password>{}</wsse:Password> </wsse:UsernameToken> </wsse:Security> </Header> <Body> <ps:RequestMultipleSecurityTokens Id='RSTS'> <wst:RequestSecurityToken Id='RST0'> <wst:RequestType>http://schemas.xmlsoap.org/ws/2004/04/security/trust/Issue</wst:RequestType> <wsp:AppliesTo> <wsa:EndpointReference> <wsa:Address>wl.skype.com</wsa:Address> </wsa:EndpointReference> </wsp:AppliesTo> <wsse:PolicyReference URI='MBI_SSL'></wsse:PolicyReference> </wst:RequestSecurityToken> </ps:RequestMultipleSecurityTokens> </Body> </Envelope> """ @staticmethod def encode(value): return value.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
[docs] def auth(self, user, pwd): """ Perform a SOAP login with the given email address or Skype username, and its password. .. note:: Microsoft accounts with two-factor authentication enabled must provide an application-specific password. Args: user (str): username or email address of the connecting account pwd (str): password of the connecting account Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ token = self.getSecToken(user, pwd) return self.exchangeToken(token)
def getSecToken(self, user, pwd): loginResp = self.conn("POST", "{0}/RST.srf".format(SkypeConnection.API_MSACC), data=self.template.format(self.encode(user), self.encode(pwd))) loginData = ElementTree.fromstring(loginResp.text) token = None for node in loginData.iter(): tag = node.tag.split("}", 1)[-1] if tag == "Fault": code = msg = None for fnode in node: ftag = fnode.tag.split("}", 1)[-1] if ftag == "faultcode": code = fnode.text elif ftag == "faultstring": msg = fnode.text if code or msg: raise SkypeAuthException("{} - {}".format(code, msg), loginResp) else: raise SkypeApiException("Unknown fault whilst requesting security token", loginResp) elif tag == "BinarySecurityToken": token = node.text if not token: raise SkypeApiException("Couldn't retrieve security token from login response", loginResp) return token def exchangeToken(self, token): edgeResp = self.conn("POST", SkypeConnection.API_EDGE, data={"partner": 999, "access_token": token, "scopes": "client"}) try: edgeData = edgeResp.json() except ValueError: raise SkypeApiException("Couldn't parse edge response body", edgeResp) if "skypetoken" in edgeData: token = edgeData["skypetoken"] expiry = None if "expiresIn" in edgeData: expiry = datetime.fromtimestamp(int(time.time()) + int(edgeData["expiresIn"])) return (token, expiry) elif "status" in edgeData: status = edgeData["status"] raise SkypeApiException("{} - {}".format(status.get("code"), status.get("text")), edgeResp) else: raise SkypeApiException("Couldn't retrieve token from edge response", edgeResp)
[docs]class SkypeGuestAuthProvider(SkypeAuthProvider): """ An authentication provider that connects and joins a public conversation via a join URL. """
[docs] def auth(self, url, name): """ Connect to Skype as a guest, joining a given conversation. In this state, some APIs (such as contacts) will return 401 status codes. A guest can only communicate with the conversation they originally joined. Args: url (str): public join URL for conversation, or identifier from it name (str): display name as shown to other participants Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ urlId = url.split("/")[-1] # Pretend to be Chrome on Windows (required to avoid "unsupported device" messages). cookies = self.conn("GET", "{0}/{1}".format(SkypeConnection.API_JOIN, urlId), headers={"User-Agent": SkypeConnection.USER_AGENT_BROWSER}).cookies ids = self.conn("GET", "{0}/meetings/{1}".format(SkypeConnection.API_JOIN_CREATE, urlId)).json() token = self.conn("POST", "{0}/threads/{1}/members".format(SkypeConnection.API_JOIN_CREATE, ids.get("threadId")), headers={"csrf_token": cookies.get("csrf_token"), "X-Skype-Request-Id": cookies.get("launcher_session_id")}, json={"displayName": name}).json().get("skypetoken") # Assume the token lasts 24 hours, as a guest account only lasts that long anyway. expiry = datetime.now() + timedelta(days=1) return token, expiry
[docs]class SkypeRefreshAuthProvider(SkypeAuthProvider): """ An authentication provider that connects via the Skype API. Only compatible with Skype usernames. """
[docs] def auth(self, token): """ Take an existing Skype token and refresh it, to extend the expiry time without other credentials. Args: token (str): existing Skype token Returns: (str, datetime.datetime) tuple: Skype token, and associated expiry if known Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ t = self.sendToken(token) return self.getToken(t)
def sendToken(self, token): # Send the existing token over. loginResp = self.conn("GET", "{0}/login".format(SkypeConnection.API_LOGIN), params={"client_id": "578134", "redirect_uri": "https://web.skype.com"}, cookies={"refresh-token": token}) tField = BeautifulSoup(loginResp.text, "html.parser").find(id="t") if tField is None: err = re.search(r"sErrTxt:'([^'\\]*(\\.[^'\\]*)*)'", loginResp.text) errMsg = "Couldn't retrieve t field from login response" if err: errMsg = re.sub(r"<.*?>", "", err.group(1)).replace("\\'", "'").replace("\\\\", "\\") raise SkypeAuthException(errMsg, loginResp) return tField.get("value") def getToken(self, t): # Now exchange the 't' value for a Skype token. loginResp = self.conn("POST", "{0}/microsoft".format(SkypeConnection.API_LOGIN), params={"client_id": "578134", "redirect_uri": "https://web.skype.com"}, data={"t": t, "client_id": "578134", "oauthPartner": "999", "site_name": "lw.skype.com", "redirect_uri": "https://web.skype.com"}) loginPage = BeautifulSoup(loginResp.text, "html.parser") # Collect the Skype token, and expiry if present. tokenField = loginPage.find("input", {"name": "skypetoken"}) if not tokenField: raise SkypeApiException("Couldn't retrieve Skype token from login response", loginResp) token = tokenField.get("value") expiryField = loginPage.find("input", {"name": "expires_in"}) expiry = None if expiryField: expiry = datetime.fromtimestamp(int(time.time()) + int(expiryField.get("value"))) return (token, expiry)
[docs]class SkypeRegistrationTokenProvider(SkypeAuthProvider): """ An authentication provider that handles the handshake for a registration token. """
[docs] def auth(self, skypeToken): """ Request a new registration token using a current Skype token. Args: skypeToken (str): existing Skype token Returns: (str, datetime.datetime, str, SkypeEndpoint) tuple: registration token, associated expiry if known, resulting endpoint hostname, endpoint if provided Raises: .SkypeAuthException: if the login request is rejected .SkypeApiException: if the login form can't be processed """ token = expiry = endpoint = None msgsHost = SkypeConnection.API_MSGSHOST while not token: secs = int(time.time()) hash = self.getMac256Hash(str(secs)) headers = {"LockAndKey": "appId=msmsgs@msnmsgr.com; time={0}; lockAndKeyResponse={1}".format(secs, hash), "Authentication": "skypetoken=" + skypeToken, "BehaviorOverride": "redirectAs404"} endpointResp = self.conn("POST", "{0}/users/ME/endpoints".format(msgsHost), codes=(200, 201, 404), headers=headers, json={"endpointFeatures": "Agent"}) regTokenHead = endpointResp.headers.get("Set-RegistrationToken") locHead = endpointResp.headers.get("Location") if locHead: locParts = re.search(r"(https://[^/]+/v1)/users/ME/endpoints(/(%7B[a-z0-9\-]+%7D))?", locHead).groups() if locParts[2]: endpoint = SkypeEndpoint(self.conn, locParts[2].replace("%7B", "{").replace("%7D", "}")) if not locParts[0] == msgsHost: # Skype is requiring the use of a different hostname. msgsHost = locHead.rsplit("/", 4 if locParts[2] else 3)[0] # Don't accept the token if present, we need to re-register first. continue if regTokenHead: token = re.search(r"(registrationToken=[a-z0-9\+/=]+)", regTokenHead, re.I).group(1) regExpiry = re.search(r"expires=(\d+)", regTokenHead).group(1) expiry = datetime.fromtimestamp(int(regExpiry)) regEndMatch = re.search(r"endpointId=({[a-z0-9\-]+})", regTokenHead) if regEndMatch: endpoint = SkypeEndpoint(self.conn, regEndMatch.group(1)) if not endpoint and endpointResp.status_code == 200 and endpointResp.json(): # Use the most recent endpoint listed in the JSON response. endpoint = SkypeEndpoint(self.conn, endpointResp.json()[0]["id"]) return token, expiry, msgsHost, endpoint
[docs] @staticmethod def getMac256Hash(challenge, appId="msmsgs@msnmsgr.com", key="Q1P7W2E4J9R8U3S5"): """ Generate the lock-and-key response, needed to acquire registration tokens. """ clearText = challenge + appId clearText += "0" * (8 - len(clearText) % 8) def int32ToHexString(n): hexChars = "0123456789abcdef" hexString = "" for i in range(4): hexString += hexChars[(n >> (i * 8 + 4)) & 15] hexString += hexChars[(n >> (i * 8)) & 15] return hexString def int64Xor(a, b): sA = "{0:b}".format(a) sB = "{0:b}".format(b) sC = "" sD = "" diff = abs(len(sA) - len(sB)) for i in range(diff): sD += "0" if len(sA) < len(sB): sD += sA sA = sD elif len(sB) < len(sA): sD += sB sB = sD for i in range(len(sA)): sC += "0" if sA[i] == sB[i] else "1" return int(sC, 2) def cS64(pdwData, pInHash): MODULUS = 2147483647 CS64_a = pInHash[0] & MODULUS CS64_b = pInHash[1] & MODULUS CS64_c = pInHash[2] & MODULUS CS64_d = pInHash[3] & MODULUS CS64_e = 242854337 pos = 0 qwDatum = 0 qwMAC = 0 qwSum = 0 for i in range(len(pdwData) // 2): qwDatum = int(pdwData[pos]) pos += 1 qwDatum *= CS64_e qwDatum = qwDatum % MODULUS qwMAC += qwDatum qwMAC *= CS64_a qwMAC += CS64_b qwMAC = qwMAC % MODULUS qwSum += qwMAC qwMAC += int(pdwData[pos]) pos += 1 qwMAC *= CS64_c qwMAC += CS64_d qwMAC = qwMAC % MODULUS qwSum += qwMAC qwMAC += CS64_b qwMAC = qwMAC % MODULUS qwSum += CS64_d qwSum = qwSum % MODULUS return [qwMAC, qwSum] cchClearText = len(clearText) // 4 pClearText = [] for i in range(cchClearText): pClearText = pClearText[:i] + [0] + pClearText[i:] for pos in range(4): pClearText[i] += ord(clearText[4 * i + pos]) * (256 ** pos) sha256Hash = [0, 0, 0, 0] hash = hashlib.sha256((challenge + key).encode("utf-8")).hexdigest().upper() for i in range(len(sha256Hash)): sha256Hash[i] = 0 for pos in range(4): dpos = 8 * i + pos * 2 sha256Hash[i] += int(hash[dpos:dpos + 2], 16) * (256 ** pos) macHash = cS64(pClearText, sha256Hash) macParts = [macHash[0], macHash[1], macHash[0], macHash[1]] return "".join(map(int32ToHexString, map(int64Xor, sha256Hash, macParts)))
[docs]class SkypeEndpoint(SkypeObj): """ An endpoint represents a single point of presence within Skype. Typically, a user with multiple devices would have one endpoint per device (desktop, laptop, mobile and so on). Endpoints are time-sensitive -- they lapse after a short time unless kept alive (by :meth:`ping` or otherwise). """ attrs = ("id",) resources = ["/v1/users/ME/conversations/ALL/properties", "/v1/users/ME/conversations/ALL/messages", "/v1/threads/ALL"]
[docs] def __init__(self, conn, id): """ Create a new instance based on a newly-created endpoint identifier. Args: conn (SkypeConnection): parent connection instance id (str): endpoint identifier as generated by the API """ super(SkypeEndpoint, self).__init__() self.conn = conn self.id = id self.subscribed = False self.subscribedPresence = False
[docs] def config(self, name="skype"): """ Configure this endpoint to allow setting presence. Args: name (str): display name for this endpoint """ self.conn("PUT", "{0}/users/ME/endpoints/{1}/presenceDocs/messagingService" .format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken, json={"id": "messagingService", "type": "EndpointPresenceDoc", "selfLink": "uri", "privateInfo": {"epname": name}, "publicInfo": {"capabilities": "", "type": 1, "skypeNameVersion": "skype.com", "nodeInfo": "xx", "version": "908/1.30.0.128"}})
[docs] def ping(self, timeout=12): """ Send a keep-alive request for the endpoint. Args: timeout (int): maximum amount of time for the endpoint to stay active """ self.conn("POST", "{0}/users/ME/endpoints/{1}/active".format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken, json={"timeout": timeout})
[docs] def subscribe(self): """ Subscribe to contact and conversation events. These are accessible through :meth:`getEvents`. """ self.conn("POST", "{0}/users/ME/endpoints/{1}/subscriptions".format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken, json={"interestedResources": self.resources, "channelType": "HttpLongPoll", "conversationType": 2047}) self.subscribed = True
[docs] def subscribePresence(self, contacts): """ Enable presence subscriptions for the authenticated user's contacts. Args: contacts (.SkypeContacts): contact list to select user IDs """ if not self.subscribed: self.subscribe() resources = list(self.resources) for contact in contacts: resources.append("/v1/users/ME/contacts/8:{}".format(contact.id)) self.conn("PUT", "{0}/users/ME/endpoints/{1}/subscriptions/0".format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken, params={"name": "interestedResources"}, json={"interestedResources": resources}) self.subscribedPresence = True
[docs] def getEvents(self): """ Retrieve a list of events since the last poll. Multiple calls may be needed to retrieve all events. If no events occur, the API will block for up to 30 seconds, after which an empty list is returned. If any event occurs whilst blocked, it is returned immediately. Returns: :class:`.SkypeEvent` list: list of events, possibly empty """ if not self.subscribed: self.subscribe() return self.conn("POST", "{0}/users/ME/endpoints/{1}/subscriptions/0/poll".format(self.conn.msgsHost, self.id), auth=SkypeConnection.Auth.RegToken).json().get("eventMessages", [])