# Source code for flask_appbuilder.security.manager

import base64
import datetime
import json
import logging
import re
from typing import Dict, List, Set

from flask import g, session, url_for
from flask_babel import lazy_gettext as _
from flask_jwt_extended import current_user as current_user_jwt
from flask_jwt_extended import JWTManager
from flask_login import current_user, LoginManager
from flask_openid import OpenID
from werkzeug.security import check_password_hash, generate_password_hash

from .api import SecurityApi
from .registerviews import (
    RegisterUserDBView,
    RegisterUserOAuthView,
    RegisterUserOIDView,
)
from .views import (
    AuthDBView,
    AuthLDAPView,
    AuthOAuthView,
    AuthOIDView,
    AuthRemoteUserView,
    PermissionModelView,
    PermissionViewModelView,
    RegisterUserModelView,
    ResetMyPasswordView,
    ResetPasswordView,
    RoleModelView,
    UserDBModelView,
    UserInfoEditView,
    UserLDAPModelView,
    UserOAuthModelView,
    UserOIDModelView,
    UserRemoteUserModelView,
    UserStatsChartView,
    ViewMenuModelView,
)
from ..basemanager import BaseManager
from ..const import (
    AUTH_DB,
    AUTH_LDAP,
    AUTH_OAUTH,
    AUTH_OID,
    AUTH_REMOTE_USER,
    LOGMSG_ERR_SEC_AUTH_LDAP,
    LOGMSG_ERR_SEC_AUTH_LDAP_TLS,
    LOGMSG_WAR_SEC_LOGIN_FAILED,
    LOGMSG_WAR_SEC_NO_USER,
    LOGMSG_WAR_SEC_NOLDAP_OBJ,
    PERMISSION_PREFIX,
)

log = logging.getLogger(__name__)


class AbstractSecurityManager(BaseManager):
    """
        Interface that every security manager has to implement.

        Only declares the hooks used by the framework; it makes no
        assumption about the security models or the authentication type.
        Each method raises ``NotImplementedError`` until it is overridden.
    """

    def add_permissions_view(self, base_permissions, view_menu):
        """
            Register the permissions of a view menu on the backend.

            :param base_permissions:
                list of permissions from view (all exposed methods):
                 'can_add','can_edit' etc...
            :param view_menu:
                name of the view or menu to add
        """
        raise NotImplementedError()

    def add_permissions_menu(self, view_menu_name):
        """
            Register the menu_access permission for a menu on
            permission_view_menu.

            :param view_menu_name:
                The menu name
        """
        raise NotImplementedError()

    def register_views(self):
        """
            Hook where implementations create the security views.
        """
        raise NotImplementedError()

    def is_item_public(self, permission_name, view_name):
        """
            Tell whether a view grants the permission publicly.

            :param permission_name:
                the permission: can_show, can_edit...
            :param view_name:
                the name of the class view (child of BaseView)
        """
        raise NotImplementedError()

    def has_access(self, permission_name, view_name):
        """
            Tell whether the current user, or public, may access a view
            or menu.
        """
        raise NotImplementedError()

    def security_cleanup(self, baseviews, menus):
        """
            Cleanup hook receiving ``baseviews`` and ``menus``; what is
            cleaned up is left to the implementation.
        """
        raise NotImplementedError()


def _oauth_tokengetter(token=None):
    """
        Default OAuth token getter: reads the current user's token from
        the ``oauth`` entry of the session cookie.

        The ``token`` argument is accepted but never read; the value
        stored in the session is always what gets returned.
    """
    stored_token = session.get("oauth")
    log.debug("Token Get: {0}".format(stored_token))
    return stored_token


class BaseSecurityManager(AbstractSecurityManager):
    """
        Base security manager.

        Holds the overridable view/model hooks as class attributes and,
        on construction, fills in the security config defaults and sets up
        OpenID/OAuth (depending on ``AUTH_TYPE``), Flask-Login and
        Flask-JWT-Extended.
    """

    auth_view = None
    """ The obj instance for authentication view """
    user_view = None
    """ The obj instance for user view """
    registeruser_view = None
    """ The obj instance for registering user view """
    lm = None
    """ Flask-Login LoginManager """
    jwt_manager = None
    """ Flask-JWT-Extended """
    oid = None
    """ Flask-OpenID OpenID """
    oauth = None
    """ Flask-OAuth """
    oauth_remotes = None
    """ Initialized (remote_app) providers dict {'provider_name', OBJ } """
    oauth_whitelists = {}
    """ OAuth email whitelists """
    oauth_tokengetter = _oauth_tokengetter
    """ OAuth tokengetter function override to implement your own tokengetter method """
    # Callable used to fetch user info from a provider; __init__ falls back
    # to get_oauth_user_info when this is left unset.
    oauth_user_info = None

    user_model = None
    """ Override to set your own User Model """
    role_model = None
    """ Override to set your own Role Model """
    permission_model = None
    """ Override to set your own Permission Model """
    viewmenu_model = None
    """ Override to set your own ViewMenu Model """
    permissionview_model = None
    """ Override to set your own PermissionView Model """
    registeruser_model = None
    """ Override to set your own RegisterUser Model """

    userdbmodelview = UserDBModelView
    """ Override if you want your own user db view """
    userldapmodelview = UserLDAPModelView
    """ Override if you want your own user ldap view """
    useroidmodelview = UserOIDModelView
    """ Override if you want your own user OID view """
    useroauthmodelview = UserOAuthModelView
    """ Override if you want your own user OAuth view """
    userremoteusermodelview = UserRemoteUserModelView
    """ Override if you want your own user REMOTE_USER view """
    registerusermodelview = RegisterUserModelView

    authdbview = AuthDBView
    """ Override if you want your own Authentication DB view """
    authldapview = AuthLDAPView
    """ Override if you want your own Authentication LDAP view """
    authoidview = AuthOIDView
    """ Override if you want your own Authentication OID view """
    authoauthview = AuthOAuthView
    """ Override if you want your own Authentication OAuth view """
    authremoteuserview = AuthRemoteUserView
    """ Override if you want your own Authentication REMOTE_USER view """

    registeruserdbview = RegisterUserDBView
    """ Override if you want your own register user db view """
    registeruseroidview = RegisterUserOIDView
    """ Override if you want your own register user OpenID view """
    registeruseroauthview = RegisterUserOAuthView
    """ Override if you want your own register user OAuth view """

    resetmypasswordview = ResetMyPasswordView
    """ Override if you want your own reset my password view """
    resetpasswordview = ResetPasswordView
    """ Override if you want your own reset password view """
    userinfoeditview = UserInfoEditView
    """ Override if you want your own User information edit view """

    # API
    security_api = SecurityApi
    """ Override if you want your own Security API login endpoint """

    rolemodelview = RoleModelView
    permissionmodelview = PermissionModelView
    userstatschartview = UserStatsChartView
    viewmenumodelview = ViewMenuModelView
    permissionviewmodelview = PermissionViewModelView

    def __init__(self, appbuilder):
        """
            Apply config defaults and initialize the auth backends.

            :param appbuilder:
                the AppBuilder instance, handed to the base manager
            :raises Exception:
                when AUTH_TYPE is AUTH_LDAP and AUTH_LDAP_SERVER is not
                configured
        """
        super(BaseSecurityManager, self).__init__(appbuilder)
        app = self.appbuilder.get_app
        # Base Security Config
        app.config.setdefault("AUTH_ROLE_ADMIN", "Admin")
        app.config.setdefault("AUTH_ROLE_PUBLIC", "Public")
        app.config.setdefault("AUTH_TYPE", AUTH_DB)
        # Self Registration
        app.config.setdefault("AUTH_USER_REGISTRATION", False)
        app.config.setdefault("AUTH_USER_REGISTRATION_ROLE", self.auth_role_public)
        app.config.setdefault("AUTH_USER_REGISTRATION_ROLE_JMESPATH", None)

        # LDAP Config
        if self.auth_type == AUTH_LDAP:
            if "AUTH_LDAP_SERVER" not in app.config:
                raise Exception(
                    "No AUTH_LDAP_SERVER defined on config"
                    " with AUTH_LDAP authentication type."
                )
            app.config.setdefault("AUTH_LDAP_SEARCH", "")
            app.config.setdefault("AUTH_LDAP_SEARCH_FILTER", "")
            app.config.setdefault("AUTH_LDAP_BIND_USER", "")
            app.config.setdefault("AUTH_LDAP_APPEND_DOMAIN", "")
            app.config.setdefault("AUTH_LDAP_USERNAME_FORMAT", "")
            app.config.setdefault("AUTH_LDAP_BIND_PASSWORD", "")
            # TLS options
            app.config.setdefault("AUTH_LDAP_USE_TLS", False)
            app.config.setdefault("AUTH_LDAP_ALLOW_SELF_SIGNED", False)
            app.config.setdefault("AUTH_LDAP_TLS_DEMAND", False)
            app.config.setdefault("AUTH_LDAP_TLS_CACERTDIR", "")
            app.config.setdefault("AUTH_LDAP_TLS_CACERTFILE", "")
            app.config.setdefault("AUTH_LDAP_TLS_CERTFILE", "")
            app.config.setdefault("AUTH_LDAP_TLS_KEYFILE", "")
            # Mapping options
            app.config.setdefault("AUTH_LDAP_UID_FIELD", "uid")
            app.config.setdefault("AUTH_LDAP_FIRSTNAME_FIELD", "givenName")
            app.config.setdefault("AUTH_LDAP_LASTNAME_FIELD", "sn")
            app.config.setdefault("AUTH_LDAP_EMAIL_FIELD", "mail")

        if self.auth_type == AUTH_OID:
            self.oid = OpenID(app)

        if self.auth_type == AUTH_OAUTH:
            from authlib.integrations.flask_client import OAuth

            self.oauth = OAuth(app)
            self.oauth_remotes = dict()
            # Work on a per-instance copy: the class attribute is a single
            # dict shared by every manager, so writing into it directly
            # would leak one app's whitelists into another.
            self.oauth_whitelists = dict(self.oauth_whitelists or {})
            for _provider in self.oauth_providers:
                provider_name = _provider["name"]
                log.debug("OAuth providers init {0}".format(provider_name))
                obj_provider = self.oauth.register(
                    provider_name, **_provider["remote_app"]
                )
                obj_provider._tokengetter = self.oauth_tokengetter
                if not self.oauth_user_info:
                    self.oauth_user_info = self.get_oauth_user_info
                # Whitelist only users with matching emails
                if "whitelist" in _provider:
                    self.oauth_whitelists[provider_name] = _provider["whitelist"]
                self.oauth_remotes[provider_name] = obj_provider

        self._builtin_roles = self.create_builtin_roles()
        # Setup Flask-Login
        self.lm = self.create_login_manager(app)

        # Setup Flask-Jwt-Extended
        self.jwt_manager = self.create_jwt_manager(app)
