# Copyright (c) 2007, 2008 Novell, Inc. # All Rights Reserved. # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public License as # published by the Free Software Foundation; version 2.1 of the license. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this library; if not, contact Novell, Inc. # # To contact Novell about this file by physical or electronic mail, # you may find current contact information at www.novell.com # # This file incorporates work covered by the following copyright and # permission notice: # # This file originated from Robert Richards CDATA blog, # downloaded from http://www.cdatazone.org/index.php?/pages/source.html # http://www.cdatazone.org/files/pubdomain.tar.gz # According to blog entry # http://www.cdatazone.org/index.php?/archives/26-Catching-Up.html # posted Thursday, July 5. 2007 the libraries have been released under a # bsd style license. # # Copyright (c) 2007, Robert Richards . # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in # the documentation and/or other materials provided with the # distribution. # # * Neither the name of Robert Richards nor the names of his # contributors may be used to endorse or promote products derived # from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. import sys, traceback import re import xmlseclibs, cookielib, datetime import hashlib import urlparse from xml.dom import minidom from xml import xpath import xml.utils.iso8601 import event from event import Event, BasicEventLog """ public namespace identifiers for different token types """ SAML_1_0_ASSERT_NS = 'urn:oasis:names:tc:SAML:1.0:assertion' SAML_1_1_ASSERT_NS = 'http://docs.oasis-open.org/wss/oasis-wss-saml-token-profile-1.1#SAMLV1.1' SAML_2_0_ASSERT_NS = 'urn:oasis:names:tc:SAML:2.0:assertion' """String to verify that the token is a bearer token """ BEARER_TOKEN = 'urn:oasis:names:tc:SAML:1.0:cm:bearer' HOLDER_OF_KEY_TOKEN = 'urn:oasis:names:tc:SAML:1.0:cm:holder-of-key' SENDER_VOUCHES_TOKEN = "urn:oasis:names:tc:SAML:1.0:cm:sender-vouches" """ public identifiers for meta data about the token """ META_AssertionID = 'AssertionID' META_Audience = 'Audience' META_NotBefore = 'NotBefore' META_NotOnOrAfter = 'NotOnOrAfter' META_Issuer = 'Issuer' META_IssueInstant = 'IssueInstant' META_MajorVersion = 'MajorVersion' META_MinorVersion = 'MinorVersion' META_CardKeyHash = 'CardKeyHash' META_SubjectConfirmation = 'SubjectConfirmationNethod' """ public identifiers for options which can or must be set on the infocard processor """ OPTION_CryptoKey = 'cryptoKey' OPTION_CryptoKeyPass = 'cryptoKeyPass' OPTION_CryptoKeyIsFile = 'cryptoKeyIsFile' OPTION_CryptoKeyIsCert = 'cryptoKeyIsCert' OPTION_CryptoKeyPublic = 'cryptoKeyPublic' OPTION_CryptoCert = 'cryptoCert' """ space delimited lists of claims """ OPTION_required_claims = 'required_claims' OPTION_optional_claims = 'optional_claims' OPTION_multivalued_claims = 'multivalued_claims' """ public identifiers for various settable options """ OPTION_token_type = 'token_type' class InfoCardProcessor: """Base object for consumers who wish to build a python based RP for processing information cards and dealing with cardspace. This object may be created and configured once then used to evaluate many security tokens """ def __init__(self): self.options = {} def setDecode(self, privateKey, passPhrase = None, isFile=False, isCert = True): """Setup the cert/private key used to decrypt tokens. In many cases this will be the servers ssl cert. always returns None """ if isFile: fp = open(privateKey, 'rb') privateKey = fp.read() fp.close() isFile = False self.options[OPTION_CryptoKey] = privateKey self.options[OPTION_CryptoKeyPass] = passPhrase self.options[OPTION_CryptoKeyIsFile] = isFile self.options[OPTION_CryptoKeyIsCert] = isCert def setClaims(self, required, optional = None, multivalued = None): """Helper function to simplify setting of the optional and required claims, as well as which claims may be multivalued. Failure to tell the processor which claims are expected may result in errors being reported. The process helps verify that all required claims were received and that no additional claims were sent. Todo: insert empty claim handling Todo: insert custom claim transformations always returns None """ self.options[OPTION_required_claims] = required self.options[OPTION_optional_claims] = optional self.options[OPTION_multivalued_claims] = multivalued def setOptions(self, options): """Set options for processing the security token The most common options relate to the overriding of event severity, please see evemt.py for details of option name and values. """ if options: for name in options.keys(): self.options[name] = options[name] def processToken(self, xmlToken = None, transport='http'): """Parse the token using prevously configured keys, claims and options. returns a SecToken objec, the returned secToken may or may not be valid, it is up to the caller to check secToken.isValid() """ secToken = SecToken(self.options) if secToken: secToken.processToken(xmlToken, transport) return secToken class SecToken: """ Class for the parsing and holding of security token data Instantiate, configure, process, check validity Typiclly not directly instanciated, instead InfoCardProcessor is used to hold the common configuration and as a factory for creating SecTokens Todo: currently only supports SAML 1.0/1.1 tokens, that should be abstracted out to allow for many token types. """ def __init__(self, options=None): self.crypted = None #Raw security token as delivered from the client self.cryptedDom = None #Raw security token as delivered from the client # and parsed into dom self.decrypted = None #decrypted token self.isValid = False #boolean, is security token valid self.objKeyInfo = None #internal object self.assertions = {} #array of assertions/claims self.metadata = {} #array of misc data about the token, such as #token type self.eventLog = BasicEventLog(options) self.namespace = SAML_1_0_ASSERT_NS self.options = options def processToken(self, xmlToken, transport): """Process the token, until processed no data is present returns True if valid, False if invalid. Events are logged to self.eventLog detialing failure reasons. """ self.crypted = xmlToken try: if not self.crypted or not len(self.crypted): self.eventLog.add_event("A Token was not supplied or was empty", event.FATAL, 'empty-token') else: objenc = xmlseclibs.XMLSecEnc(self.eventLog) self.cryptedDom = minidom.parseString(self.crypted) encData = objenc.locateEncryptedData(self.cryptedDom) if not encData: if transport == 'http': self.eventLog.add_event("Cannot locate encrypted"\ " data in security token, please ensure all requests"\ " are sent over SSL.", event.ERROR, 'parse-locate-encrypted') self.decrypted = self.cryptedDom else: objenc.setNode(encData) objenc.type = encData.getAttribute("Type") key = None objKey = objenc.locateKey() if (objKey): self.objKeyInfo = objenc.locateKeyInfo(objKey, None, loadDecryptionKey, self) # if (self.objKeyInfo): # if (self.objKeyInfo.isEncrypted): # objencKey = self.objKeyInfo.encryptedCtx # self.objKeyInfo.loadKey( # self.options.get(OPTION_CryptoKey), # self.options.get(OPTION_CryptoKeyPass), # self.options.get(OPTION_CryptoKeyIsFile), # self.options.get(OPTION_CryptoKeyIsCert)) # key = objencKey.decryptKey(self.objKeyInfo) # else: # self.eventLog.add_event("ObjectKeyInfo is not encrypted", # event.INFO, 'parse-object-key-info') # # if (not objKey) or (not key): # self.eventLog.add_event("Error loading key to handle "\ # "decryption. The RP Administrator must properly "\ # "configure the system.", event.FATAL, 'parse-keyload') # else: # objKey.loadKey(key) decrypt = objenc.decryptNode(objKey, False) if decrypt: # we have the saml token so load er up, catch the error try: self.decrypted = minidom.parseString(decrypt) except: pass else: self.eventLog.add_event('Unable to decrypt token', event.FATAL, 'decrypt') if not self.decrypted: self.eventLog.add_event('Unable to parse decrypted token to DOM', event.FATAL, 'parse-decrypted-to-dom') else: #grab assertions/claims self._setupAssertions() #set up metadata about the token for later use self._setupMiscData() #do validity checking self.isValid = self._isValid() if not self.isValid: self.eventLog.add_event("Security Token did not pass all valididation requirements.", event.ERROR, 'transaction-summary') except Exception, why: trace = traceback.format_exception(*sys.exc_info()) traceString = '\n '.join(trace) errorMsg = "ERROR:\n%s\n" % (traceString) self.eventLog.add_event("Exception encountered while processing Security Token, exception: %s " % (str(errorMsg)), event.FATAL, 'exception') return self.isValid def _setupMiscData(self): """Setup all of the data about the token we can, note that some data like the audience, valid before/after are set in _isValid Todo:: currently saml specific """ rootNode = self.decrypted.documentElement if rootNode: self.metadata[META_Issuer] = rootNode.getAttribute('Issuer') self.metadata[META_IssueInstant] = rootNode.getAttribute('IssueInstant') self.metadata[META_AssertionID] = rootNode.getAttribute('AssertionID') self.metadata[META_MajorVersion] = rootNode.getAttribute('MajorVersion') self.metadata[META_MinorVersion] = rootNode.getAttribute('MinorVersion') self.metadata[META_CardKeyHash] = self._getCardKeyHash() def _isValid(self): """ Validate the token, including signature validation, validity period, osis interop tests, and that the expected claims are present. There are some metadata items cached by this function Todo:: currently saml specific """ data = self.decrypted.documentElement.namespaceURI if data == SAML_1_0_ASSERT_NS or data == SAML_1_1_ASSERT_NS: self.namespace = data else: self.eventLog.add_event("Unknown SAML namespace encountered: %s " % (data), event.FATAL, 'unknown-namespace') objXMLSecDSig = xmlseclibs.XMLSecurityDSig(self.eventLog) objDSig = objXMLSecDSig.locateSignature(self.decrypted) if not objDSig: self.eventLog.add_event("SAML signature location failed", event.ERROR, 'locate-saml-signature') # Canonicalize the signed info objXMLSecDSig.canonicalizeSignedInfo() retVal = objXMLSecDSig.validateReference() if not retVal: self.eventLog.add_event("SAML reference validation failed.", event.ERROR, 'validate-saml-reference') key = None objKey = objXMLSecDSig.locateKey() if not objKey: self.eventLog.add_event("Error locating/loading key to handle "\ "signature.", event.ERROR, 'decrypt-loadkey') else: self.objKeyInfo = xmlseclibs.XMLSecEnc.staticLocateKeyInfo(objKey, objDSig, self.eventLog) # if (self.objKeyInfo): # # Handle any additional key processing such as encrypted keys here # self.eventLog.add_event("Potential additional key processing required: %s, %u" \ # % (self.objKeyInfo.type, objKey.isEncrypted), # event.INFO) # pass # else: # self.eventLog.add_event("No Key Info found for %u" \ # % (objKey.isEncrypted), event.INFO) retVal = objXMLSecDSig.verify(objKey) if not retVal: self.eventLog.add_event("Unable to verify digital signature.", event.ERROR, 'verify-signature') xPath = xpath.CreateContext(self.decrypted) xPath.setNamespaces({'mysaml' : self.namespace}) #validate Conditions #validate time conditions query = '/mysaml:Assertion/mysaml:Conditions' nodelist = xpath.Evaluate(query, context=xPath) if nodelist and nodelist[0]: node = nodelist[0] self.metadata[META_NotBefore] = node.getAttribute('NotBefore') self.metadata[META_NotOnOrAfter] = node.getAttribute('NotOnOrAfter') if (not checkDateConditions(self.metadata[META_NotBefore], self.metadata[META_NotOnOrAfter])): currentTime = datetime.datetime.utcnow() self.eventLog.add_event( 'Token range [%s - %s] does not include current time %s' % ( self.metadata[META_NotBefore], self.metadata[META_NotOnOrAfter], currentTime.isoformat()), event.ERROR, 'validate-time-conditions-inrange') if ((not self.metadata.has_key(META_NotBefore)) or (not self.metadata.has_key(META_NotOnOrAfter))): self.eventLog.add_event("Security Token does not contain "\ "both NotBefore and NotOnOrAfter conditions", event.ERROR, 'validate-time-conditions-present') query = '/mysaml:Assertion/mysaml:Conditions/mysaml:AudienceRestrictionCondition/mysaml:Audience' nodelist = xpath.Evaluate(query, context=xPath) for node in nodelist: self.metadata[META_Audience] = node.firstChild.data self._doesMatchFuzzy(self.options.get(META_Audience), self.metadata[META_Audience], 'Audience', 'mismatched-audience') break if not self.metadata.has_key(META_Audience): self.eventLog.add_event("Your IdP did not include an Audience "\ "restriction in the Security Token.", event.ERROR, 'validate-audience-present') query = '/mysaml:Assertion/mysaml:AttributeStatement/mysaml:Subject/mysaml:SubjectConfirmation/mysaml:ConfirmationMethod' nodelist = xpath.Evaluate(query, context=xPath) for node in nodelist: self.metadata[META_SubjectConfirmation] = node.firstChild.data if (not self._doesMatchFuzzy(BEARER_TOKEN, self.metadata[META_SubjectConfirmation], None, None) and not self._doesMatchFuzzy(HOLDER_OF_KEY_TOKEN, self.metadata[META_SubjectConfirmation], None, None) and not self._doesMatchFuzzy(SENDER_VOUCHES_TOKEN, self.metadata[META_SubjectConfirmation], None, None)): self.eventLog.add_event("Passed configuration data \"%s\" doesn't match "\ "expected configuration data\"%s\"" % (self.metadata[META_SubjectConfirmation], 'holder-of-key or bearer or sender vouches'), event.ERROR, 'mismatched-confirmation') break if not self.metadata.has_key(META_SubjectConfirmation): self.eventLog.add_event("Security Token either does not contain, or this RP does not "\ "understand the passed Subject Confirmation Method.", event.ERROR, 'validate-confirmation-present') #Check for all required claims rClaims = None oClaims = None if self.options.get(OPTION_optional_claims): oClaims = self.options.get(OPTION_optional_claims).rsplit(' ') if self.options.get(OPTION_required_claims): rClaims = self.options.get(OPTION_required_claims).rsplit(' ') for claim in rClaims: if claim and len(claim) and not self.getAssertionValues(claim): self.eventLog.add_event("Security Token does not contain "\ "required claim: \"%s\"" % (claim), event.ERROR, 'validate-required-claims-present') #check for extra claims claims = self.getAssertionValues() if claims: for claimName in claims.keys(): if ((not rClaims and not oClaims) or ((not rClaims or (rClaims and claimName not in rClaims)) and (not oClaims or (oClaims and claimName not in oClaims)))): self.eventLog.add_event("Security Token contains "\ "unrequested claim: \"%s\"" % (claimName), event.ERROR, 'validate-minimal-claims-present') #This check acully enforces the isvalid by ensuring there are no errors #Note what is shown in the code as error, info, fatal may not match #the current configuration if self.eventLog.has_severity(event.ERROR): return False return True def _doesMatchFuzzy(self, expected, got, clearTag, eventTag): """Lienient matching for items which may be allowed to be missing returns true if item is not required or matches. returns False and logs the event on failure. """ if not got or not expected or expected == got or re.match(expected, got): return True else: if eventTag and clearTag: self.eventLog.add_event("Passed %s \"%s\" doesn't match "\ "expected %s \"%s\"" % (clearTag, got, clearTag, expected), event.ERROR, eventTag) return False; def _multivaluedAllowd(self, name, ns): """ See if this attribute is allowed to be multivalued""" claim = ns + '/' + name multiClaims = self.options.get(OPTION_multivalued_claims) if multiClaims and (re.search(claim, multiClaims) is not None): return True return False def _setupAssertions(self): """Build a cache of assertion names/values Todo:: handle namespaces, claim name mapping, empty value handling and value attributes! at compleation self.assertions is setup. Assertions should be accessed by calling getAssertionValues, not by accessing self.assertions. """ if not self.assertions or len(self.assertions) == 0: xPath = xpath.CreateContext(self.decrypted) xPath.setNamespaces({'mysaml' : self.namespace}) query = '/mysaml:Assertion/mysaml:AttributeStatement/mysaml:Attribute' nodelist = xpath.Evaluate(query, context=xPath) for node in nodelist: ns = node.getAttribute('AttributeNamespace').rstrip('/') name = node.getAttribute('AttributeName') if (name): for child in node.childNodes: value = '' if (child.localName == 'AttributeValue'): if not child.firstChild or not child.firstChild.data: self.eventLog.add_event("Empty data value for %s/%s " \ % (ns, name), event.INFO, 'parse-assertions-empty-value') else: value = child.firstChild.data nsDict = self.assertions.get(ns, None) if not nsDict: # self.eventLog.add_event("First NS, adding %s/%s : %s" \ # % (ns, name, value), # event.INFO, 'parse-assertions') self.assertions[ns] = {name:value} else: if nsDict.get(name, None) is not None: if (self._multivaluedAllowd(name, ns)): values = nsDict[name] if isinstance(values, list): values.append(value) else: nsDict[name] = [values, value] else: self.eventLog.add_event("Ignoring multi-valued %s/%s : %s" \ % (ns, name, value), event.ERROR, 'parse-assertions') else: # self.eventLog.add_event("Adding %s/%s : %s" \ # % (ns, name, value), # event.INFO, 'parse-assertions') nsDict[name] = value def getAssertionValues(self, identifier=None): """Allows retrivial of any claim or assertion associated with the security token Todo:: visit and finish this function Returns either the data or None """ #is the identifier a URI or just a short name? ns = None claim = None try: if identifier: s = identifier.rsplit('/', 1) if len(s) > 1: ns = s[0] claim = s[-1] except Exception: claim = identifier # self.eventLog.add_event("Looking for ns: \"%s\" claim: \"%s\" " \ # % (ns, claim), # event.INFO, 'get-assertions') claims = dict() for nsElem in self.assertions.keys(): # self.eventLog.add_event("Comparing for requested ns: \"%s\" to : \"%s\" " \ # % (ns, nsElem), # event.INFO, 'get-assertions') if not ns or (ns and ns == nsElem): nsDict = self.assertions[nsElem] for key in nsDict.keys(): if claim and claim == key: return nsDict[key] elif not claim: if ns or (not ns and not claim): claims[nsElem+'/'+key] = nsDict[key] else: claims[key] = nsDict[key] if claims.items(): return claims def getMetaDataValues(self, identifier=None): """Allows retrivial of any meta data associated with the security token Note: currently all meta data is single valued! Pass a specific string for the identifier and receive either a string or None If the identifier is None then a dictionary of all meta data is returned """ if identifier: try: return self.metadata[identifier] except Exception: return None else: return self.metadata def _getCardKeyHash(self): """Internal routine to generate a generic cardkey hash, the result is stored in self.metadata[META_CardKeyHash] """ try: return self.metadata[META_CardKeyHash] except Exception: m = hashlib.md5() signer = None ppid = self.getAssertionValues('http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier') # ppid = self.getAssertionValues('privatepersonalidentifier') if ppid: m.update(ppid) if self.objKeyInfo and self.objKeyInfo.isEncrypted: if self.objKeyInfo.modulus: m.update(self.objKeyInfo.modulus) if self.objKeyInfo.exponent: m.update(self.objKeyInfo.exponent) self.metadata[META_CardKeyHash] = m.hexdigest() return self.metadata[META_CardKeyHash] def checkDateConditions(start=None, end=None, difference=300): """Validate a datetime is in range. difference in seconds allows for clock skew. Todo:: need work on ISO date checking - skip for now returns True if time is in range False if not in range """ currentTime = datetime.datetime.now() if (start): startTime = datetime.datetime.fromtimestamp(xml.utils.iso8601.parse(start)) # Allow for a difference in clock syncronization, d = datetime.timedelta(minutes=difference) if ((not startTime) or ((startTime - d) > currentTime)): return False if (end): endTime = datetime.datetime.fromtimestamp(xml.utils.iso8601.parse(end)) if ((not endTime) or (endTime <= currentTime)): return False return True def loadDecryptionKey(secToken, objKey): """ callback function for loading keys as needed """ objKey.loadKeyInfo( secToken.options.get(OPTION_CryptoKeyPublic), secToken.options.get(OPTION_CryptoCert), secToken.options.get(OPTION_CryptoKeyIsFile)) objKey.loadKey( secToken.options.get(OPTION_CryptoKey), secToken.options.get(OPTION_CryptoKeyPass), secToken.options.get(OPTION_CryptoKeyIsFile), secToken.options.get(OPTION_CryptoKeyIsCert))