"""Very simple string-based ACL for CGI scripts.

Provides an in-memory ACL (``ACL``), a variant persisted in a shelve
database (``ShelvedACL``) and the ``DuplicateError`` both raise when
something is registered twice.
"""

import os

# Directory of the running script: taken from $SCRIPT_FILENAME or $PWD when
# set, falling back to the process working directory.
CURRDIR = os.path.dirname(os.environ.get('SCRIPT_FILENAME') or
                          os.environ.get('PWD') or os.getcwd())
# Filesystem root of the application; $DOCUMENT_ROOT when available.
ROOT = os.environ.get('DOCUMENT_ROOT', CURRDIR)


class ACL(object):
    """very simple ACL for CGI

    it uses plain strings for permissions and roles, and resolves the
    roles of a user by walking from the root down to the requested path,
    accumulating the roles found on the way (meaning that if someone is
    author in / and editor in /foo/bar, it will have *both* roles in
    /foo/bar/baz)
    """

    def __init__(self, root=ROOT, path=CURRDIR):
        """initialize the ACL

        'root' is the path to the root of the application, on the
        filesystem (a single trailing slash is stripped)

        'path' is an absolute path from the root of the application to
        the current resource; an AssertionError is raised if it does not
        start with '/'
        """
        if root.endswith('/'):
            root = root[:-1]
        # kept as an assert so the exception type seen by callers does not
        # change
        assert path.startswith('/'), 'path must be absolute'
        self._root = root
        self._path = path
        self._db = {
            'roles': {},  # key role, value list of permissions (strings)
            'users': [],  # list of strings
            'path_mappings': {},  # dict {path: {user: [roles]}}
        }

    @staticmethod
    def _normalize_path(path):
        """strip a single trailing slash from any path but the root"""
        if path != '/' and path.endswith('/'):
            return path[:-1]
        return path

    def register_role(self, role, perms=None):
        """register a role (to the root acl)

        raises a DuplicateError if the role is already defined

        perms is an optional list of permission names the role should
        have; it is copied, so neither the caller's list nor a shared
        default is ever aliased between roles
        """
        roles = self._db['roles']
        if role in roles:
            raise DuplicateError(role)
        roles[role] = [] if perms is None else list(perms)

    def add_role_permissions(self, role, perms=None):
        """add permissions for this role

        raises a KeyError if the role does not exist and a DuplicateError
        if one of the permissions is already assigned to the role (in
        which case nothing is added)
        """
        perms = [] if perms is None else perms
        roleperms = self._db['roles'][role]
        # validate everything first so a failure leaves the role untouched
        for perm in perms:
            if perm in roleperms:
                raise DuplicateError(
                    'permission %s already assigned to %s role' %
                    (perm, role))
        roleperms.extend(perms)

    def revoke_role_permissions(self, role, perms=None):
        """revoke permissions for the role

        raises a KeyError if the role does not exist and a ValueError if
        one of the permissions is not assigned to it
        """
        roleperms = self._db['roles'][role]
        for perm in perms or ():
            roleperms.remove(perm)

    def remove_role(self, role):
        """remove the role from the ACL

        raises a KeyError if the role does not exist

        removes the role from the database entirely, including every
        user/path mapping that mentions it; note that this may be a large
        operation
        """
        del self._db['roles'][role]
        self._remove_role_from_path_mappings(role)

    def register_user(self, user, roles=None, path='/'):
        """register a user (to the root acl)

        raises a DuplicateError if the user is already defined

        user is a string with the username as defined in $REMOTE_USER
        (iow the user id of the currently authenticated user)

        roles is an optional list of role names the user should have

        path is a path to the directory where the provided roles should
        be applied, defaulting to the root of the application
        """
        users = self._db['users']
        if user in users:
            raise DuplicateError(user)
        users.append(user)
        if roles:
            self.add_user_roles(user, roles, path)

    def add_user_roles(self, user, roles, path='/'):
        """add roles for a user on a path

        raises a DuplicateError if one of the roles is already assigned
        to the user on that path (in which case nothing is added)
        """
        path = self._normalize_path(path)
        current_roles = (self._db['path_mappings']
                         .setdefault(path, {})
                         .setdefault(user, []))
        for role in roles:
            if role in current_roles:
                raise DuplicateError(
                    'role %s already assigned to user %s on path %s' %
                    (role, user, path))
        current_roles.extend(roles)

    def revoke_user_roles(self, user, roles, path='/'):
        """revoke roles for a user on a path

        raises a KeyError if there is no mapping for the path or the
        user, and a ValueError if one of the roles is not assigned
        """
        path = self._normalize_path(path)
        local_roles = self._db['path_mappings'][path][user]
        for role in roles:
            local_roles.remove(role)

    def remove_user(self, user):
        """remove a user from the ACL

        raises a ValueError if the user is not registered
        """
        self._db['users'].remove(user)
        self._remove_user_from_mapping(user)

    def user_has_permission(self, user, perm, path='/'):
        """check whether a user has a certain permission on a certain place

        roles are collected from the root down to 'path', so roles
        assigned on a parent directory also apply to everything below it;
        a role name that is not (or no longer) registered grants nothing
        """
        path = self._normalize_path(path)
        mappings = self._db['path_mappings']
        known_roles = self._db['roles']
        chunks = path.split('/')
        user_roles = []
        for depth in range(1, len(chunks) + 1):
            # the first chunk of an absolute path is '', which stands for
            # the root
            currpath = '/'.join(chunks[:depth]) or '/'
            user_roles.extend(mappings.get(currpath, {}).get(user, []))
        return any(perm in known_roles.get(role, ()) for role in user_roles)

    def users(self):
        """return a copy of the list of registered user names"""
        return list(self._db['users'])

    def _remove_role_from_path_mappings(self, role):
        """remove a role from the path mapping"""
        # very naive implementation: visits every user on every path
        for user_roles in self._db['path_mappings'].values():
            for roles in user_roles.values():
                if role in roles:
                    roles.remove(role)

    def _remove_user_from_mapping(self, user):
        """drop the user's role assignments on every path"""
        for user_roles in self._db['path_mappings'].values():
            user_roles.pop(user, None)