def create_login_manager(self, app) -> LoginManager:
    """
        Build the Flask-Login manager; override to supply a custom
        instance.

        The manager uses ``"login"`` as its login view and
        ``self.load_user`` as its user loader.

        :param app: Flask app
        :return: the configured LoginManager
    """
    login_manager = LoginManager(app)
    login_manager.user_loader(self.load_user)
    login_manager.login_view = "login"
    return login_manager
def create_jwt_manager(self, app) -> JWTManager:
    """
        Build the Flask-JWT-Extended manager; override to supply a custom
        instance.

        The manager is bound to ``app`` and uses ``self.load_user_jwt``
        as its user loader callback.

        :param app: Flask app
        :return: the configured JWTManager
    """
    manager = JWTManager()
    manager.init_app(app)
    manager.user_loader_callback_loader(self.load_user_jwt)
    return manager
def create_builtin_roles(self):
    """Return the ``FAB_ROLES`` config mapping (``{}`` when not set)."""
    config = self.appbuilder.get_app.config
    return config.get("FAB_ROLES", {})


@property
def get_url_for_registeruser(self):
    """URL of the default view of the register-user view."""
    view = self.registeruser_view
    return url_for("%s.%s" % (view.endpoint, view.default_view))


@property
def get_user_datamodel(self):
    """Datamodel of the user view."""
    user_view = self.user_view
    return user_view.datamodel


@property
def get_register_user_datamodel(self):
    """Datamodel of the register-user model view."""
    register_view = self.registerusermodelview
    return register_view.datamodel


@property
def builtin_roles(self):
    """Builtin roles mapping cached on the instance."""
    return self._builtin_roles


@property
def auth_type(self):
    """Value of the ``AUTH_TYPE`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_TYPE"]


@property
def auth_username_ci(self):
    """Value of ``AUTH_USERNAME_CI`` (``True`` when not set)."""
    config = self.appbuilder.get_app.config
    return config.get("AUTH_USERNAME_CI", True)


@property
def auth_role_admin(self):
    """Value of the ``AUTH_ROLE_ADMIN`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_ROLE_ADMIN"]


@property
def auth_role_public(self):
    """Value of the ``AUTH_ROLE_PUBLIC`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_ROLE_PUBLIC"]


@property
def auth_ldap_server(self):
    """Value of the ``AUTH_LDAP_SERVER`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_SERVER"]


@property
def auth_ldap_use_tls(self):
    """Value of the ``AUTH_LDAP_USE_TLS`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_USE_TLS"]


@property
def auth_user_registration(self):
    """Value of the ``AUTH_USER_REGISTRATION`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_USER_REGISTRATION"]


@property
def auth_user_registration_role(self):
    """Value of the ``AUTH_USER_REGISTRATION_ROLE`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_USER_REGISTRATION_ROLE"]


@property
def auth_user_registration_role_jmespath(self) -> str:
    """Value of the ``AUTH_USER_REGISTRATION_ROLE_JMESPATH`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_USER_REGISTRATION_ROLE_JMESPATH"]


@property
def auth_ldap_search(self):
    """Value of the ``AUTH_LDAP_SEARCH`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_SEARCH"]


@property
def auth_ldap_search_filter(self):
    """Value of the ``AUTH_LDAP_SEARCH_FILTER`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_SEARCH_FILTER"]


@property
def auth_ldap_bind_user(self):
    """Value of the ``AUTH_LDAP_BIND_USER`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_BIND_USER"]


@property
def auth_ldap_bind_password(self):
    """Value of the ``AUTH_LDAP_BIND_PASSWORD`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_BIND_PASSWORD"]


@property
def auth_ldap_append_domain(self):
    """Value of the ``AUTH_LDAP_APPEND_DOMAIN`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_APPEND_DOMAIN"]


@property
def auth_ldap_username_format(self):
    """Value of the ``AUTH_LDAP_USERNAME_FORMAT`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_USERNAME_FORMAT"]


@property
def auth_ldap_uid_field(self):
    """Value of the ``AUTH_LDAP_UID_FIELD`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_UID_FIELD"]


@property
def auth_ldap_firstname_field(self):
    """Value of the ``AUTH_LDAP_FIRSTNAME_FIELD`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_FIRSTNAME_FIELD"]


@property
def auth_ldap_lastname_field(self):
    """Value of the ``AUTH_LDAP_LASTNAME_FIELD`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_LASTNAME_FIELD"]


@property
def auth_ldap_email_field(self):
    """Value of the ``AUTH_LDAP_EMAIL_FIELD`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_EMAIL_FIELD"]


