from flask import current_app as app, request
from flask_unchained import Controller, route, lazy_gettext as _
from flask_unchained import injectable
from flask_unchained.bundles.sqlalchemy import SessionManager
from http import HTTPStatus
from werkzeug.datastructures import MultiDict
from ..decorators import anonymous_user_required, auth_required
from ..exceptions import AuthenticationError
from ..extensions import Security
from ..services import SecurityService, SecurityUtilsService
from ..utils import current_user
class SecurityController(Controller):
    """
    The controller for the security bundle.

    Provides the login, logout, registration, email confirmation, and
    password recovery/reset/change views. Most views answer both html form
    submissions and json requests (distinguished by ``request.is_json``).
    """

    # dependencies, assigned via the ``injectable`` placeholder
    security: Security = injectable
    security_service: SecurityService = injectable
    security_utils_service: SecurityUtilsService = injectable
    session_manager: SessionManager = injectable

    @route(only_if=False)
    @auth_required()
    def check_auth_token(self):
        """
        View function to check a token, and if it's valid, log the user in.

        Disabled by default; must be explicitly enabled in your ``routes.py``.

        :return: A json response containing the current user.
        """
        # the auth_required decorator verifies the token and sets current_user,
        # just need to return a success response
        return self.jsonify({'user': current_user})

    @route(methods=['GET', 'POST'])
    @anonymous_user_required(msg='You are already logged in', category='success')
    def login(self):
        """
        View function to log a user in. Supports html and json requests.

        On success, json requests receive the user and their auth token, and
        html requests are redirected. On failure, json requests receive a
        single error message with a 401 status, and html requests get the
        login template re-rendered with a single top-level form error.
        """
        form = self._get_form('SECURITY_LOGIN_FORM')
        if form.validate_on_submit():
            try:
                self.security_service.login_user(form.user, form.remember.data)
            except AuthenticationError as e:
                form.errors = {'_error': [str(e)]}
            else:
                # commit only once the request has otherwise completed
                self.after_this_request(self._commit)
                if request.is_json:
                    return self.jsonify({'token': form.user.get_auth_token(),
                                         'user': form.user})
                self.flash(_('flask_unchained.bundles.security:flash.login'),
                           category='success')
                return self.redirect('SECURITY_POST_LOGIN_REDIRECT_ENDPOINT')
        elif form.errors:
            # FIXME-identity
            identity_attrs = app.config.SECURITY_USER_IDENTITY_ATTRIBUTES
            msg = f"Invalid {', '.join(identity_attrs)} and/or password."

            # we just want a single top-level form error
            form.errors = {'_error': [msg]}
            for field in form._fields.values():
                field.errors = None

        if form.errors and request.is_json:
            return self.jsonify({'error': form.errors.get('_error')[0]},
                                code=HTTPStatus.UNAUTHORIZED)

        return self.render('login',
                           login_user_form=form,
                           **self.security.run_ctx_processor('login'))

    @route()
    def logout(self):
        """
        View function to log a user out. Supports html and json requests.

        Json requests receive an empty 204 response; html requests are
        flashed a message and redirected.
        """
        if current_user.is_authenticated:
            self.security_service.logout_user()

        if request.is_json:
            return '', HTTPStatus.NO_CONTENT

        self.flash(_('flask_unchained.bundles.security:flash.logout'),
                   category='success')
        return self.redirect('SECURITY_POST_LOGOUT_REDIRECT_ENDPOINT')

    @route(methods=['GET', 'POST'],
           only_if=lambda app: app.config.SECURITY_REGISTERABLE)
    @anonymous_user_required
    def register(self):
        """
        View function to register user. Supports html and json requests.

        Only routed when ``SECURITY_REGISTERABLE`` is enabled.
        """
        form = self._get_form('SECURITY_REGISTER_FORM')
        if form.validate_on_submit():
            user = self.security_service.user_manager.create(**form.to_dict())
            self.security_service.register_user(user)
            if request.is_json:
                return '', HTTPStatus.NO_CONTENT
            return self.redirect('SECURITY_POST_REGISTER_REDIRECT_ENDPOINT')
        elif form.errors and request.is_json:
            return self.errors(form.errors)

        return self.render('register',
                           register_user_form=form,
                           **self.security.run_ctx_processor('register'))

    @route(methods=['GET', 'POST'],
           only_if=lambda app: app.config.SECURITY_CONFIRMABLE)
    def send_confirmation_email(self):
        """
        View function which sends confirmation token and instructions to a user.

        Only routed when ``SECURITY_CONFIRMABLE`` is enabled.
        Supports html and json requests.
        """
        form = self._get_form('SECURITY_SEND_CONFIRMATION_FORM')
        if form.validate_on_submit():
            self.security_service.send_email_confirmation_instructions(form.user)
            self.flash(_('flask_unchained.bundles.security:flash.confirmation_request',
                         email=form.user.email), category='info')
            if request.is_json:
                return '', HTTPStatus.NO_CONTENT
            return self.redirect('send_confirmation_email')
        elif form.errors and request.is_json:
            return self.errors(form.errors)

        return self.render('send_confirmation_email',
                           send_confirmation_form=form,
                           **self.security.run_ctx_processor('send_confirmation_email'))

    @route('/confirm/<token>',
           only_if=lambda app: app.config.SECURITY_CONFIRMABLE)
    def confirm_email(self, token):
        """
        View function to confirm a user's token from the confirmation email sent
        to them. Every outcome responds with a redirect.

        Only routed when ``SECURITY_CONFIRMABLE`` is enabled.

        :param token: The confirmation token taken from the URL.
        """
        expired, invalid, user = \
            self.security_utils_service.confirm_email_token_status(token)

        if not user or invalid:
            invalid = True
            self.flash(
                _('flask_unchained.bundles.security:flash.invalid_confirmation_token'),
                category='error')

        already_confirmed = user is not None and user.confirmed_at is not None

        # Only re-send instructions when there is a user to send them to; an
        # expired token without a resolvable user is handled as invalid above.
        resend_required = (user is not None
                           and expired
                           and not already_confirmed)
        if resend_required:
            self.security_service.send_email_confirmation_instructions(user)
            self.flash(_('flask_unchained.bundles.security:flash.confirmation_expired',
                         email=user.email,
                         within=app.config.SECURITY_CONFIRM_EMAIL_WITHIN),
                       category='error')

        if invalid or resend_required:
            return self.redirect('SECURITY_CONFIRM_ERROR_REDIRECT_ENDPOINT',
                                 'security_controller.send_confirmation_email')

        if self.security_service.confirm_user(user):
            self.after_this_request(self._commit)
            self.flash(_('flask_unchained.bundles.security:flash.email_confirmed'),
                       category='success')
        else:
            self.flash(_('flask_unchained.bundles.security:flash.already_confirmed'),
                       category='info')

        # make sure the session belongs to the user who owns the token
        if user != current_user:
            self.security_service.logout_user()
            self.security_service.login_user(user)

        return self.redirect('SECURITY_POST_CONFIRM_REDIRECT_ENDPOINT',
                             'SECURITY_POST_LOGIN_REDIRECT_ENDPOINT')

    @route(methods=['GET', 'POST'],
           only_if=lambda app: app.config.SECURITY_RECOVERABLE)
    @anonymous_user_required(msg='You are already logged in',
                             category='success')
    def forgot_password(self):
        """
        View function to request a password recovery email with a reset token.
        Supports html and json requests.

        Only routed when ``SECURITY_RECOVERABLE`` is enabled.
        """
        form = self._get_form('SECURITY_FORGOT_PASSWORD_FORM')
        if form.validate_on_submit():
            self.security_service.send_reset_password_instructions(form.user)
            self.flash(_('flask_unchained.bundles.security:flash.password_reset_request',
                         email=form.user.email),
                       category='info')
            if request.is_json:
                return '', HTTPStatus.NO_CONTENT
            return self.redirect('forgot_password')
        elif form.errors and request.is_json:
            return self.errors(form.errors)

        return self.render('forgot_password',
                           forgot_password_form=form,
                           **self.security.run_ctx_processor('forgot_password'))

    @route('/reset-password/<string:token>', methods=['GET', 'POST'],
           only_if=lambda app: app.config.SECURITY_RECOVERABLE)
    @anonymous_user_required
    def reset_password(self, token):
        """
        View function to verify a user's reset password token from the email we
        sent to them. It also handles the form for them to set a new password.
        Supports html and json requests.

        Only routed when ``SECURITY_RECOVERABLE`` is enabled.

        :param token: The reset password token taken from the URL.
        """
        expired, invalid, user = \
            self.security_utils_service.reset_password_token_status(token)

        # A token that does not resolve to a user is treated as invalid (the
        # same rule confirm_email applies); everything below requires a user.
        if invalid or not user:
            self.flash(
                _('flask_unchained.bundles.security:flash.invalid_reset_password_token'),
                category='error')
            return self.redirect('SECURITY_INVALID_RESET_TOKEN_REDIRECT')
        elif expired:
            self.security_service.send_reset_password_instructions(user)
            self.flash(_('flask_unchained.bundles.security:flash.password_reset_expired',
                         email=user.email,
                         within=app.config.SECURITY_RESET_PASSWORD_WITHIN),
                       category='error')
            return self.redirect('SECURITY_EXPIRED_RESET_TOKEN_REDIRECT')

        # when configured, GET requests are redirected to this location,
        # with the token passed along
        spa_redirect = app.config.SECURITY_API_RESET_PASSWORD_HTTP_GET_REDIRECT
        if request.method == 'GET' and spa_redirect:
            return self.redirect(spa_redirect, token=token, _external=True)

        form = self._get_form('SECURITY_RESET_PASSWORD_FORM')
        if form.validate_on_submit():
            self.security_service.reset_password(user, form.password.data)
            self.security_service.login_user(user)
            self.after_this_request(self._commit)
            self.flash(_('flask_unchained.bundles.security:flash.password_reset'),
                       category='success')
            if request.is_json:
                return self.jsonify({'token': user.get_auth_token(),
                                     'user': user})
            return self.redirect('SECURITY_POST_RESET_REDIRECT_ENDPOINT',
                                 'SECURITY_POST_LOGIN_REDIRECT_ENDPOINT')
        elif form.errors and request.is_json:
            return self.errors(form.errors)

        return self.render('reset_password',
                           reset_password_form=form,
                           reset_password_token=token,
                           **self.security.run_ctx_processor('reset_password'))

    @route(methods=['GET', 'POST'],
           only_if=lambda app: app.config.SECURITY_CHANGEABLE)
    @auth_required
    def change_password(self):
        """
        View function for a user to change their password.
        Supports html and json requests.

        Only routed when ``SECURITY_CHANGEABLE`` is enabled.
        """
        form = self._get_form('SECURITY_CHANGE_PASSWORD_FORM')
        if form.validate_on_submit():
            self.security_service.change_password(
                current_user._get_current_object(),
                form.new_password.data)
            self.after_this_request(self._commit)
            self.flash(_('flask_unchained.bundles.security:flash.password_change'),
                       category='success')
            if request.is_json:
                return self.jsonify({'token': current_user.get_auth_token()})
            return self.redirect('SECURITY_POST_CHANGE_REDIRECT_ENDPOINT',
                                 'SECURITY_POST_LOGIN_REDIRECT_ENDPOINT')
        elif form.errors and request.is_json:
            return self.errors(form.errors)

        return self.render('change_password',
                           change_password_form=form,
                           **self.security.run_ctx_processor('change_password'))

    def _get_form(self, name):
        """
        Instantiate the form class configured under ``name``, bound to the
        current request's data.

        :param name: The app config key holding the form class.
        :return: A form instance populated from the json body for json
                 requests, or from ``request.form`` otherwise.
        """
        form_cls = app.config.get(name)
        if not request.is_json:
            return form_cls(request.form)

        try:
            formdata = MultiDict(request.get_json())
        except (TypeError, ValueError):
            # Valid JSON that is neither an object nor a list of pairs (e.g. a
            # bare number or string) cannot populate a form. Bind an empty
            # payload so validation reports the errors instead of a 500.
            formdata = MultiDict()
        return form_cls(formdata)

    def _commit(self, response=None):
        """
        Commit the database session and pass ``response`` through unchanged,
        which lets this method be registered with ``after_this_request``.
        """
        self.session_manager.commit()
        return response