Source code for flask_unchained.bundles.security.decorators.auth_required

from flask import _request_ctx_stack, current_app, request
from flask_principal import Identity, identity_changed
from flask_unchained import unchained
from functools import wraps

from .roles_accepted import roles_accepted
from .roles_required import roles_required
from ..utils import current_user

security = unchained.get_local_proxy('security')


[docs]def auth_required(decorated_fn=None, **role_rules): """ Decorator for requiring an authenticated user, optionally with roles. Roles are passed as keyword arguments, like so:: @auth_required(role='REQUIRE_THIS_ONE_ROLE') @auth_required(roles=['REQUIRE', 'ALL', 'OF', 'THESE', 'ROLES']) @auth_required(one_of=['EITHER_THIS_ROLE', 'OR_THIS_ONE']) Either of the `role` or `roles` kwargs can also be combined with one_of:: @auth_required(role='REQUIRED', one_of=['THIS', 'OR_THIS']) Aborts with ``HTTP 401: Unauthorized`` if no user is logged in, or ``HTTP 403: Forbidden`` if any of the specified role checks fail. """ required_roles = [] one_of_roles = [] if not (decorated_fn and callable(decorated_fn)): if 'role' in role_rules and 'roles' in role_rules: raise RuntimeError('specify only one of `role` or `roles` kwargs') elif 'role' in role_rules: required_roles = [role_rules['role']] elif 'roles' in role_rules: required_roles = role_rules['roles'] if 'one_of' in role_rules: one_of_roles = role_rules['one_of'] def wrapper(fn): @wraps(fn) @_auth_required() @roles_required(*required_roles) @roles_accepted(*one_of_roles) def decorated(*args, **kwargs): return fn(*args, **kwargs) return decorated if decorated_fn and callable(decorated_fn): return wrapper(decorated_fn) return wrapper
def _auth_required(): """ Decorator that protects endpoints through token and session auth mechanisms """ login_mechanisms = ( ('token', _check_token), ('session', lambda: current_user.is_authenticated), ) def wrapper(fn): @wraps(fn) def decorated_view(*args, **kwargs): for _, mechanism in login_mechanisms: if mechanism and mechanism(): return fn(*args, **kwargs) return security._unauthorized_callback() return decorated_view return wrapper def _check_token(): user = security.login_manager._request_callback(request) if user and user.is_authenticated: _request_ctx_stack.top.user = user identity_changed.send(current_app._get_current_object(), identity=Identity(user.id)) return True return False