@property
def auth_ldap_bind_first(self):
    """Value of the ``AUTH_LDAP_BIND_FIRST`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_BIND_FIRST"]


@property
def auth_ldap_allow_self_signed(self):
    """Value of the ``AUTH_LDAP_ALLOW_SELF_SIGNED`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_ALLOW_SELF_SIGNED"]


@property
def auth_ldap_tls_demand(self):
    """Value of the ``AUTH_LDAP_TLS_DEMAND`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_TLS_DEMAND"]


@property
def auth_ldap_tls_cacertdir(self):
    """Value of the ``AUTH_LDAP_TLS_CACERTDIR`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_TLS_CACERTDIR"]


@property
def auth_ldap_tls_cacertfile(self):
    """Value of the ``AUTH_LDAP_TLS_CACERTFILE`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_TLS_CACERTFILE"]


@property
def auth_ldap_tls_certfile(self):
    """Value of the ``AUTH_LDAP_TLS_CERTFILE`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_TLS_CERTFILE"]


@property
def auth_ldap_tls_keyfile(self):
    """Value of the ``AUTH_LDAP_TLS_KEYFILE`` config key."""
    config = self.appbuilder.get_app.config
    return config["AUTH_LDAP_TLS_KEYFILE"]


@property
def openid_providers(self):
    """Value of the ``OPENID_PROVIDERS`` config key."""
    config = self.appbuilder.get_app.config
    return config["OPENID_PROVIDERS"]


@property
def oauth_providers(self):
    """Value of the ``OAUTH_PROVIDERS`` config key."""
    config = self.appbuilder.get_app.config
    return config["OAUTH_PROVIDERS"]


@property
def current_user(self):
    """
        The user behind the current request: ``g.user`` for an
        authenticated Flask-Login session, otherwise the JWT user when
        there is one, otherwise ``None``.
    """
    if current_user.is_authenticated:
        return g.user
    if current_user_jwt:
        return current_user_jwt
    return None
def oauth_user_info_getter(self, f):
    """
        Decorator function to be the OAuth user info getter
        for all the providers, receives provider and response
        return a dict with the information returned from the provider.
        The returned user info dict should have its keys with the same
        name as the User Model.

        Use it like this an example for GitHub ::

            @appbuilder.sm.oauth_user_info_getter
            def my_oauth_user_info(sm, provider, response=None):
                if provider == 'github':
                    me = sm.oauth_remotes[provider].get('user')
                    return {'username': me.data.get('login')}
                else:
                    return {}

        :param f:
            callable invoked as ``f(sm, provider, response=response)``
        :return:
            the wrapper, which is also stored on ``self.oauth_user_info``
    """

    def wraps(provider, response=None):
        ret = f(self, provider, response=response)
        # Checks if decorator is well behaved and returns a dict as supposed.
        # isinstance (not an exact type comparison) so that dict subclasses
        # such as OrderedDict are accepted as valid user info.
        if not isinstance(ret, dict):
            log.error(
                "OAuth user info decorated function "
                "did not returned a dict, but: {0}".format(type(ret))
            )
            return {}
        return ret

    self.oauth_user_info = wraps
    return wraps
def get_oauth_token_key_name(self, provider):
    """
        Look up the token_key name configured for an OAuth provider.

        The name comes from the ``token_key`` entry of the matching
        OAUTH_PROVIDERS item and defaults to ``oauth_token``. ``None`` is
        returned when no configured provider has that name.
    """
    candidates = (
        entry for entry in self.oauth_providers if entry["name"] == provider
    )
    provider_config = next(candidates, None)
    if provider_config is None:
        return None
    return provider_config.get("token_key", "oauth_token")
def get_oauth_token_secret_name(self, provider):
    """
        Look up the token_secret name configured for an OAuth provider.

        The name comes from the ``token_secret`` entry of the matching
        OAUTH_PROVIDERS item and defaults to ``oauth_token_secret``.
        ``None`` is returned when no configured provider has that name.
    """
    candidates = (
        entry for entry in self.oauth_providers if entry["name"] == provider
    )
    provider_config = next(candidates, None)
    if provider_config is None:
        return None
    return provider_config.get("token_secret", "oauth_token_secret")
def set_oauth_session(self, provider, oauth_response):
    """
        Store the user's OAuth secrets on the current session.

        ``session["oauth"]`` receives a ``(token, token_secret)`` tuple
        read from ``oauth_response`` (the secret falls back to ``""``)
        and ``session["oauth_provider"]`` receives the provider name.
    """
    security_manager = self.appbuilder.sm
    # Key names under which this provider returns its token and secret
    token_key = security_manager.get_oauth_token_key_name(provider)
    token_secret = security_manager.get_oauth_token_secret_name(provider)
    # Save users token on encrypted session cookie
    token_value = oauth_response[token_key]
    secret_value = oauth_response.get(token_secret, "")
    session["oauth"] = (token_value, secret_value)
    session["oauth_provider"] = provider
def get_oauth_user_info(self, provider, resp):
    """
        Since there are different OAuth API's with different ways to
        retrieve user info

        :param provider:
            provider name; github, githublocal, twitter, linkedin, google,
            azure and openshift are handled
        :param resp:
            the OAuth response; only read for azure, where it must carry
            an ``id_token``
        :return:
            dict of user info, ``{}`` for an unknown provider or when the
            Azure id_token cannot be parsed
    """
    # for GITHUB
    if provider == "github" or provider == "githublocal":
        me = self.appbuilder.sm.oauth_remotes[provider].get("user")
        data = me.json()
        log.debug("User info from Github: {0}".format(data))
        return {"username": "github_" + data.get("login")}
    # for twitter
    if provider == "twitter":
        me = self.appbuilder.sm.oauth_remotes[provider].get("account/settings.json")
        data = me.json()
        log.debug("User info from Twitter: {0}".format(data))
        return {"username": "twitter_" + data.get("screen_name", "")}
    # for linkedin
    if provider == "linkedin":
        me = self.appbuilder.sm.oauth_remotes[provider].get(
            "people/~:(id,email-address,first-name,last-name)?format=json"
        )
        data = me.json()
        log.debug("User info from Linkedin: {0}".format(data))
        return {
            "username": "linkedin_" + data.get("id", ""),
            "email": data.get("email-address", ""),
            "first_name": data.get("firstName", ""),
            "last_name": data.get("lastName", ""),
        }
    # for Google
    if provider == "google":
        me = self.appbuilder.sm.oauth_remotes[provider].get("userinfo")
        data = me.json()
        log.debug("User info from Google: {0}".format(data))
        return {
            "username": "google_" + data.get("id", ""),
            "first_name": data.get("given_name", ""),
            "last_name": data.get("family_name", ""),
            "email": data.get("email", ""),
        }
    # for Azure AD Tenant. Azure OAuth response contains
    # JWT token which has user info.
    # JWT token needs to be base64 decoded.
    # https://docs.microsoft.com/en-us/azure/active-directory/develop/
    # active-directory-protocols-oauth-code
    if provider == "azure":
        # NOTE(review): these debug lines write the raw response and
        # id_token to the log - confirm that is acceptable.
        log.debug("Azure response received : {0}".format(resp))
        id_token = resp["id_token"]
        log.debug(str(id_token))
        me = self._azure_jwt_token_parse(id_token)
        log.debug("Parse JWT token : {0}".format(me))
        if not me:
            # The parser yields nothing for a token it cannot split or
            # decode; report "no user info" instead of failing with a
            # TypeError on the subscripts below.
            log.error("Unable to extract user info from Azure id_token.")
            return {}
        return {
            "name": me["name"],
            "email": me["upn"],
            "first_name": me["given_name"],
            "last_name": me["family_name"],
            "id": me["oid"],
            "username": me["oid"],
        }
    # for OpenShift
    if provider == "openshift":
        me = self.appbuilder.sm.oauth_remotes[provider].get(
            "apis/user.openshift.io/v1/users/~"
        )
        data = me.json()
        log.debug("User info from OpenShift: {0}".format(data))
        return {"username": "openshift_" + data.get("metadata").get("name")}
    else:
        return {}
