# Copyright (c) 2007,2008 Novell, Inc. # All Rights Reserved. # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public License as # published by the Free Software Foundation; version 2.1 of the license. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this library; if not, contact Novell, Inc. # # To contact Novell about this file by physical or electronic mail, # you may find current contact information at www.novell.com # # This file incorporates work covered by the following copyright and # permission notice: # # This file originated from Robert Richards CDATA blog, # downloaded from http://www.cdatazone.org/index.php?/pages/source.html # http://www.cdatazone.org/files/pubdomain.tar.gz # According to blog entry # http://www.cdatazone.org/index.php?/archives/26-Catching-Up.html # posted Thursday, July 5. 2007 the libraries have been released under a # bsd style license. # # Copyright (c) 2007, Robert Richards . # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in # the documentation and/or other materials provided with the # distribution. # # * Neither the name of Robert Richards nor the names of his # contributors may be used to endorse or promote products derived # from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # import base64, binascii from Crypto.Hash import SHA, SHA256, RIPEMD from Crypto.Cipher import AES, DES3 from M2Crypto import EVP, RSA, util, BIO, X509, m2 from xml.dom import minidom import xml.dom.ext.c14n from xml import xpath from urlparse import urlparse import event from event import Event, BasicEventLog def report_event(eventLog, text, severity = event.NOTSET, tag = None): """ module level function for either remembering the event or throwing an exception if eventlonging is not configured. """ if eventLog: eventLog.add_event(text, severity, tag) elif severity >= event.ERROR: raise Exception(text) class XMLSecurityKey: """Class for encrypting/decrypting data, and remembering information about . the methods and keys for doing so. Hides from the caller the details of which library (openssl or mcrypt) is used for a particular cipher. """ TRIPLEDES_CBC = 'http://www.w3.org/2001/04/xmlenc#tripledes-cbc' AES128_CBC = 'http://www.w3.org/2001/04/xmlenc#aes128-cbc' AES192_CBC = 'http://www.w3.org/2001/04/xmlenc#aes192-cbc' AES256_CBC = 'http://www.w3.org/2001/04/xmlenc#aes256-cbc' RSA_1_5 = 'http://www.w3.org/2001/04/xmlenc#rsa-1_5' RSA_OAEP_MGF1P = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p' RSA_SHA1 = 'http://www.w3.org/2000/09/xmldsig#rsa-sha1' DSA_SHA1 = 'http://www.w3.org/2000/09/xmldsig#dsa-sha1' def __init__(self, type, eventLog = None, params=None): self.type = 0 self.key = None self.passphrase = "" self.eventLog = eventLog self.iv = None self.name = None self.keyChain = None self.isEncrypted = False self.encryptedCtx = None self.cryptParams = {'mode':None, 'cipher':None, 'padding':None, 'type':None, 'hash':None} if (type == XMLSecurityKey.TRIPLEDES_CBC): self.cryptParams['library'] = 'mcrypt' self.cryptParams['cipher'] = DES3 self.cryptParams['mode'] = DES3.MODE_CBC self.cryptParams['method'] = 'http://www.w3.org/2001/04/xmlenc#tripledes-cbc' elif (type == XMLSecurityKey.AES128_CBC): self.cryptParams['library'] = 'mcrypt' self.cryptParams['cipher'] = AES self.cryptParams['mode'] = AES.MODE_CBC self.cryptParams['method'] = 'http://www.w3.org/2001/04/xmlenc#aes128-cbc' self.cryptParams['type'] = 'AES' elif (type == XMLSecurityKey.AES192_CBC): self.cryptParams['library'] = 'mcrypt' self.cryptParams['cipher'] = AES self.cryptParams['mode'] = AES.MODE_CBC self.cryptParams['method'] = 'http://www.w3.org/2001/04/xmlenc#aes192-cbc' self.cryptParams['type'] = 'AES' elif (type == XMLSecurityKey.AES256_CBC): self.cryptParams['library'] = 'mcrypt' self.cryptParams['cipher'] = AES self.cryptParams['mode'] = AES.MODE_CBC self.cryptParams['method'] = 'http://www.w3.org/2001/04/xmlenc#aes256-cbc' self.cryptParams['type'] = 'AES' elif (type == XMLSecurityKey.RSA_1_5): self.cryptParams['library'] = 'openssl' self.cryptParams['padding'] = RSA.pkcs1_padding self.cryptParams['method'] = 'http://www.w3.org/2001/04/xmlenc#rsa-1_5' if isinstance(params, dict): if (params['type'] == 'public' or params['type'] == 'private'): self.cryptParams['type'] = params['type'] elif (type == XMLSecurityKey.RSA_OAEP_MGF1P): self.cryptParams['library'] = 'openssl'; self.cryptParams['padding'] = RSA.pkcs1_oaep_padding; self.cryptParams['method'] = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p'; self.cryptParams['hash'] = None; if isinstance(params, dict): if (params['type'] == 'public' or params['type'] == 'private'): self.cryptParams['type'] = params['type'] elif (type == XMLSecurityKey.RSA_SHA1): self.cryptParams['library'] = 'openssl' self.cryptParams['method'] = 'http://www.w3.org/2000/09/xmldsig#rsa-sha1' if isinstance(params, dict): if (params['type'] == 'public' or params['type'] == 'private'): self.cryptParams['type'] = params['type'] else: return None self.type = type self.modulus =None self.exponent = None self.RefEncodingType = None self.RefValueType = None self.RefValue = None self.cert = None self.pubKey = None return None # def generateSessionKey(self): # key = '' # if (len(self.cryptParams['cipher']) > 0 and len(self.cryptParams['mode']) > 0): # keysize = mcrypt_module_get_algo_key_size(self.cryptParams['cipher']) # # Generating random key using iv generation routines # if ((keysize > 0) and (td = mcrypt_module_open(MCRYPT_RIJNDAEL_256, '',self.cryptParams['mode'], ''))): # if (self.cryptParams['type'] == 'AES'): # keysize = 16 # if self.type == XMLSecurityKey.AES256_CBC: # keysize = 32 # elif self.type == XMLSecurityKey.AES192_CBC: # keysize = 24 # while (len(key) < keysize): # key += mcrypt_create_iv(mcrypt_enc_get_iv_size (td),MCRYPT_RAND); # mcrypt_module_close(td) # key = substr(key, 0, keysize) # self.key = key # return key def loadKeyInfo(self, pubKey=None, cert=None, isFile = False): """ Setup as much information about the key as possible so we can gather and verify thumbprints and the like. """ if isFile and cert: fp = open(cert, 'rb') self.cert = fp.read() fp.close() else: self.cert = cert if isFile and pubKey: fp = open(pubKey, 'rb') self.pubKey = fp.read() fp.close() else: self.pubKey = pubKey def loadKey(self, key, passPhrase = None, isFile=False, isCert = True): """ setup key information in object This function fails if key can't be loaded succesfully Returns None on success and failure, reports errors to the configured event log. """ if isFile: fp = open(key, 'rb') key = fp.read() fp.close() self.passphrase = passPhrase if (self.cryptParams['library'] == 'openssl'): if (self.cryptParams['type'] == 'public'): if isCert: self.key = X509.load_cert_string(key).get_pubkey() else: bio = BIO.MemoryBuffer(key) self.key = EVP.PKey() self.key.assign_rsa(RSA.load_pub_key_bio(bio)) else: if isCert: self.key = RSA.load_key_string(key) else: if self.passphrase == None: self.key = RSA.load_key_string(key) else: self.key = RSA.load_key_string(key, self.passphrase_callback) elif (self.cryptParams['type'] == 'AES'): # Check key length if (self.type == XMLSecurityKey.AES256_CBC): if (len(key) < 25): report_event(self.eventLog, "XMLSecurityKey.loadkey for "\ "%s::%s cipher too short %u, should be at least 25 characters" \ % (self.cryptParams['library'], self.cryptParams['type'], len(key)), event.ERROR, 'cipher-size-invalid') elif (self.type == XMLSecurityKey.AES192_CBC): if (len(key) < 17): report_event(self.eventLog, "XMLSecurityKey.loadkey for "\ "%s::%s cipher too short %u, should be at least 25 characters" \ % (self.cryptParams['library'], self.cryptParams['type'], len(key)), event.ERROR, 'cipher-size-invalid') self.key = key if self.key: self.verifyKeyThumbprint() if not self.key: report_event(self.eventLog, "XMLSecurityKey.loadkey for %s::%s no key loaded" \ % (self.cryptParams['library'], self.cryptParams['type']), event.ERROR, 'unsupported-key-load') return None def encryptMcrypt(self, data): """Encrypted using the mcrypt library This function should not be called directly, rather call encryptData and let that function switch based on the configured library. Todo:: Not currently called, appears to be incomplete, possibly containing some phpisms """ td = mcrypt_module_open(self.cryptParams['cipher'], '', self.cryptParams['mode'], '') self.iv = mcrypt_create_iv(mcrypt_enc_get_iv_size(td), MCRYPT_RAND) mcrypt_generic_init(td, self.key, self.iv) encrypted_data = self.iv.mcrypt_generic(td, data) mcrypt_generic_deinit(td) mcrypt_module_close(td) return encrypted_data def decryptMcrypt(self, data): """Decrypt using the mcrypt library This function should not be called directly, rather call decryptData and let that function switch based on the configured library. Todo:: currently only supports MCRYPT_MODE_CBC returns None on failure, decrypted data on success. No events logged """ dCipher = self.cryptParams['cipher'] iv_length = dCipher.block_size self.iv = data[0:iv_length] data = data[iv_length:len(data)] #if (self.cryptParams['mode'] == MCRYPT_MODE_CBC): # For now only CBC supported so remove test pad = len(data) % iv_length data += '0'*(iv_length - pad) #End CBC only code td = dCipher.new(self.key, self.cryptParams['mode'], self.iv) decrypted_data = td.decrypt(data) if (not decrypted_data): return None #if (self.cryptParams['mode'] == MCRYPT_MODE_CBC): # For now only CBC supported so remove test #first remove any padding we have added dataLen = len(decrypted_data) newdataLen = dataLen-iv_length+pad decrypted_data = decrypted_data[0:newdataLen] #remove any CBC padding paddingLength = decrypted_data[newdataLen-1:newdataLen] decrypted_data = decrypted_data[0:newdataLen - ord(paddingLength)] #End CBC only code return decrypted_data def encryptOpenSSL(self, data): """Encrypt using the openssl library functions. This function should not be called directly, rather call encryptData and let that function switch based on the configured library. returns None & events logged on failure, encrypted data on success. """ if (self.cryptParams['type'] == 'public'): encrypted_data = self.key.public_encrypt(data, self.cryptParams['padding']) if not encrypted_data: report_event(self.eventLog, "Failure encrypting data (public_encrypt)" \ "padding : \"%s\" data: \"%s\" key: \"%s\"" % (self.cryptParams['padding'], data, self.key), event.ERROR, 'encryptOpenSSL-public-encrypt') return None else: encrypted_data = self.key.private_encrypt(data, self.cryptParams['padding']) if not encrypted_data: report_event(self.eventLog, "Failure encrypting data (private_encrypt)" \ "padding : \"%s\" data: \"%s\" key: \"%s\"" % (self.cryptParams['padding'], data, self.key), event.ERROR, 'encryptOpenSSL-private-encrypt') return None return encrypted_data def decryptOpenSSL(self, data): """Decrypt using the openssl library functions. This function should not be called directly, rather call decryptData and let that function switch based on the configured library. returns None & events logged on failure, decrypted data on success. """ if (self.cryptParams['type'] == 'public'): decrypted = self.key.public_decrypt(data, self.cryptParams['padding']) if (not decrypted): report_event(self.eventLog, "Failure decrypting data (public_decrypt)" \ "padding : \"%s\" data: \"%s\"" % (self.cryptParams['padding'], data), event.ERROR, 'decryptOpenSSL-public-decrypt') return None else: decrypted = self.key.private_decrypt(data, self.cryptParams['padding']) if (not decrypted): report_event(self.eventLog, "Failure decrypting data (private_decrypt)" \ "padding : \"%s\" data: \"%s\"" % (self.cryptParams['padding'], data), event.ERROR, 'decryptOpenSSL-private-decrypt') return None return decrypted def signOpenSSL(self, data): """Sign data using openssl library Todo:: log events. returns signed data on success, None and no events logged on failure """ pKey = self.key pKey.sign_init() pKey.sign_update(data) return pKey.sign_final() def verifyOpenSSL(self, data, signature): """Verify signature on signed data usins openssl library. Todo:: log events. """ pKey = self.key pKey.verify_init() pKey.verify_update(data) return m2.verify_final(pKey.ctx, signature, pKey.pkey) def encryptData(self, data): """Encrypt data using the configured library and cipher returns encrypted data on success, on failure returns None and logs events. """ if (self.cryptParams['library'] == 'mcrypt'): return self.encryptMcrypt(data) elif (self.cryptParams['library'] == 'openssl'): return self.encryptOpenSSL(data) else: report_event(self.eventLog, "encryptData does not support the " \ "currently configured library : \"%s\" " % (self.cryptParams['library']), event.ERROR, 'encryptData-no-supported-lib') def decryptData(self, data): """Decrypt data using the configured library and cipher returns decrypted data on success, on failure returns None and logs events. """ if (self.cryptParams['library'] == 'mcrypt'): return self.decryptMcrypt(data) elif (self.cryptParams['library'] == 'openssl'): return self.decryptOpenSSL(data) else: report_event(self.eventLog, "decryptData does not support the " \ "currently configured library : \"%s\" " % (self.cryptParams['library']), event.ERROR, 'decryptData-no-supported-lib') def signData(self, data): """Sign data using the configured library and cipher returns signature on success, on failure returns None and logs events. """ if (self.cryptParams['library'] == 'openssl'): return self.signOpenSSL(data) report_event(self.eventLog, "signData does not support the " \ "currently configured library : \"%s\" " % (self.cryptParams['library']), event.ERROR, 'signData-no-supported-lib') def verifySignature(self, data, signature): """Validates a signature using the configured library and cipher returns on failure returns None and logs events. """ if (self.cryptParams['library'] == 'openssl'): return self.verifyOpenSSL(data, signature) report_event(self.eventLog, "verifySignature does not support the " \ "currently configured library : \"%s\" " % (self.cryptParams['library']), event.ERROR, 'verifySignature-no-supported-lib') def verifyKeyThumbprint(self): if self.key and self.RefEncodingType and self.RefValueType and self.RefValue: # there is currently some confusion about what the thumbprint is # over, the public key or the cert? if not self.pubKey and not self.cert: report_event(self.eventLog, "XMLSecurityKey.verifyKeyThumbprint"\ " no public key or signing cert configured, unable"\ " to validate %s " % (self.RefValue), event.INFO, 'unconfigured-SecurityTokenReference') else: pubKeySig =self.generateKeyThumbPrints(self.pubKey, False, self.RefValueType, self.RefEncodingType) certSig = self.generateKeyThumbPrints(self.cert, True, self.RefValueType, self.RefEncodingType) if (self.RefValue not in pubKeySig) and (self.RefValue not in certSig): report_event(self.eventLog, "XMLSecurityKey.verifyKeyThumbprint"\ " unable to validate %s using %s, %s" % ( self.RefValue, self.RefValueType, self.RefEncodingType), event.ERROR, 'mismatched-SecurityTokenReference') for elem in pubKeySig: report_event(self.eventLog, "XMLSecurityKey.verifyKeyThumbprint pub key"\ " computed %s -- %s " % (elem, binascii.hexlify(elem)), event.DEBUG, 'SecurityTokenReference') for elem in certSig: report_event(self.eventLog, "XMLSecurityKey.verifyKeyThumbprint cert sig"\ " computed %s -- %s " % (elem, binascii.hexlify(elem)), event.DEBUG, 'SecurityTokenReference') def _generateKeyThumbPrint(self, key, method, encoding): data = None if key: if method == 'http://docs.oasis-open.org/wss/oasis-wss-soap-message-security-1.1#ThumbprintSHA1': alg = SHA.new() alg.update(key) data = alg.digest() if encoding and encoding == 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary': data = base64.b64encode(data) if not data: report_event(self.eventLog, "XMLSecurityKey._generateKeyThumbPrint"\ " unable to generate KeyThumbPrint %s using %s, %s" % ( key, method, encoding), event.INFO, 'generateKeySignature') return data def generateKeyThumbPrints(self, key, isCert, method, encoding): """ Assumes we are dealing with public keys/certs""" data = [] if key: # data.append(self._generateKeyThumbPrint(key, method, encoding)) if isCert: cert = X509.load_cert_string(key) # data.append(self._generateKeyThumbPrint(cert.as_pem(), method, encoding)) # data.append(self._generateKeyThumbPrint(cert.as_text(), method, encoding)) data.append(self._generateKeyThumbPrint(cert.as_der(), method, encoding)) # data.append(self._generateKeyThumbPrint(cert.as_pem(), method, None)) # data.append(self._generateKeyThumbPrint(cert.as_text(), method, None)) # data.append(self._generateKeyThumbPrint(cert.as_der(), method, None)) else: bio = BIO.MemoryBuffer(key) tkey = EVP.PKey() tkey.assign_rsa(RSA.load_pub_key_bio(bio)) data.append(self._generateKeyThumbPrint(tkey.as_der(), method, encoding)) data.append(self._generateKeyThumbPrint(tkey.as_der(), method, None)) return data def getAlgorith(self): """Returns a string with currently configured encrypt/decrypt method.""" return self.cryptParams['method'] def makeAsnSegment(type, instring): """ """ if (type == 0x02): if (ord(instring[0]) > 0x7f): instring = chr(0)+instring elif (type == 0x03): instring = chr(0)+instring length = len(instring) if (length < 128): output = "%c%c%s" % (type, length, instring) elif (length < 0x0100): output = "%c%c%c%s" % (type, 0x81, length, instring) elif (length < 0x010000): output = "%c%c%c%c%s" % (type, 0x82, length/0x0100, length%0x0100, instring); else: output = None return output def convertRSA(modulus, exponent): """Convert a modulus and exponent into an ASN publicKeyInfo Modulus and Exponent must already be base64 decoded """ exponentEncoding = XMLSecurityKey.makeAsnSegment(0x02, exponent) modulusEncoding = XMLSecurityKey.makeAsnSegment(0x02, modulus) sequenceEncoding = XMLSecurityKey. makeAsnSegment(0x30, modulusEncoding+exponentEncoding) bitstringEncoding = XMLSecurityKey.makeAsnSegment(0x03, sequenceEncoding) rsaAlgorithmIdentifier = util.h2b("300D06092A864886F70D0101010500") publicKeyInfo = XMLSecurityKey.makeAsnSegment(0x30, rsaAlgorithmIdentifier + bitstringEncoding) # encode the publicKeyInfo in base64 and add PEM brackets publicKeyInfoBase64 = base64.b64encode(publicKeyInfo); encoding = "-----BEGIN PUBLIC KEY-----\n" offset = 0 segment= publicKeyInfoBase64[offset:offset+64] while(segment): encoding = encoding + segment + "\n" offset += 64 segment=publicKeyInfoBase64[offset: offset+64] return encoding + "-----END PUBLIC KEY-----\n" def serializeKey(self, parent): """serialize a key Todo:: implement """ return None makeAsnSegment = staticmethod(makeAsnSegment) convertRSA = staticmethod(convertRSA) def passphrase_callback(*args): """Attempt to get passphrase protected certs working Todo: debug and fix. """ return self.passphrase class XMLSecurityDSig: """ """ XMLDSIGNS = 'http://www.w3.org/2000/09/xmldsig#' SHA1 = 'http://www.w3.org/2000/09/xmldsig#sha1' SHA256 = 'http://www.w3.org/2001/04/xmlenc#sha256' SHA512 = 'http://www.w3.org/2001/04/xmlenc#sha512' RIPEMD160 = 'http://www.w3.org/2001/04/xmlenc#ripemd160' C14N = 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315' C14N_COMMENTS = 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315#WithComments' EXC_C14N = 'http://www.w3.org/2001/10/xml-exc-c14n#' EXC_C14N_COMMENTS = 'http://www.w3.org/2001/10/xml-exc-c14n#WithComments' template = '\ \ \ \ ' def __init__(self, eventLog = None): """Setup digital signature object""" self.eventLog = eventLog self.idKeys = {} self.idNS = {} #signedInfo -> cannonical version of signed data self.signedInfo = None self.xPathCtx = None self.canonicalMethod = None self.prefix = 'ds' self.searchpfx = 'secdsig' sigdoc = minidom.parseString(XMLSecurityDSig.template) self.sigNode = sigdoc.documentElement def getXPathObj(self): if (not self.xPathCtx and self.sigNode): xPath = xpath.CreateContext(self.sigNode) xPath.setNamespaces({'secdsig' : XMLSecurityDSig.XMLDSIGNS}) self.xPathCtx = xPath return self.xPathCtx # TODO: Convert generate_GUID def generate_GUID(prefix=None): uuid = md5(uniqid(rand(), True)) guid = prefix.substr(uuid,0,8)+"-" + \ substr(uuid,8,4)+"-" + \ substr(uuid,12,4)+"-" + \ substr(uuid,16,4)+"-" + \ substr(uuid,20,12) return guid def locateSignature(self, objDoc): if (objDoc.nodeType == objDoc.DOCUMENT_NODE): doc = objDoc else: doc = objDoc.ownerDocument if doc: xPath = xpath.CreateContext(doc) xPath.setNamespaces({'secdsig' : XMLSecurityDSig.XMLDSIGNS}) query = ".//secdsig:Signature" nodeset = xpath.Evaluate(query, contextNode=objDoc, context=xPath) if nodeset: self.sigNode = nodeset[0] return self.sigNode; return None def createNewSignNode(self, name, value=None): """ """ doc = self.sigNode.ownerDocument if (not is_null(value)): node = doc.newChild(XMLSecurityDSig.XMLDSIGNS, self.prefix+':'+name, value) else: node = doc.newChild(XMLSecurityDSig.XMLDSIGNS, self.prefix+':'+name) return node def setCanonicalMethod(self, method): """ """ if (method == 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315' or \ method == 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315#WithComments' or \ method == 'http://www.w3.org/2001/10/xml-exc-c14n#' or \ method == 'http://www.w3.org/2001/10/xml-exc-c14n#WithComments'): self.canonicalMethod = method return True else: return False xpath = self.getXPathObj() if (xpath): query = './' + self.searchpfx + ':SignedInfo' nodeset = xpath.xpathEval(query) sinfo = nodeset[0] if (sinfo): query = './' + self.searchpfx + 'CanonicalizationMethod' xpath.setContextNode(sinfo) nodeset = xpath.xpathEval(query) canonNode = nodeset-[0] if (not canonNode): canonNode = self.createNewSignNode('CanonicalizationMethod') sinfo.insertBefore(canonNode, sinfo.firstChild) canonNode.setAttribute('Algorithm', self.canonicalMethod) def canonicalizeData(self, node, canonicalmethod, inclusiveNameSpacePrefixes=None): exclusive = 0 withComments = 0 if (canonicalmethod == 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315'): exclusive = 0; withComments = 0; elif (canonicalmethod == 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315#WithComments'): withComments = 1; elif (canonicalmethod == 'http://www.w3.org/2001/10/xml-exc-c14n#'): exclusive = 1; elif (canonicalmethod == 'http://www.w3.org/2001/10/xml-exc-c14n#WithComments'): exclusive = 1; withComments = 1; else: report_event(self.eventLog, "canonicalizeData requested for " \ "unsupported algorithm : %s" % (canonicalmethod), event.ERROR, 'calculate-canonical') if (exclusive == 1): return xml.dom.ext.c14n.Canonicalize(node, None, comments=withComments, unsuppressedPrefixes=inclusiveNameSpacePrefixes) else: return xml.dom.ext.c14n.Canonicalize(node, None, comments=withComments) def canonicalizeSignedInfo(self): doc = self.sigNode.ownerDocument canonicalmethod = None if (doc): xPath = self.getXPathObj() query = "./secdsig:SignedInfo" nodeset = xpath.Evaluate(query, contextNode=self.sigNode, context=xPath) if (nodeset and nodeset[0]): signInfoNode = nodeset[0] canonNode = signInfoNode.firstChild while (canonNode and \ (canonNode.localName != 'CanonicalizationMethod') and \ (canonNode.namespaceURI != XMLSecurityDSig.XMLDSIGNS)): canonNode = canonNode.nextSibling if (canonNode): canonicalmethod = canonNode.getAttribute('Algorithm') self.signedInfo = self.canonicalizeData(signInfoNode, canonicalmethod) return self.signedInfo return None def calculateDigest (self, digestAlgorithm, data): if (digestAlgorithm == XMLSecurityDSig.SHA1): alg = SHA.new() elif (digestAlgorithm == XMLSecurityDSig.SHA256): alg = SHA256.new() # elif (digestAlgorithm == XMLSecurityDSig.SHA512): #not implemented yet # return None elif (digestAlgorithm == XMLSecurityDSig.RIPEMD160): alg = RIPEMD.new() else: report_event(self.eventLog, "Digest Calcluation requested for " \ "unsupported algorithm : %s" % (digestAlgorithm), event.ERROR, 'calculate-digest') return None alg.update(data) hashed = alg.digest() return base64.b64encode(hashed) def validateDigest(self, refNode, data, dataObject): xPath = xpath.CreateContext(refNode) xPath.setNamespaces({'secdsig' : XMLSecurityDSig.XMLDSIGNS}) query = 'string(./secdsig:DigestMethod/@Algorithm)' digestAlgorithm = xpath.Evaluate(query, contextNode=refNode, context=xPath) digValue = self.calculateDigest(digestAlgorithm, data) query = 'string(./secdsig:DigestValue)' digestValue = xpath.Evaluate(query, contextNode=refNode, context=xPath) if digValue != digestValue: report_event(self.eventLog, "%s Digest Validation calculated \""\ "%s\" != \"%s\"\nCalculated: \"%s\"\nSent: \"%s\"\n " \ "The digest calculated on untransformed data is : \"%s\"" % (digestAlgorithm, digValue, digestValue, data, dataObject.toxml(), self.calculateDigest(digestAlgorithm, dataObject.toxml()) ), event.ERROR, 'validate-digest') return (digValue == digestValue) def processTransforms(self, signatureNode, signedDataNode): """ Transform the sent data using the same methods as the sender to get a cannonacal form Todo:: verify that everything within this transformed data only uses the inclusive namespaces, currently the code elects to 'pass' """ transformedData = signedDataNode xPath = xpath.CreateContext(signatureNode) xPath.setNamespaces({'secdsig' : XMLSecurityDSig.XMLDSIGNS}) query = './secdsig:Transforms/secdsig:Transform' nodelist = xpath.Evaluate(query, contextNode=signatureNode, context=xPath) canonicalMethod = 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315' for i in range(len(nodelist)): transform = nodelist[i] algorithm = transform.getAttribute("Algorithm") if (algorithm == 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315' or \ algorithm == 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315#WithComments' or \ algorithm == 'http://www.w3.org/2001/10/xml-exc-c14n#' or \ algorithm == 'http://www.w3.org/2001/10/xml-exc-c14n#WithComments'): canonicalMethod = algorithm break inclusiveNameSpacePrefixes = None nodelist = signatureNode.getElementsByTagNameNS( '*', 'InclusiveNamespaces') for node in nodelist: inclusiveNameSpacePrefixes = node.getAttribute("PrefixList").split(' ') if inclusiveNameSpacePrefixes == None or inclusiveNameSpacePrefixes == []: report_event(self.eventLog, "Signature doesn't have the" \ " InclusiveNamespaces element,"\ " by not including this element your chosen IdP puts you"\ " at risk of a brown bag attack!", event.ERROR, 'signature-missing-InclusiveNameSpaces') if (isinstance(signedDataNode, minidom.Node)): transformedData = self.canonicalizeData(signedDataNode, \ canonicalMethod, inclusiveNameSpacePrefixes) if inclusiveNameSpacePrefixes != None and inclusiveNameSpacePrefixes != []: pass return transformedData def processRefNode(self, refNode): signedObject = None uri = refNode.getAttribute("URI") if (uri): arUrl = urlparse(uri) if (not arUrl[2]): identifier = arUrl[5] if (identifier): if (self.idNS and isinstance(seld.idNS, dict)): for nspf in self.idNS.keys(): ns = self.idNS[nspf] xPath.xpathRegisterNs(nspf, ns) #dfb todo: why was this origially @Id? that didn't always work iDlist = '@*="' + identifier + '"' if isinstance(self.idKeys, dict): for key in self.idKeys.keys(): idKey = self.idKeys[key] iDlist = iDlist + " or @" + idKey + "='" + identifier + "'" query = '//*[' + iDlist + ']' res = xpath.Evaluate(query, contextNode=refNode.ownerDocument) if res: signedObject = res[0] else: report_event(self.eventLog, "Problem Validating reference, query : %s" % (query), event.ERROR, 'validate-reference-location') signedObject = refNode.ownerDocument else: signedObject = refNode.ownerDocument else: signedObject = file_get_contents(arUrl) else: signedObject = refNode.ownerDocument canonicalData = self.processTransforms(refNode, signedObject) if not canonicalData: report_event(self.eventLog, "Problem processing transforms for node: %s" % (signedObject.toXML()), event.INFO, 'validate-reference-transform') return self.validateDigest(refNode, canonicalData, signedObject) def validateReference(self): doc = self.sigNode.ownerDocument if (doc != self.sigNode): self.sigNode.parentNode.removeChild(self.sigNode) xPath = self.getXPathObj() query = "./secdsig:SignedInfo/secdsig:Reference" nodeset = xpath.Evaluate(query, contextNode=self.sigNode, context=xPath) if (not nodeset): return None for i in range(len(nodeset)): refNode = nodeset[i] if (not self.processRefNode(refNode)): report_event(self.eventLog, "Error validating reference node : %s" % (refNode.toprettyxml(' ', '\n'))) return None return True def addRefInternal(self, sinfoNode, node, algorithm, arTransforms=None, options=None): prefix = None prefix_ns = None if isinstance(options, dict): prefix = options['prefix'] prefix_ns = options['prefix_ns'] id_name = options['id_name'] refNode = self.createNewSignNode('Reference') sinfoNode.appendChild(refNode) if (node.nodeType == node.DOCUMENT_NODE): uri = None else: # Do we really need to set a prefix? uri = XMLSecurityDSig.generate_GUID() refNode.setAttribute("URI", '#' + uri) transNodes = self.createNewSignNode('Transforms') refNode.appendChild(transNodes) if isinstance(arTransforms, dict): for i in range(len(arTransforms)): transform = nodelist[i] transNode = self.createNewSignNode('Transform') transNodes.appendChild(transNode) transNode.setAttribute('Algorithm', transform) elif (not empty(self.canonicalMethod)): transNode = self.createNewSignNode('Transform') transNodes.appendChild(transNode) transNode.setAttribute('Algorithm', self.canonicalMethod) if (uri): attname = id_name if (prefix): attname = prefix + ':' + attname node.setAttributeNS(prefix_ns, attname, uri) canonicalData = self.processTransforms(refNode, node); digValue = self.calculateDigest(algorithm, canonicalData); digestMethod = self.createNewSignNode('DigestMethod'); refNode.appendChild(digestMethod); digestMethod.setAttribute('Algorithm', algorithm); digestValue = self.createNewSignNode('DigestValue', digValue) refNode.appendChild(digestValue) def addReference(self, node, algorithm, arTransforms=None, options=None): xpath = self.getXPathObj() if (xpath): query = "./secdsig:SignedInfo" nodeset = xpath.xpathEval(query) sInfo = nodeset[0] if (sInfo): self.addRefInternal(sInfo, node, algorithm, arTransforms, options) def addReferenceList(self, arNodes, algorithm, arTransforms=None, options=None): xpath = self.getXPathObj() if (xpath): query = "./secdsig:SignedInfo" nodeset = xpath.xpathEval(query) sInfo = nodeset[0] if (sInfo): for i in range(len(sInfo)): node = sInfo[i] self.addRefInternal(sInfo, node, algorithm, arTransforms, options) def locateKey(self, node=None): """Find expected key information within an encrypted block returns XMLSecurityKey, which may still require external information before it can be used for decryption.""" if (not node): node = self.sigNode if (not isinstance(node, minidom.Node)): return None doc = node.ownerDocument if (doc): xPath = xpath.CreateContext(node) xPath.setNamespaces({'secdsig' : XMLSecurityDSig.XMLDSIGNS}) query = "string(./secdsig:SignedInfo/secdsig:SignatureMethod/@Algorithm)"; algorithm = xpath.Evaluate(query, contextNode=node, context=xPath) if (algorithm): objKey = XMLSecurityKey(algorithm, self.eventLog, {'type':'public'}) return objKey return None def verify(self, objKey): xPath = xpath.CreateContext(self.sigNode) xPath.setNamespaces({'secdsig' : XMLSecurityDSig.XMLDSIGNS}) query = "string(./secdsig:SignatureValue)" sigValue = xpath.Evaluate(query, contextNode=self.sigNode, context=xPath) if (not sigValue): report_event(self.eventLog, "Unable to locate SignatureValue node," \ " signature not validated", event.ERROR, 'verify-signature') return None # report_event(self.eventLog, "Verify that signature value: \"%s\"\n is valid over the data: \"%s\"\n"\ # % (base64.b64decode(sigValue), self.signedInfo), # event.DEBUG, 'verify-signature') return objKey.verifySignature(self.signedInfo, base64.b64decode(sigValue)) def signData(self, objKey, data): return objKey.signData(data) def sign(self, objKey): xpath = self.getXPathObj() if (xpath): query = "./secdsig:SignedInfo"; nodeset = xpath.xpathEval(query) sInfo = nodeset[0] if (sInfo): query = "./secdsig:SignatureMethod" nodeset = xpath.xpathEval(query) sMethod = nodeset[0] sMethod.setAttribute('Algorithm', objKey.type) data = self.canonicalizeData(sInfo, self.canonicalMethod) sigValue = base64.b64encode(self.signData(objKey, data)) sigValueNode = self.createNewSignNode('SignatureValue', sigValue) infoSibling = sInfo.nextSibling if (infoSibling): infoSibling.parent.insertBefore(sigValueNode, infoSibling) else: self.sigNode.appendChild(sigValueNode) def appendCert(self): return None def appendKey(self, objKey, parent=None): objKey.serializeKey(parent) def appendSignature(self, parent, insertBefore = False): if (parent.nodeType == parent.DOCUMENT_NODE): baseDoc = parent else: parent.ownerDocument newSig = baseDoc.importNode(self.sigNode, True) if (insertBefore): parent.insertBefore(newSig, parent.firstChild) else: parent.appendChild(newSig) #todo - not fully converted from php def get509XCert(cert, isPEMFormat=True): """convert from PEM to data""" if (isPEMFormat): data = '' arCert = cert.split("\n") inData = False for curData in arCert: if not inData: if curData.find('-----BEGIN CERTIFICATE') != -1: inData = True else: if curData.find('-----END CERTIFICATE') != -1: break data += curData.strip() else: data = cert return data def get509XCertAsPEM(body): newStr = "-----BEGIN CERTIFICATE-----\n" temp = body.encode('utf-8').replace("\r", "\n") start = 0 end = len(temp) chunk_size = 63 while start < end: newStr += temp[start:start+chunk_size-1] + "\n" start += chunk_size newStr +="-----END CERTIFICATE-----\n" return newStr get509XCert = staticmethod(get509XCert) get509XCertAsPEM = staticmethod(get509XCertAsPEM) def add509Cert(self, cert, isPEMFormat=True): data = XMLSecurityDSig.get509XCert(cert, isPEMFormat) xpath = self.getXPathObj() if (xpath): query = "./secdsig:KeyInfo" nodeset = xpath.xpathEval(query) keyInfo = nodeset[0] if (not keyInfo): inserted = False keyInfo = self.createNewSignNode('KeyInfo') query = "./secdsig:Object" nodeset = xpath.xpathEval(query) sObject = nodeset[0] if (sObject): sObject.parent.insertBefore(keyInfo, sObject) inserted = True if (not inserted): self.sigNode.appendChild(keyInfo) x509DataNode = self.createNewSignNode('X509Data') keyInfo.appendChild(x509DataNode) x509CertNode = self.createNewSignNode('X509Certificate', data) x509DataNode.appendChild(x509CertNode) class XMLSecEnc: """ """ Element = 'http://www.w3.org/2001/04/xmlenc#Element' Content = 'http://www.w3.org/2001/04/xmlenc#Content' URI = 3 XMLENCNS = 'http://www.w3.org/2001/04/xmlenc#' template = "\ \ \ \ " def __init__(self, eventLog = None): self.eventLog = eventLog self.rawNode = None self.type = None self.encdoc = minidom.parseString(XMLSecEnc.template) def setNode(self, node): self.rawNode = node def encryptNode(self, objKey, replace=True): data = '' if (not self.rawNode): report_event(self.eventLog, "Node to encrypt has not been set in encryptNode", event.ERROR, 'encryptNode-not-selfRawNode') return None doc = self.rawNode.ownerDocument xPath = self.encdoc.xpathNewContext() objList = xPath.xpathEval('/xenc:EncryptedData/xenc:CipherData/xenc:CipherValue') cipherValue = objList[0] xPath.xpathFreeContext() if (cipherValue == None): report_event(self.eventLog, "Error locating CipherValue element within template", event.ERROR, 'encryptNode-no-cipherValue') return None if (self.type == XMLSecEnc.Element): data = doc.saveXML(self.rawNode) self.encdoc.getRootElement().setAttribute('Type', XMLSecEnc.Element) elif (self.type == XMLSecEnc.Content): children = self.sawNode.childNodes for i in range(len(children)): data += doc.saveXML(children[i]) self.encdoc.getRootElement().setAttribute('Type', XMLSecEnc.Content) else: report_event(self.eventLog, "Type \"%s\" is currently not supported" % (self.type), event.ERROR, 'encryptNode-unsupported-type') return None encMethod = self.encdoc.getRootElement().appendChild(self.encdoc.createElementNS(XMLSecEnc.XMLENCNS, 'xenc:EncryptionMethod')) encMethod.setAttribute('Algorithm', objKey.getAlgorith()) cipherValue.parent.parent.insertBefore(encMethod, cipherValue.parent) strEncrypt = base64.b64encode(objKey.encryptData(data)) value = self.encdoc.createTextNode(strEncrypt) cipherValue.appendChild(value) if (replace): if (self.type == XMLSecEnc.Element): if (self.rawNode.nodeType == self.rawNode.DOCUMENT_NODE): return self.encdoc importEnc = self.rawNode.ownerDocument.importNode(self.encdoc.getRootElement(), True) self.rawNode.parent.replaceChild(importEnc, self.rawNode) return importEnc elif (self.type == XMLSecEnc.Content): importEnc = self.rawNode.ownerDocument.importNode(self.encdoc.getRootElement(), True) while(self.rawNode.firstChild): self.rawNode.firstChild.unlinkNode() self.rawNode.appendChild(importEnc) def decryptNode(self, objKey, replace=True): data = '' if (not self.rawNode): report_event(self.eventLog, "Node to decrypt has not been set in dcryptNode", event.ERROR, 'decryptNode-not-selfRawNode') return None doc = self.rawNode.ownerDocument xPath = xpath.CreateContext(self.rawNode) xPath.setNamespaces({'xmlencr' : XMLSecEnc.XMLENCNS}) query = "string(./xmlencr:CipherData/xmlencr:CipherValue)" encryptedData = xpath.Evaluate(query, context=xPath) if (encryptedData): encryptedData = base64.b64decode(encryptedData) decrypted = objKey.decryptData(encryptedData) if (replace): if (self.type == XMLSecEnc.Element): newdoc = minidom.parseString(decrypted) if (self.rawNode.nodeType == newdoc.DOCUMENT_NODE): return newdoc importEnc = self.rawNode.ownerDocument.importNode(newdoc.getRootElement(), True) self.rawNode.parent.replaceChild(importEnc, self.rawNode) return importEnc elif (self.type == XMLSecEnc.Content): if (self.rawNode.nodeType == self.rawNode.DOCUMENT_NODE): doc = self.rawNode else: doc = self.rawNode.ownerDocument newFrag = doc.createDOMDocumentFragment() newFrag.appendXML(decrypted) self.rawNode.parent.replaceChild(newFrag, self.rawNode) return self.rawNode.parent return decrypted else: report_event(self.eventLog, "Cannot locate encrypted data", event.ERROR, 'decryptNode-no-encrypteddata') return None def encryptKey(self, srcKey, rawKey, append=True): strEncKey = base64.b64encode(srcKey.encryptData(rawKey.key)) root = self.encdoc.getRootElement() encKey = self.encdoc.createElementNS(XMLSecEnc.XMLENCNS, 'xenc:EncryptedKey') if (append): keyInfo = root.appendChild(self.encdoc.createElementNS('http://www.w3.org/2000/09/xmldsig#', 'dsig:KeyInfo')) keyInfo.appendChild(encKey) encMethod = encKey.appendChild(self.encdoc.createElementNS(XMLSecEnc.XMLENCNS, 'xenc:EncryptionMethod')) encMethod.setAttribute('Algorithm', srcKey.getAlgorith()) if (srcKey.name): keyInfo = encKey.appendChild(self.encdoc.createElementNS('http://www.w3.org/2000/09/xmldsig#', 'dsig:KeyInfo')) keyInfo.appendChild(self.encdoc.createElementNS('http://www.w3.org/2000/09/xmldsig#', 'dsig:KeyName', srcKey.name)) cipherData = encKey.appendChild(self.encdoc.createElementNS(XMLSecEnc.XMLENCNS, 'xenc:CipherData')) cipherData.appendChild(self.encdoc.createElementNS(XMLSecEnc.XMLENCNS, 'xenc:CipherValue', strEncKey)) return None def decryptKey(self, encKey): if (not encKey.isEncrypted): report_event(self.eventLog, "Key is not encrypted", event.ERROR, 'decryptKey-not-isEncrypted') return None if (not encKey.key): report_event(self.eventLog, "Key is missing data to perform the decryption", event.ERROR, 'decryptKey-not-key') return None return self.decryptNode(encKey, False) def locateEncryptedData(self, element): if (element.nodeType == element.DOCUMENT_NODE): doc = element else: doc = element.ownerDocument if (doc): query = "//*[local-name()='EncryptedData' and namespace-uri()='" + XMLSecEnc.XMLENCNS + "']" nodeset = xpath.Evaluate(query, doc) if nodeset and len(nodeset): return nodeset[0] return None def locateKey(self, node=None): if (not node): node = self.rawNode if (not isinstance(node,minidom.Node)): return None xPath = xpath.CreateContext(node) xPath.setNamespaces({'xmlsecenc' : XMLSecEnc.XMLENCNS}) query = ".//xmlsecenc:EncryptionMethod"; nodeset = xpath.Evaluate(query, contextNode=node, context=xPath) encmeth = nodeset[0] if (encmeth): attrAlgorithm = encmeth.getAttribute("Algorithm") return XMLSecurityKey(attrAlgorithm, self.eventLog, {'type':'private'}) return None def staticLocateKeyInfo(objBaseKey=None, node=None, eventLog=None, callBack=None, cbData=None): """ objBaseKey is an XMLSecurityKey, it may be returned with Todo: eleminiate side effects returns None on failure. """ if (not isinstance(node, minidom.Node)): return None xPath = xpath.CreateContext(node) xPath.setNamespaces({'xmlsecdsig' : XMLSecurityDSig.XMLDSIGNS, 'xmlsecenc' : XMLSecEnc.XMLENCNS}) query = "./xmlsecdsig:KeyInfo"; nodeset = xpath.Evaluate(query, contextNode=node, context=xPath) if nodeset and nodeset[0] and len(nodeset) == 1: encmeth = nodeset[0] for child in encmeth.childNodes: if (child.localName == 'KeyName'): if (objBaseKey): objBaseKey.localName = child.firstChild.data elif (child.localName == 'KeyValue'): for keyval in child.childNodes: if (keyval.localName == 'DSAKeyValue'): report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo KeyInfo DSAKeyValue not supported", event.ERROR, 'unsupported-DSAKeyValue') elif (keyval.localName == 'RSAKeyValue'): objBaseKey.modulus = None objBaseKey.exponent = None modulusNode = keyval.getElementsByTagNameNS(XMLSecurityDSig.XMLDSIGNS, 'Modulus') if (modulusNode): objBaseKey.modulus = base64.b64decode(modulusNode[0].firstChild.data) exponentNode = keyval.getElementsByTagNameNS(XMLSecurityDSig.XMLDSIGNS, 'Exponent') if (exponentNode): objBaseKey.exponent = base64.b64decode(exponentNode[0].firstChild.data) if ((not objBaseKey.modulus) or (not objBaseKey.exponent)): report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo"\ " missing Modulus or Exponent on RSAKeyValue ", event.ERROR, 'invalid-digitalsig-missing') publicKey = XMLSecurityKey.convertRSA(objBaseKey.modulus, objBaseKey.exponent) objBaseKey.loadKey(publicKey) else: report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo"\ "KeyInfo KeyValue \"%s\" not supported" % (keyval.localName), event.ERROR, 'unsupported-KeyValue') elif (child.localName == 'RetrievalMethod'): # Not currently supported report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo KeyInfo RetrievalMethod not supported", event.ERROR, 'unsupported-retrievalMethod') elif (child.localName == 'EncryptedKey'): objenc = XMLSecEnc(eventLog) objenc.setNode(child) objKey = objenc.locateKey() if (not objKey): report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo"\ " KeyInfo EncryptedKey "\ "Unable to locate algorithm for this Encrypted Key", event.ERROR, 'unsupported-retrievalMethod') else: XMLSecEnc.staticLocateKeyInfo(objKey, child, eventLog) objKey.isEncrypted = True; if not callBack: objKey.encryptedCtx = objenc return objKey else: callBack(cbData, objKey) key = objenc.decryptKey(objKey) if key: objBaseKey.loadKey(key) elif (child.localName == 'X509Data'): x509certNodes = child.getElementsByTagNameNS('*', 'X509Certificate') if (x509certNodes and (len(x509certNodes) > 0)): count = 0; while count < len(x509certNodes) : x509Cert = x509certNodes[count].firstChild.data if x509Cert: XMLSecurityDSig.get509XCert(x509Cert, False) objBaseKey.loadKey(XMLSecurityDSig.get509XCertAsPEM(x509Cert)) break count = count + 1 else: #todo revisit and find out why this is here. #heh, this feels like a hack and a half objBaseKey.loadKey(_string_to_certstring( child.firstChild.firstChild.data)) elif (child.localName == 'SecurityTokenReference'): #this is actually part of the document www.oasis-open.org/committees/download.php/1048/WSS-SAML-06.pd # and not part of core xml security for keyval in child.childNodes: if (keyval.localName == 'KeyIdentifier'): objBaseKey.RefEncodingType = keyval.getAttribute("EncodingType") objBaseKey.RefValueType = keyval.getAttribute("ValueType") objBaseKey.RefValue = keyval.firstChild.data else: report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo"\ " SecurityTokenReference %s contains unsupported %s" % (keyval.localName, keyval.toxml()), event.ERROR, 'unsupported-SecurityTokenReference') else: report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo"\ " KeyInfo %s is unsupported from %s" % (child.localName, child.toxml()), event.ERROR, 'unsupported-keyinfo') return objBaseKey else: if nodeset and nodeset[0]: report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo"\ " Move along, nothing to see here (no KeyInfo nodes found)", event.INFO) else: report_event(eventLog, "XMLSecEnc.staticLocateKeyinfo"\ " more than one KeyInfo nodes found, not processing any of them.", event.INFO) return None def locateKeyInfo(self, objBaseKey=None, node=None, callBack=None, cbData=None): if (not node): node = self.rawNode return XMLSecEnc.staticLocateKeyInfo(objBaseKey, node, self.eventLog, callBack, cbData) staticLocateKeyInfo = staticmethod(staticLocateKeyInfo)