Changeset 1301 for trunk/rp/trac

Show
Ignore:
Timestamp:
02/20/08 14:13:33 (11 months ago)
Author:
dbuss
Message:

started multivalued claim handling #277, reviewed error messages #285, fixed a problem with ldapstore authentication #336.

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • trunk/rp/trac/infocard_acct/0.11/infocard_acct/ldapstore.py

    r1245 r1301  
    88#  This library is distributed in the hope that it will be useful, 
    99#  but WITHOUT ANY WARRANTY; without even the implied warranty of 
    10 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
     10#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
    1111#  GNU Lesser General Public License for more details. 
    1212  
     
    3131 
    3232class LDAPAssociationStore(Component): 
    33         implements(IAssociationStore) 
    34          
    35         def set_association(self, user, cardkeyhash): 
    36                 """Sets the association for the user. 
    37                 """ 
    38          
    39                  
    40         def check_association(self, cardkeyhash): 
    41                 """Checks if the cardkeyhash is valid 
    42                 Returns the user associated to the .key 
    43                 """ 
    44                  
    45                  
    46         def delete_association(self, cardkeyhash): 
    47                 """Deletes the association 
    48                 Returns True if the association existed and was deleted, False otherwise. 
    49                 """ 
    50                  
     33    implements(IAssociationStore) 
     34     
     35    def set_association(self, user, cardkeyhash): 
     36        """Sets the association for the user. 
     37        """ 
     38     
     39         
     40    def check_association(self, cardkeyhash): 
     41        """Checks if the cardkeyhash is valid 
     42        Returns the user associated to the .key 
     43        """ 
     44         
     45         
     46    def delete_association(self, cardkeyhash): 
     47        """Deletes the association 
     48        Returns True if the association existed and was deleted, False otherwise. 
     49        """ 
     50         
    5151class LDAPUserStore(Component): 
    52         implements(IPasswordStore) 
    53         url = None 
    54         binddn = "" 
    55         bindpw = "" 
    56         hash_method = ExtensionOption('ldap_user_store', 'hash_method', 
    57                                                                   IPasswordHashMethod, 'HtDigestHashMethod') 
    58                                                                    
    59         def __init__(self):                                                                
    60                 url = PathOption('ldap_user_store', 'url', '/') 
    61                 binddn = PathOption('ldap_user_store', 'bind_user', '') 
    62                 bindpw =  PathOption('ldap_user_store', 'bind_password', '') 
    63                  
    64                 self.url = self.env.config['ldap_user_store'].get('url') 
    65                 self.binddn = self.env.config['ldap_user_store'].get('bind_user')        
    66                 self.bindpw = self.env.config['ldap_user_store'].get('bind_password')    
     52    implements(IPasswordStore) 
     53    url = None 
     54    binddn = "" 
     55    bindpw = "" 
     56    hash_method = ExtensionOption('ldap_user_store', 'hash_method', 
     57                                  IPasswordHashMethod, 'HtDigestHashMethod') 
     58                                   
     59    def __init__(self):                                
     60        url = PathOption('ldap_user_store', 'url', '/') 
     61        binddn = PathOption('ldap_user_store', 'bind_user', '') 
     62        bindpw =  PathOption('ldap_user_store', 'bind_password', '') 
     63         
     64        self.url = self.env.config['ldap_user_store'].get('url') 
     65        self.binddn = self.env.config['ldap_user_store'].get('bind_user')    
     66        self.bindpw = self.env.config['ldap_user_store'].get('bind_password')    
    6767 
    68         def _setup_search_url(self): 
    69                 search_url = LDAPUrl(self.url) 
    70                 if search_url: 
    71                         try: 
    72                                 if not search_url.who: 
    73                                         search_url.who = self.binddn 
    74                                 if not search_url.cred: 
    75                                         search_url.cred = self.bindpw 
    76                         except AttributeError: 
    77                                 nop = 1         #catch and ignore the error 
    78                         return search_url 
    79                 return None; 
    80                  
    81         def _get_attrs(self, url): 
    82                 if len(url.attrs) > 0: 
    83                         attrList = [] 
    84                         for attr in url.attrs: 
    85                                 attrList.append(attr.encode('utf-8')) 
    86                         return attrList 
    87                 else: 
    88                         return ('uid',)  
    89                  
    90         def _user_is_invalid(self, user): 
    91                 """ look for ldap injection 
    92                 """ 
    93 #               "^(#$!@#$" 
    94 #               ")(()))" 
    95 #               self.log.debug('LDAPUserStore:_is_user_valid found potential ldap injection') 
    96                 return None 
    97          
    98         def _get_userfilterstr(self, search_url, user): 
    99                 attrs = self._get_attrs(search_url) 
    100                 if len(attrs) == 1: 
    101                         return  '(& ' + search_url.filterstr + '('+ self._get_attrs(search_url)+'='+ user       +'))' 
    102                 else: 
    103                         return '(& ' + search_url.filterstr + '(|' + ''.join("(%s=%s)" \ 
    104                                         % (attr, user) for attr in attrs)+'))' 
     68    def _setup_search_url(self): 
     69        search_url = LDAPUrl(self.url) 
     70        if search_url: 
     71            try: 
     72                if not search_url.who: 
     73                    search_url.who = self.binddn 
     74                if not search_url.cred: 
     75                    search_url.cred = self.bindpw 
     76            except AttributeError: 
     77                nop = 1     #catch and ignore the error 
     78            return search_url 
     79        return None; 
     80         
     81    def _get_attrs(self, url): 
     82        if len(url.attrs) > 0: 
     83            attrList = [] 
     84            for attr in url.attrs: 
     85                attrList.append(attr.encode('utf-8')) 
     86            return attrList 
     87        else: 
     88            return ('uid',)  
     89         
     90    def _user_is_invalid(self, user): 
     91        """ look for ldap injection 
     92         
     93         
     94        """ 
     95#       "^(#$!@#$" 
     96#       ")(()))" 
     97#       self.log.debug('LDAPUserStore:_is_user_valid found potential ldap injection') 
     98        return False 
     99     
     100    def _get_userfilterstr(self, search_url, user): 
     101        attrs = self._get_attrs(search_url) 
     102        if len(attrs) == 1: 
     103            return  '(& ' + search_url.filterstr + '('+ attrs[0] + '='+ user +'))' 
     104        else: 
     105            return '(& ' + search_url.filterstr + '(|' + ''.join("(%s=%s)" \ 
     106                    % (attr, user) for attr in attrs)+'))' 
    105107 
    106         def get_users(self): 
    107                 """Return known usernames 
    108                 """ 
    109                 search_url = self._setup_search_url() 
    110                 if search_url: 
    111                         self.log.debug('LDAPUserStore:get_users: binding as \"%s\" ' 
    112                                         + 'using \"%s\"', search_url.who, search_url.cred) 
    113                         l = ldap.open(search_url.hostport) 
    114                         l.simple_bind_s(search_url.who, search_url.cred) 
    115                          
    116                         #get first attr, place in list 
    117                         attrs = self._get_attrs(search_url) 
    118                         self.log.debug('LDAPUserStore:get_users: searching \"%s\" ' 
    119                                         + 'for \"%s\"', search_url.dn, search_url.filterstr) 
    120                         results = l.search_s(search_url.dn, search_url.scope,  
    121                                 search_url.filterstr, attrs) 
    122                                  
    123                         #works because there is only one attr per entry based on the number 
    124                         # of attributes in the attrs variable above 
    125                         for entry in results: 
    126 #                               if entry[1].items() == 1: 
    127                                 for attr in entry[1].values(): 
    128                                         self.log.debug('LDAPUserStore:get_users returning: %s' % (attr[0], )) 
    129                                         yield attr[0] 
    130 #                               else: 
    131 #                                       self.log.debug('LDAPUserStore:get_users found incorrect number of items') 
     108    def get_users(self): 
     109        """Return known usernames 
     110        """ 
     111        search_url = self._setup_search_url() 
     112        if search_url: 
     113            self.log.debug('LDAPUserStore:get_users: binding as \"%s\" ' 
     114                    + 'using \"%s\"', search_url.who, search_url.cred) 
     115            l = ldap.open(search_url.hostport) 
     116            l.simple_bind_s(search_url.who, search_url.cred) 
     117             
     118            #get first attr, place in list 
     119            attrs = self._get_attrs(search_url) 
     120            self.log.debug('LDAPUserStore:get_users: searching \"%s\" ' 
     121                    + 'for \"%s\"', search_url.dn, search_url.filterstr) 
     122            results = l.search_s(search_url.dn, search_url.scope,  
     123                search_url.filterstr, attrs) 
     124                 
     125            #works because there is only one attr per entry based on the number 
     126            # of attributes in the attrs variable above 
     127            for entry in results: 
     128#               if entry[1].items() == 1: 
     129                for attr in entry[1].values(): 
     130                    self.log.debug('LDAPUserStore:get_users returning: %s' % (attr[0], )) 
     131                    yield attr[0] 
     132#               else: 
     133#                   self.log.debug('LDAPUserStore:get_users found incorrect number of items') 
    132134 
    133         def has_user(self, user): 
    134 #               if self._user_is_invalid(user): 
    135 #                       return False 
    136                 try: 
    137                         search_url = self._setup_search_url() 
    138                         if search_url: 
    139                                 l = ldap.open(search_url.hostport) 
    140                                 l.simple_bind_s(search_url.who, search_url.cred) 
    141                                 #todo fix so it looks for the user. 
    142                                 results = l.search_s(search_url.dn, search_url.scope,  
    143                                         self._userfilterstr(search_url, user)) 
    144                                 if len(results) == 1: 
    145                                         return True 
    146                 except: 
    147                         pass 
    148                 return False 
     135    def has_user(self, user): 
     136#       if self._user_is_invalid(user): 
     137#           return False 
     138        try: 
     139            search_url = self._setup_search_url() 
     140            if search_url: 
     141                l = ldap.open(search_url.hostport) 
     142                l.simple_bind_s(search_url.who, search_url.cred) 
     143                #todo fix so it looks for the user. 
     144                results = l.search_s(search_url.dn, search_url.scope,  
     145                    self._userfilterstr(search_url, user)) 
     146                if len(results) == 1: 
     147                    return True 
     148        except: 
     149            pass 
     150        return False 
    149151 
    150         def check_password(self, user, password): 
    151 #               if self._user_is_invalid(user): 
    152 #                       return False 
    153                 try: 
    154                         search_url = self._setup_search_url() 
    155                         if search_url: 
    156                                 l = ldap.open(search_url.hostport) 
    157                                 l.simple_bind_s(search_url.who, search_url.cred) 
    158                                 searchfilter = self._get_userfilterstr(search_url, user) 
    159                                 self.log.debug('LDAPUserStore:check_password: searching \"%s\" ' 
    160                                         + 'for \"%s\"', search_url.dn, searchfilter) 
    161                                 results = l.search_s(search_url.dn, search_url.scope, searchfilter) 
    162                                 if len(results) == 1: 
    163 #                                       self.log.debug('LDAPUserStore:check_password \"%s\" for \"%s\"', 
    164 #                                               password, (results[0])[0]) 
    165 #                                       for entry in results: 
    166 #                                               self.log.debug('LDAPUserStore:check_password \"%s\"', entry[0]) 
    167 #                                               self.log.debug('LDAPUserStore:check_password \"%s\"', entry[1]) 
    168                                         l.simple_bind_s(results[0][0], password) 
    169                                         self.log.debug('LDAPUserStore:check_password succeeded') 
    170                                         return True 
    171                                 elif len(results) == 0: 
    172                                         self.log.debug('LDAPUserStore:check_password search failed') 
    173 #                               else: 
    174 #                                       for entry in results: 
    175 #                                               self.log.debug('LDAPUserStore:check_password \"%s\"', entry[0]) 
    176 #                                               self.log.debug('LDAPUserStore:check_password \"%s\"', entry[1]) 
    177                 except: 
    178                         #on error just return the password failed 
    179                         self.log.debug('LDAPUserStore:check_password for \"%s\" failed' % (user)) 
    180                         return False 
     152    def check_password(self, user, password): 
     153#       if self._user_is_invalid(user): 
     154#           return False 
     155        try: 
     156            search_url = self._setup_search_url() 
     157            if search_url: 
     158                l = ldap.open(search_url.hostport) 
     159                l.simple_bind_s(search_url.who, search_url.cred) 
     160                searchfilter = self._get_userfilterstr(search_url, user) 
     161                self.log.debug('LDAPUserStore:check_password: searching \"%s\" ' 
     162                    + 'for \"%s\"', search_url.dn, searchfilter) 
     163                results = l.search_s(search_url.dn, search_url.scope, searchfilter) 
     164                if len(results) == 1: 
     165#                   self.log.debug('LDAPUserStore:check_password \"%s\" for \"%s\"', 
     166#                       password, (results[0])[0]) 
     167#                   for entry in results: 
     168#                       self.log.debug('LDAPUserStore:check_password \"%s\"', entry[0]) 
     169#                       self.log.debug('LDAPUserStore:check_password \"%s\"', entry[1]) 
     170                    l.simple_bind_s(results[0][0], password) 
     171                    self.log.debug('LDAPUserStore:check_password succeeded') 
     172                    return True 
     173                elif len(results) == 0: 
     174                    self.log.debug('LDAPUserStore:check_password search failed') 
     175#               else: 
     176#                   for entry in results: 
     177#                       self.log.debug('LDAPUserStore:check_password \"%s\"', entry[0]) 
     178#                       self.log.debug('LDAPUserStore:check_password \"%s\"', entry[1]) 
     179        except ldap.LDAPError, e: 
     180            #on error just return the password failed 
     181            self.log.debug('LDAPUserStore:check_password for \"%s\" failed : %s' % (user, e)) 
     182            return False 
    181183 
    182 #       def delete_user(self, user): 
    183 #               """Deletes the user account. 
    184 #               Returns True if the account existed and was deleted, False otherwise. 
    185 #               """ 
    186 #               return False 
    187                  
     184#   def delete_user(self, user): 
     185#       """Deletes the user account. 
     186#       Returns True if the account existed and was deleted, False otherwise. 
     187#       """ 
     188#       return False 
     189