def _azure_parse_jwt(self, id_token): jwt_token_parts = r"^([^\.\s]*)\.([^\.\s]+)\.([^\.\s]*)$" matches = re.search(jwt_token_parts, id_token) if not matches or len(matches.groups()) < 3: log.error("Unable to parse token.") return {} return { "header": matches.group(1), "Payload": matches.group(2), "Sig": matches.group(3), } def _azure_jwt_token_parse(self, id_token): jwt_split_token = self._azure_parse_jwt(id_token) if not jwt_split_token: return jwt_payload = jwt_split_token["Payload"] # Prepare for base64 decoding payload_b64_string = jwt_payload payload_b64_string += "=" * (4 - ((len(jwt_payload) % 4))) decoded_payload = base64.urlsafe_b64decode(payload_b64_string.encode("ascii")) if not decoded_payload: log.error("Payload of id_token could not be base64 url decoded.") return jwt_decoded_payload = json.loads(decoded_payload.decode("utf-8")) return jwt_decoded_payload
def register_views(self):
    """
        Register the security API, views and menu entries.

        Does nothing when the ``FAB_ADD_SECURITY_VIEWS`` config flag is
        false. Which user/auth/registration views are used depends on
        ``auth_type``; the permission related views can be switched off
        individually through their own ``FAB_ADD_SECURITY_*`` flags.
    """
    if not self.appbuilder.app.config.get("FAB_ADD_SECURITY_VIEWS", True):
        return
    # Security APIs
    self.appbuilder.add_api(self.security_api)

    if self.auth_user_registration:
        if self.auth_type == AUTH_DB:
            self.registeruser_view = self.registeruserdbview()
        elif self.auth_type == AUTH_OID:
            self.registeruser_view = self.registeruseroidview()
        elif self.auth_type == AUTH_OAUTH:
            self.registeruser_view = self.registeruseroauthview()
        if self.registeruser_view:
            self.appbuilder.add_view_no_menu(self.registeruser_view)

    self.appbuilder.add_view_no_menu(self.resetpasswordview())
    self.appbuilder.add_view_no_menu(self.resetmypasswordview())
    self.appbuilder.add_view_no_menu(self.userinfoeditview())

    if self.auth_type == AUTH_DB:
        self.user_view = self.userdbmodelview
        self.auth_view = self.authdbview()
    elif self.auth_type == AUTH_LDAP:
        self.user_view = self.userldapmodelview
        self.auth_view = self.authldapview()
    elif self.auth_type == AUTH_OAUTH:
        self.user_view = self.useroauthmodelview
        self.auth_view = self.authoauthview()
    elif self.auth_type == AUTH_REMOTE_USER:
        self.user_view = self.userremoteusermodelview
        self.auth_view = self.authremoteuserview()
    else:
        # Any other auth type is served by the OpenID views
        self.user_view = self.useroidmodelview
        self.auth_view = self.authoidview()

    self.appbuilder.add_view_no_menu(self.auth_view)

    # add_view returns the registered view, which replaces the class
    # reference stored on user_view above
    self.user_view = self.appbuilder.add_view(
        self.user_view,
        "List Users",
        icon="fa-user",
        label=_("List Users"),
        category="Security",
        category_icon="fa-cogs",
        category_label=_("Security"),
    )

    role_view = self.appbuilder.add_view(
        self.rolemodelview,
        "List Roles",
        icon="fa-group",
        label=_("List Roles"),
        category="Security",
        category_icon="fa-cogs",
    )
    role_view.related_views = [self.user_view.__class__]

    if self.userstatschartview:
        self.appbuilder.add_view(
            self.userstatschartview,
            "User's Statistics",
            icon="fa-bar-chart-o",
            label=_("User's Statistics"),
            category="Security",
        )
    if self.auth_user_registration:
        # Registered under its own name: it used to reuse
        # "User's Statistics", colliding with the chart entry above.
        self.appbuilder.add_view(
            self.registerusermodelview,
            "User Registrations",
            icon="fa-user-plus",
            label=_("User Registrations"),
            category="Security",
        )
    self.appbuilder.menu.add_separator("Security")
    if self.appbuilder.app.config.get("FAB_ADD_SECURITY_PERMISSION_VIEW", True):
        self.appbuilder.add_view(
            self.permissionmodelview,
            "Base Permissions",
            icon="fa-lock",
            label=_("Base Permissions"),
            category="Security",
        )
    if self.appbuilder.app.config.get("FAB_ADD_SECURITY_VIEW_MENU_VIEW", True):
        self.appbuilder.add_view(
            self.viewmenumodelview,
            "Views/Menus",
            icon="fa-list-alt",
            label=_("Views/Menus"),
            category="Security",
        )
    if self.appbuilder.app.config.get(
        "FAB_ADD_SECURITY_PERMISSION_VIEWS_VIEW", True
    ):
        self.appbuilder.add_view(
            self.permissionviewmodelview,
            "Permission on Views/Menus",
            icon="fa-link",
            label=_("Permission on Views/Menus"),
            category="Security",
        )
def create_db(self):
    """
        Prepare the roles on the DB.

        Applies the ``FAB_ROLES_MAPPING`` config (pk -> name) through
        ``update_role``, adds every builtin role, the admin role (unless
        it is already a builtin one) and the public role, then logs a
        warning when there is no user yet.
    """
    config = self.appbuilder.get_app.config
    for role_pk, role_name in config.get("FAB_ROLES_MAPPING", {}).items():
        self.update_role(role_pk, role_name)

    for builtin_name in self.builtin_roles:
        self.add_role(builtin_name)

    admin_role_name = self.auth_role_admin
    if admin_role_name not in self.builtin_roles:
        self.add_role(admin_role_name)
    self.add_role(self.auth_role_public)

    no_users_yet = self.count_users() == 0
    if no_users_yet:
        log.warning(LOGMSG_WAR_SEC_NO_USER)
def reset_password(self, userid, password):
    """
        Set a new password for a user (authdb); only the hash of the
        password is stored.

        :param userid:
            the user.id to reset the password
        :param password:
            The clear text password to reset and save hashed on the db
    """
    hashed_password = generate_password_hash(password)
    target_user = self.get_user_by_id(userid)
    target_user.password = hashed_password
    self.update_user(target_user)
def update_user_auth_stat(self, user, success=True):
    """
        Record the outcome of an authentication attempt on the user.

        A success increments ``login_count`` and resets
        ``fail_login_count``; a failure increments ``fail_login_count``.
        ``last_login`` is stamped in both cases and the user is saved.

        :param user:
            The authenticated user model
        :param success:
            Default to true, if false increments fail_login_count on user model
    """
    # Unset (falsy) counters start from zero
    for counter_name in ("login_count", "fail_login_count"):
        if not getattr(user, counter_name):
            setattr(user, counter_name, 0)

    if not success:
        user.fail_login_count += 1
    else:
        user.login_count += 1
        user.fail_login_count = 0

    user.last_login = datetime.datetime.now()
    self.update_user(user)
def auth_user_db(self, username, password):
    """
        Authenticate a user against the password hash stored on the db.

        The user is looked up by username first and by email second.

        :param username:
            The username or registered email address
        :param password:
            The password, will be tested against hashed password on db
        :return:
            the user on success, ``None`` otherwise
    """
    if username is None or username == "":
        return None

    user = self.find_user(username=username)
    if user is None:
        user = self.find_user(email=username)

    # Unknown or disabled account: nothing to update, just log
    if user is None or (not user.is_active):
        log.info(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))
        return None

    if not check_password_hash(user.password, password):
        self.update_user_auth_stat(user, False)
        log.info(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))
        return None

    self.update_user_auth_stat(user, True)
    return user