class ShelvedACL(ACL):
    """ACL using a simple shelve db"""

    # Mutating methods that are wrapped on each instance so the shelf is
    # synced after every call. 'register_permission' and
    # 'remove_permission' are not defined by ACL in this file; names the
    # base class lacks are skipped when the wrappers are installed.
    _SYNCED_METHODS = (
        'register_permission', 'remove_permission',
        'register_role', 'add_role_permissions',
        'revoke_role_permissions', 'remove_role',
        'register_user', 'add_user_roles', 'revoke_user_roles',
        'remove_user',
    )

    def __init__(self, root=ROOT, path=CURRDIR, filename='.acldb'):
        """open (creating it if needed) the shelve db 'filename' in 'root'

        'root' and 'path' are handled as in ACL.__init__; the directory
        'root' is created when it does not exist yet
        """
        super(ShelvedACL, self).__init__(root, path)
        import shelve
        if not os.path.exists(root):
            os.makedirs(root)
        # writeback is needed because the stored lists/dicts are mutated
        # in place rather than reassigned
        self._db = db = shelve.open(os.path.join(root, filename),
                                    writeback=True)
        for key, empty in (('roles', {}), ('users', []),
                           ('path_mappings', {})):
            if key not in db:
                db[key] = empty
        base = super(ShelvedACL, self)
        for name in self._SYNCED_METHODS:
            method = getattr(base, name, None)
            if method is not None:
                setattr(self, name, self._synced(method))

    def _synced(self, method):
        """wrap 'method' so the shelf is synced after each call"""
        def handler(*args, **kwargs):
            ret = method(*args, **kwargs)
            self._db.sync()
            return ret
        return handler

    def _remove_user_from_mapping(self, user):
        """drop the user's role assignments and sync the shelf"""
        super(ShelvedACL, self)._remove_user_from_mapping(user)
        self._db.sync()


class DuplicateError(Exception):
    """raised when registering something that is already registered"""