Source code for flask_unchained.bundles.celery.tasks

from flask import current_app

from .extensions import celery
from ..mail.extensions import mail
from ..mail.utils import make_message


def _send_mail_async(subject_or_message=None, to=None, template=None, **kwargs):
    subject_or_message = subject_or_message or kwargs.pop('subject')
    if current_app and current_app.testing:
        return async_mail_task.apply([subject_or_message, to, template], kwargs)
    return async_mail_task.delay(subject_or_message, to, template, **kwargs)


@celery.task(serializer='dill')
def async_mail_task(subject_or_message, to=None, template=None, **kwargs):
    """
    Celery task to send emails asynchronously using the mail bundle.
    """
    to = to or kwargs.pop('recipients', [])
    msg = make_message(subject_or_message, to, template, **kwargs)
    with mail.connect() as connection:
        connection.send(msg)