def _search_ldap(self, ldap, con, username):
    """
        Searches LDAP for user, assumes ldap_search is set.

        The username is escaped before it is placed into the search
        filter, so characters such as ``*``, ``(`` and ``)`` typed on the
        login form are matched literally and cannot alter the filter.

        :param ldap: The ldap module reference
        :param con: The ldap connection
        :param username: username to match with auth_ldap_uid_field
        :return: ldap object array
    """
    # Function-scope import: the ldap package is optional and only
    # needed when LDAP authentication is actually used.
    from ldap.filter import escape_filter_chars

    if self.auth_ldap_append_domain:
        username = username + "@" + self.auth_ldap_append_domain
    safe_username = escape_filter_chars(username)
    if self.auth_ldap_search_filter:
        filter_str = "(&%s(%s=%s))" % (
            self.auth_ldap_search_filter,
            self.auth_ldap_uid_field,
            safe_username,
        )
    else:
        filter_str = "(%s=%s)" % (self.auth_ldap_uid_field, safe_username)
    user = con.search_s(
        self.auth_ldap_search,
        ldap.SCOPE_SUBTREE,
        filter_str,
        [
            self.auth_ldap_firstname_field,
            self.auth_ldap_lastname_field,
            self.auth_ldap_email_field,
        ],
    )
    if user:
        # A first entry without a DN is treated as "not found"
        if not user[0][0]:
            return None
    return user


def _bind_indirect_user(self, ldap, con):
    """
        If using AUTH_LDAP_BIND_USER bind this user before performing search

        :param ldap: The ldap module reference
        :param con: The ldap connection
    """
    indirect_user = self.auth_ldap_bind_user
    if indirect_user:
        indirect_password = self.auth_ldap_bind_password
        log.debug("LDAP indirect bind with: {0}".format(indirect_user))
        con.bind_s(indirect_user, indirect_password)
        log.debug("LDAP BIND indirect OK")


def _bind_ldap(self, ldap, con, username, password):
    """
        Private to bind/Authenticate a user.
        If AUTH_LDAP_BIND_USER exists then it will bind first with it,
        next will search the LDAP server using the username with UID
        and try to bind to it (OpenLDAP).
        If AUTH_LDAP_BIND_USER does not exist, will bind with
        username/password

        :param ldap: The ldap module reference
        :param con: The ldap connection
        :param username: the login name
        :param password: the password to bind with
        :return:
            True when the bind succeeded, False when the user was not
            found or the credentials were rejected
    """
    try:
        if self.auth_ldap_bind_user:
            self._bind_indirect_user(ldap, con)
            user = self._search_ldap(ldap, con, username)
            if user:
                log.debug("LDAP got User {0}".format(user))
                # username = DN from search
                username = user[0][0]
            else:
                log.debug("LDAP bind failure: user not found")
                return False
        log.debug("LDAP bind with: {0} {1}".format(username, "XXXXXX"))
        if self.auth_ldap_username_format:
            username = self.auth_ldap_username_format % username
        if self.auth_ldap_append_domain:
            username = username + "@" + self.auth_ldap_append_domain
        con.bind_s(username, password)
        log.debug("LDAP bind OK: {0}".format(username))
        return True
    except ldap.INVALID_CREDENTIALS:
        log.debug("LDAP bind failure: invalid credentials")
        return False


@staticmethod
def ldap_extract(ldap_dict, field, fallback):
    """
        Read the first value of an LDAP attribute as text.

        :param ldap_dict: mapping of attribute name to a list of bytes
        :param field: attribute name to read
        :param fallback: returned when the attribute is missing or empty
        :return: the utf-8 decoded first value, or ``fallback``
    """
    if not ldap_dict.get(field):
        return fallback
    return ldap_dict[field][0].decode("utf-8") or fallback
def auth_user_ldap(self, username, password):
    """
        Method for authenticating user, auth LDAP style.
        depends on ldap module that is not mandatory requirement
        for F.A.B.

        :param username:
            The username
        :param password:
            The password
        :return:
            the user on success; ``None`` for an empty username, an
            inactive user, a failed TLS start or bind, an unknown user
            without self registration, or an LDAP error
        :raises Exception:
            when the ldap library cannot be imported
    """
    if username is None or username == "":
        return None
    user = self.find_user(username=username)
    if user is not None and (not user.is_active):
        return None
    else:
        try:
            import ldap
        except ImportError as import_error:
            raise Exception("No ldap library for python.") from import_error
        try:
            if self.auth_ldap_allow_self_signed:
                ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
                ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
            elif self.auth_ldap_tls_demand:
                ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
                ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
            if self.auth_ldap_tls_cacertdir:
                ldap.set_option(
                    ldap.OPT_X_TLS_CACERTDIR, self.auth_ldap_tls_cacertdir
                )
            if self.auth_ldap_tls_cacertfile:
                ldap.set_option(
                    ldap.OPT_X_TLS_CACERTFILE, self.auth_ldap_tls_cacertfile
                )
            if self.auth_ldap_tls_certfile and self.auth_ldap_tls_keyfile:
                ldap.set_option(
                    ldap.OPT_X_TLS_CERTFILE, self.auth_ldap_tls_certfile
                )
                ldap.set_option(ldap.OPT_X_TLS_KEYFILE, self.auth_ldap_tls_keyfile)
            con = ldap.initialize(self.auth_ldap_server)
            con.set_option(ldap.OPT_REFERRALS, 0)
            if self.auth_ldap_use_tls:
                try:
                    con.start_tls_s()
                except Exception:
                    log.info(
                        LOGMSG_ERR_SEC_AUTH_LDAP_TLS.format(self.auth_ldap_server)
                    )
                    return None
            # Authenticate user
            if not self._bind_ldap(ldap, con, username, password):
                if user:
                    self.update_user_auth_stat(user, False)
                log.warning(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))
                return None
            # If user does not exist on the DB and not self user registration, go away
            if not user and not self.auth_user_registration:
                return None
            # User does not exist, create one if self registration.
            elif not user and self.auth_user_registration:
                self._bind_indirect_user(ldap, con)
                new_user = self._search_ldap(ldap, con, username)
                if not new_user:
                    log.warning(LOGMSG_WAR_SEC_NOLDAP_OBJ.format(username))
                    return None
                ldap_user_info = new_user[0][1]
                if self.auth_user_registration and user is None:
                    user = self.add_user(
                        username=username,
                        first_name=self.ldap_extract(
                            ldap_user_info, self.auth_ldap_firstname_field, username
                        ),
                        last_name=self.ldap_extract(
                            ldap_user_info, self.auth_ldap_lastname_field, username
                        ),
                        email=self.ldap_extract(
                            ldap_user_info,
                            self.auth_ldap_email_field,
                            username + "@email.notfound",
                        ),
                        role=self.find_role(self.auth_user_registration_role),
                    )

            self.update_user_auth_stat(user)
            return user

        except ldap.LDAPError as e:
            # python-ldap passes its diagnostics as a dict in the first
            # exception argument. The previous isinstance(e, dict) test on
            # the exception itself could never be true, so the formatted
            # "desc" message was never logged.
            details = e.args[0] if e.args else None
            if isinstance(details, dict) and "desc" in details:
                log.error(LOGMSG_ERR_SEC_AUTH_LDAP.format(details["desc"]))
            else:
                log.error(e)
            return None
def auth_user_oid(self, email):
    """
        OpenID user Authentication

        :param email: user's email to authenticate
        :return:
            the active user registered with that email, after updating
            its login statistics; ``None`` when there is no such user or
            the user is inactive
    """
    user = self.find_user(email=email)
    if user is not None and user.is_active:
        self.update_user_auth_stat(user)
        return user
    log.info(LOGMSG_WAR_SEC_LOGIN_FAILED.format(email))
    return None
def auth_user_remote_user(self, username):
    """
        REMOTE_USER user Authentication

        :param username: user's username for remote auth
        :return:
            the existing active user, or a user created on the fly when
            auto user registration is enabled; ``None`` otherwise
    """
    user = self.find_user(username=username)

    if user is None:
        # Unknown user: only acceptable with auto user registration
        if not self.auth_user_registration:
            log.info(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))
            return None
        # All we have is REMOTE_USER, so the remaining fields get
        # placeholder values derived from it.
        registration_role = self.find_role(self.auth_user_registration_role)
        user = self.add_user(
            username=username,
            first_name=username,
            last_name="-",
            email=username + "@email.notfound",
            role=registration_role,
        )
    elif not user.is_active:
        # Known but disabled user, go away
        log.info(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))
        return None

    self.update_user_auth_stat(user)
    return user
def auth_user_oauth(self, userinfo):
    """
        OAuth user Authentication

        :userinfo: dict with user information the keys have the same name
        as User model columns.
        :return:
            the existing active user, or a user created on the fly when
            self registration is enabled; ``None`` otherwise
    """
    if "username" in userinfo:
        user = self.find_user(username=userinfo["username"])
    elif "email" in userinfo:
        user = self.find_user(email=userinfo["email"])
    else:
        log.error("User info does not have username or email {0}".format(userinfo))
        return None

    if user:
        # User is disabled
        if not user.is_active:
            log.info(LOGMSG_WAR_SEC_LOGIN_FAILED.format(userinfo))
            return None
    else:
        # If user does not exist on the DB and not self user registration, go away
        if not self.auth_user_registration:
            return None
        # User does not exist, create one if self registration.
        role_name = self.auth_user_registration_role
        if self.auth_user_registration_role_jmespath:
            import jmespath

            role_name = jmespath.search(
                self.auth_user_registration_role_jmespath, userinfo
            )
        registration_role = self.find_role(role_name)
        user = self.add_user(
            username=userinfo["username"],
            first_name=userinfo.get("first_name", ""),
            last_name=userinfo.get("last_name", ""),
            email=userinfo.get("email", ""),
            role=registration_role,
        )
        if not user:
            log.error("Error creating a new OAuth user %s" % userinfo["username"])
            return None

    self.update_user_auth_stat(user)
    return user
""" ---------------------------------------- PERMISSION ACCESS CHECK ---------------------------------------- """
def is_item_public(self, permission_name, view_name):
    """
        Tell whether the public permissions include this permission on
        this view.

        :param permission_name:
            the permission: can_show, can_edit...
        :param view_name:
            the name of the class view (child of BaseView)
        :return: True when a matching public permission exists
    """
    public_permissions = self.get_public_permissions() or ()
    return any(
        (view_name == pvm.view_menu.name)
        and (permission_name == pvm.permission.name)
        for pvm in public_permissions
    )
def _has_access_builtin_roles(
    self, role, permission_name: str, view_name: str
) -> bool:
    """
        Checks permission on builtin role

        Every builtin entry is a (view pattern, permission pattern)
        pair; both are applied with ``re.match``.
    """
    patterns = self.builtin_roles.get(role.name, [])
    return any(
        re.match(entry[0], view_name) and re.match(entry[1], permission_name)
        for entry in patterns
    )


def _has_view_access(
    self, user: object, permission_name: str, view_name: str
) -> bool:
    """
        Check whether any role of ``user`` grants the permission on the view.

        Builtin (statically configured) roles are evaluated first since
        they need no database query; the remaining role ids are checked
        with a single ``exist_permission_on_roles`` call.
    """
    stored_role_ids = []
    for role in user.roles:
        if role.name not in self.builtin_roles:
            stored_role_ids.append(role.id)
        elif self._has_access_builtin_roles(role, permission_name, view_name):
            return True
    # Nothing granted statically, ask the database store roles
    return self.exist_permission_on_roles(
        view_name, permission_name, stored_role_ids
    )


def _get_user_permission_view_menus(
    self, user: object, permission_name: str, view_menus_name: List[str]
) -> Set[str]:
    """
        Return a set of view menu names with a certain permission name
        that a user has access to. Mainly used to fetch all menu permissions
        on a single db call, will also check public permissions and builtin roles

        :param user: the user, or None to use the public role
        :param permission_name: permission to look for
        :param view_menus_name: candidate names checked against builtin roles
    """
    # No user means anonymous access: only the public role applies
    roles = [self.get_public_role()] if user is None else user.roles

    granted = set()
    stored_role_ids = []
    for role in roles:
        if role.name not in self.builtin_roles:
            stored_role_ids.append(role.id)
            continue
        # Builtin role: match every candidate name, no database needed
        for candidate in view_menus_name:
            if self._has_access_builtin_roles(role, permission_name, candidate):
                granted.add(candidate)

    # Then add what the database-stored roles grant
    stored_names = [
        pvm.view_menu.name
        for pvm in self.find_roles_permission_view_menus(
            permission_name, stored_role_ids
        )
    ]
    granted.update(stored_names)
    return granted
def has_access(self, permission_name, view_name):
    """
        Check if current user or public has access to view or menu

        A session-authenticated user is checked first, then a JWT user;
        without either, the public permissions decide.
    """
    if current_user.is_authenticated:
        user = g.user
    elif current_user_jwt:
        user = current_user_jwt
    else:
        return self.is_item_public(permission_name, view_name)
    return self._has_view_access(user, permission_name, view_name)
def get_user_menu_access(self, menu_names: List[str] = None) -> Set[str]:
    """
        Return the menu names the current user is granted "menu_access" on.

        Uses the session user when authenticated, else the JWT user,
        else no user at all (None is passed on in that case).

        :param menu_names: candidate menu names, passed through as
            ``view_menus_name``
    """
    if current_user.is_authenticated:
        user = g.user
    elif current_user_jwt:
        user = current_user_jwt
    else:
        user = None
    return self._get_user_permission_view_menus(
        user, "menu_access", view_menus_name=menu_names
    )
def add_permissions_view(self, base_permissions, view_menu):
    """
        Adds a permission on a view menu to the backend

        Missing permissions are created (and granted to the admin role
        unless that role is builtin); permissions that exist on the
        view but are no longer listed are revoked from every role and
        deleted.

        :param base_permissions:
            list of permissions from view (all exposed methods):
             'can_add','can_edit' etc...
        :param view_menu:
            name of the view or menu to add
    """
    view_menu_db = self.add_view_menu(view_menu)
    perm_views = self.find_permissions_view_menu(view_menu_db)

    if not perm_views:
        # No permissions yet on this view
        for permission in base_permissions:
            pv = self.add_permission_view_menu(permission, view_menu)
            if self.auth_role_admin not in self.builtin_roles:
                role_admin = self.find_role(self.auth_role_admin)
                self.add_permission_role(role_admin, pv)
    else:
        # Permissions on this view exist but....
        role_admin = self.find_role(self.auth_role_admin)
        for permission in base_permissions:
            # Check if base view permissions exist
            if not self.exist_permission_on_views(perm_views, permission):
                pv = self.add_permission_view_menu(permission, view_menu)
                if self.auth_role_admin not in self.builtin_roles:
                    self.add_permission_role(role_admin, pv)
        for perm_view in perm_views:
            if perm_view.permission is None:
                # Skip this perm_view, it has a null permission
                continue
            if perm_view.permission.name not in base_permissions:
                # perm to delete
                roles = self.get_all_roles()
                # del permission from all roles; roles hold
                # permission-view objects, so the perm_view itself
                # (not the bare permission) must be removed
                for role in roles:
                    self.del_permission_role(role, perm_view)
                self.del_permission_view_menu(perm_view.permission.name, view_menu)
            elif (
                self.auth_role_admin not in self.builtin_roles
                and perm_view not in role_admin.permissions
            ):
                # Role Admin must have all permissions
                self.add_permission_role(role_admin, perm_view)
def add_permissions_menu(self, view_menu_name):
    """
        Adds menu_access to menu on permission_view_menu

        The admin role is granted the new permission unless it is a
        builtin role. Nothing is granted when the permission already
        exists.

        :param view_menu_name:
            The menu name
    """
    self.add_view_menu(view_menu_name)
    if self.find_permission_view_menu("menu_access", view_menu_name):
        return

    menu_pvm = self.add_permission_view_menu("menu_access", view_menu_name)
    if self.auth_role_admin in self.builtin_roles:
        return
    admin_role = self.find_role(self.auth_role_admin)
    self.add_permission_role(admin_role, menu_pvm)
def security_cleanup(self, baseviews, menus):
    """
        Will cleanup all unused permissions from the database

        A view menu is unused when its name matches no baseview's
        ``class_permission_name`` and ``menus.find`` does not find it.
        Converges permissions afterwards.

        :param baseviews: A list of BaseViews class
        :param menus: Menu class
    """
    all_view_menus = self.get_all_view_menu()
    all_roles = self.get_all_roles()

    for view_menu in all_view_menus:
        used_by_view = any(
            view_menu.name == baseview.class_permission_name
            for baseview in baseviews
        )
        used_by_menu = menus.find(view_menu.name)
        if used_by_view or used_by_menu:
            continue

        # Orphan view menu: revoke from every role, then delete
        for perm_view in self.find_permissions_view_menu(view_menu):
            for role in all_roles:
                self.del_permission_role(role, perm_view)
            self.del_permission_view_menu(
                perm_view.permission.name, view_menu.name
            )
        self.del_view_menu(view_menu.name)

    self.security_converge(baseviews, menus)
@staticmethod
def _get_new_old_permissions(baseview) -> Dict:
    """
        Map each new (prefixed) permission name of ``baseview`` to the
        set of previous permission names it replaces.

        Only methods that have an entry in
        ``previous_method_permission_name`` contribute.
    """
    new_to_old = dict()
    for method_name, new_name in baseview.method_permission_name.items():
        previous_name = baseview.previous_method_permission_name.get(method_name)
        # Actions do not get prefix when normally defined
        is_action = hasattr(baseview, "actions") and baseview.actions.get(
            previous_name
        )
        old_prefix = "" if is_action else PERMISSION_PREFIX
        if not previous_name:
            continue
        new_to_old.setdefault(PERMISSION_PREFIX + new_name, set()).add(
            old_prefix + previous_name
        )
    return new_to_old


@staticmethod
def _add_state_transition(
    state_transition: Dict,
    old_view_name: str,
    old_perm_name: str,
    view_name: str,
    perm_name: str,
) -> None:
    """
        Record that (old view, old permission) becomes (view, permission)
        and mark the old pair, view and permission as delete candidates.
    """
    old_key = (old_view_name, old_perm_name)
    new_key = (view_name, perm_name)
    additions = state_transition["add"]
    if additions.get(old_key):
        additions[old_key].add(new_key)
    else:
        additions[old_key] = {new_key}
    state_transition["del_role_pvm"].add(old_key)
    state_transition["del_views"].add(old_view_name)
    state_transition["del_perms"].add(old_perm_name)


@staticmethod
def _update_del_transitions(state_transitions: Dict, baseviews: List) -> None:
    """
        Mutates state_transitions, loop baseviews and prunes all
        views and permissions that are not to delete because references
        exist.

    :param baseviews: the registered views whose names are still in use
    :param state_transitions: transitions dict, modified in place
    :return: None
    """
    doomed_views = state_transitions["del_views"]
    doomed_pvms = state_transitions["del_role_pvm"]
    doomed_perms = state_transitions["del_perms"]
    for baseview in baseviews:
        live_view = baseview.class_permission_name
        doomed_views.discard(live_view)
        for live_perm in baseview.base_permissions:
            doomed_pvms.discard((live_view, live_perm))
            doomed_perms.discard(live_perm)
def create_state_transitions(self, baseviews: List, menus: List) -> Dict:
    """
        Creates a Dict with all the necessary vm/permission transitions

        Dict: {
                "add": {(<VM>, <PERM>): ((<VM>, PERM), ... )}
                "del_role_pvm": ((<VM>, <PERM>), ...)
                "del_views": (<VM>, ... )
                "del_perms": (<PERM>, ... )
              }

        When a view was renamed, every base permission yields a
        transition (falling back to its own name as the old name);
        otherwise only permissions with a recorded previous name do.

    :param baseviews: List with all the registered BaseView, BaseApi
    :param menus: List with all the menu entries
    :return: Dict with state transitions
    """
    state_transitions = {
        "add": {},
        "del_role_pvm": set(),
        "del_views": set(),
        "del_perms": set(),
    }
    for baseview in baseviews:
        new_view_name = baseview.class_permission_name
        permission_mapping = self._get_new_old_permissions(baseview)
        view_renamed = bool(baseview.previous_class_permission_name)
        if view_renamed:
            old_view_name = baseview.previous_class_permission_name
        else:
            old_view_name = new_view_name

        for new_perm_name in baseview.base_permissions:
            # On a renamed view an unmapped permission maps to itself
            fallback = (new_perm_name,) if view_renamed else set()
            old_perm_names = permission_mapping.get(new_perm_name) or fallback
            for old_perm_name in old_perm_names:
                self._add_state_transition(
                    state_transitions,
                    old_view_name,
                    old_perm_name,
                    new_view_name,
                    new_perm_name,
                )
    self._update_del_transitions(state_transitions, baseviews)
    return state_transitions
def security_converge(self, baseviews: List, menus: List, dry=False) -> Dict:
    """
        Converges overridden permissions on all registered views/api
        will compute all necessary operations from `class_permissions_name`,
        `previous_class_permission_name`, method_permission_name`,
        `previous_method_permission_name` class attributes.

        Roles holding an old permission-view receive its replacements,
        then the old permission-views, views and permissions marked
        for deletion are removed.

    :param baseviews: List of registered views/apis
    :param menus: List of menu items
    :param dry: If True will not change DB
    :return: Dict with the necessary operations (state_transitions)
    """
    state_transitions = self.create_state_transitions(baseviews, menus)
    if dry:
        return state_transitions
    # The dict always carries its four keys, so emptiness has to be
    # judged on the values; bail out before loading any role.
    if not any(state_transitions.values()):
        log.info("No state transitions found")
        return state_transitions
    log.debug("State transitions: %s", state_transitions)
    roles = self.get_all_roles()
    for role in roles:
        # Copy: role.permissions is modified while we walk it
        permissions = list(role.permissions)
        for pvm in permissions:
            if pvm.view_menu is None or pvm.permission is None:
                # Incomplete permission-view, nothing to converge
                continue
            pvm_key = (pvm.view_menu.name, pvm.permission.name)
            new_pvm_states = state_transitions["add"].get(pvm_key)
            if not new_pvm_states:
                continue
            for new_pvm_state in new_pvm_states:
                new_pvm = self.add_permission_view_menu(
                    new_pvm_state[1], new_pvm_state[0]
                )
                self.add_permission_role(role, new_pvm)
            if pvm_key in state_transitions["del_role_pvm"]:
                self.del_permission_role(role, pvm)
    for pvm in state_transitions["del_role_pvm"]:
        self.del_permission_view_menu(pvm[1], pvm[0], cascade=False)
    for view_name in state_transitions["del_views"]:
        self.del_view_menu(view_name)
    for permission_name in state_transitions["del_perms"]:
        self.del_permission(permission_name)
    return state_transitions
""" --------------------------- INTERFACE ABSTRACT METHODS --------------------------- --------------------- PRIMITIVES FOR USERS ---------------------- """
def find_register_user(self, registration_hash):
    """
        Look up a user registration by its registration hash.

        Interface method: this base implementation is not implemented.

        :param registration_hash: hash identifying the registration
    """
    raise NotImplementedError
def add_register_user(
    self, username, first_name, last_name, email, password="", hashed_password=""
):
    """
        Add a user registration.

        Interface method: this base implementation is not implemented.

        :param username: requested username
        :param first_name: user's first name
        :param last_name: user's last name
        :param email: user's email
        :param password: optional password
        :param hashed_password: optional hashed password
    """
    raise NotImplementedError
def del_register_user(self, register_user):
    """
        Delete a user registration.

        Interface method: this base implementation is not implemented.

        :param register_user: the registration to delete
    """
    raise NotImplementedError
def get_user_by_id(self, pk):
    """
        Return a user by its id (pk).

        Interface method: this base implementation is not implemented.

        :param pk: the user's primary key
    """
    raise NotImplementedError
def find_user(self, username=None, email=None):
    """
        Find a user by its username or email.

        Interface method: this base implementation is not implemented.

        :param username: username to search for
        :param email: email to search for
    """
    raise NotImplementedError
def get_all_users(self):
    """
        Return all existing users.

        Interface method: this base implementation is not implemented.
    """
    raise NotImplementedError
def add_user(self, username, first_name, last_name, email, role, password=""):
    """
        Create a user.

        Interface method: this base implementation is not implemented.

        :param username: the new user's username
        :param first_name: user's first name
        :param last_name: user's last name
        :param email: user's email
        :param role: role to assign to the user
        :param password: optional password
    """
    raise NotImplementedError
def update_user(self, user):
    """
        Update a user.

        Interface method: this base implementation is not implemented.

        :param user: User model to update to database
    """
    raise NotImplementedError
def count_users(self):
    """
        Count the existing users.

        Interface method: this base implementation is not implemented.
    """
    raise NotImplementedError


"""
----------------------
 PRIMITIVES FOR ROLES
----------------------
"""


def find_role(self, name):
    """
        Find a role by name.

        Interface method: this base implementation is not implemented.

        :param name: name of the role
    """
    raise NotImplementedError


def add_role(self, name):
    """
        Add a role.

        Interface method: this base implementation is not implemented.

        :param name: name of the role to add
    """
    raise NotImplementedError


def update_role(self, pk, name):
    """
        Update a role.

        Interface method: this base implementation is not implemented.

        :param pk: primary key of the role
        :param name: name for the role
    """
    raise NotImplementedError


def get_all_roles(self):
    """
        Return all roles.

        Interface method: this base implementation is not implemented.
    """
    raise NotImplementedError


"""
----------------------------
 PRIMITIVES FOR PERMISSIONS
----------------------------
"""
def get_public_role(self):
    """
        Return the public role.

        Interface method: this base implementation is not implemented.
    """
    raise NotImplementedError
def get_public_permissions(self):
    """
        Return all permissions from the public role.

        Interface method: this base implementation is not implemented.
    """
    raise NotImplementedError
def find_permission(self, name):
    """
        Find and return a Permission by name.

        Interface method: this base implementation is not implemented.

        :param name: name of the permission
    """
    raise NotImplementedError
def find_roles_permission_view_menus(
    self, permission_name: str, role_ids: List[int]
):
    """
        Find the permission views with a given permission name for a
        group of roles.

        Interface method: this base implementation is not implemented.

        :param permission_name: name of the permission
        :param role_ids: ids of the roles to search
    """
    raise NotImplementedError
def exist_permission_on_roles(
    self, view_name: str, permission_name: str, role_ids: List[int]
) -> bool:
    """
        Tell whether a permission on a view exists for a group of roles.

        Interface method: this base implementation is not implemented.

        :param view_name: name of the view or menu
        :param permission_name: name of the permission
        :param role_ids: ids of the roles to check
    """
    raise NotImplementedError
def add_permission(self, name):
    """
        Add a permission to the backend, model permission.

        Interface method: this base implementation is not implemented.

        :param name:
            name of the permission: 'can_add','can_edit' etc...
    """
    raise NotImplementedError
def del_permission(self, name):
    """
        Delete a permission from the backend, model permission.

        Interface method: this base implementation is not implemented.

        :param name:
            name of the permission: 'can_add','can_edit' etc...
    """
    raise NotImplementedError
""" ---------------------- PRIMITIVES VIEW MENU ---------------------- """
def find_view_menu(self, name):
    """
        Find and return a ViewMenu by name.

        Interface method: this base implementation is not implemented.

        :param name: name of the view menu
    """
    raise NotImplementedError
def get_all_view_menu(self):
    """
        Return all view menus.

        Interface method: this base implementation is not implemented.
    """
    raise NotImplementedError
def add_view_menu(self, name):
    """
        Add a view or menu to the backend, model view_menu.

        Interface method: this base implementation is not implemented.

        :param name:
            name of the view menu to add
    """
    raise NotImplementedError
def del_view_menu(self, name):
    """
        Delete a ViewMenu from the backend.

        Interface method: this base implementation is not implemented.

        :param name:
            name of the ViewMenu
    """
    raise NotImplementedError
""" ---------------------- PERMISSION VIEW MENU ---------------------- """
def find_permission_view_menu(self, permission_name, view_menu_name):
    """
        Find and return a PermissionView by names.

        Interface method: this base implementation is not implemented.

        :param permission_name: name of the permission
        :param view_menu_name: name of the view menu
    """
    raise NotImplementedError
def find_permissions_view_menu(self, view_menu):
    """
        Find all permissions from a ViewMenu.

        Interface method: this base implementation is not implemented.

        :param view_menu: ViewMenu object
        :return: list of PermissionView objects
    """
    raise NotImplementedError
def add_permission_view_menu(self, permission_name, view_menu_name):
    """
        Add a permission on a view or menu to the backend.

        Interface method: this base implementation is not implemented.

        :param permission_name:
            name of the permission to add: 'can_add','can_edit' etc...
        :param view_menu_name:
            name of the view menu to add
    """
    raise NotImplementedError
def del_permission_view_menu(self, permission_name, view_menu_name, cascade=True):
    """
        Delete a permission on a view or menu from the backend.

        Interface method: this base implementation is not implemented.

        :param permission_name: name of the permission
        :param view_menu_name: name of the view menu
        :param cascade: cascade flag, semantics defined by the implementation
    """
    raise NotImplementedError


def exist_permission_on_views(self, lst, item):
    """
        Check for a permission on a list of permission views.

        Interface method: this base implementation is not implemented.

        :param lst: list of permission views
        :param item: the permission to look for
    """
    raise NotImplementedError


def exist_permission_on_view(self, lst, permission, view_menu):
    """
        Check for a permission on a view menu in a list of permission views.

        Interface method: this base implementation is not implemented.

        :param lst: list of permission views
        :param permission: the permission to look for
        :param view_menu: the view menu to look for
    """
    raise NotImplementedError
def add_permission_role(self, role, perm_view):
    """
        Add a permission-ViewMenu object to a Role.

        Interface method: this base implementation is not implemented.

        :param role:
            The role object
        :param perm_view:
            The PermissionViewMenu object
    """
    raise NotImplementedError
def del_permission_role(self, role, perm_view):
    """
        Remove a permission-ViewMenu object from a Role.

        Interface method: this base implementation is not implemented.

        :param role:
            The role object
        :param perm_view:
            The PermissionViewMenu object
    """
    raise NotImplementedError
def load_user(self, pk):
    """
        Load a user by primary key; ``pk`` is converted with ``int``.

        :param pk: the user's primary key
    """
    user_id = int(pk)
    return self.get_user_by_id(user_id)


def load_user_jwt(self, pk):
    """
        Load the user for a JWT identity and store it on flask ``g.user``.

        :param pk: the user's primary key
        :return: the loaded user
    """
    jwt_user = self.load_user(pk)
    # Set flask g.user to JWT user, we can't do it on before request
    g.user = jwt_user
    return jwt_user


@staticmethod
def before_request():
    """
        Set flask ``g.user`` to the flask-login ``current_user``.
    """
    g.user = current_user