Source code for flask_unchained.bundles.celery.extensions.celery
"""
code adapted from:
https://stackoverflow.com/questions/12044776/how-to-use-flask-sqlalchemy-in-a-celery-task
"""
import flask
from celery import Celery as BaseCelery
from dill import dumps as dill_dumps, load as dill_load
from kombu.serialization import pickle_loads, pickle_protocol, registry
from kombu.utils.encoding import str_to_bytes
[docs]class Celery(BaseCelery):
"""
The `Celery` extension::
from flask_unchained.bundles.celery import celery
"""
def __init__(self, *args, **kwargs):
self._register_dill()
super().__init__(*args, **kwargs)
self.override_task_class()
def override_task_class(self):
BaseTask = self.Task
_celery = self
class ContextTask(BaseTask):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return BaseTask.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return BaseTask.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.main = app.import_name
self.__autoset('broker_url', app.config.CELERY_BROKER_URL)
self.__autoset('result_backend', app.config.CELERY_RESULT_BACKEND)
self.config_from_object(app.config)
# we don't use self.autodiscover_tasks here, preferring instead to allow the
# DiscoverTasksHook to discover tasks. This way allows for bundles to define
# what module their tasks are located in (and in a consistent way with how it
# works for the rest of Flask Unchained)
def _register_dill(self):
def encode(obj, dumper=dill_dumps):
return dumper(obj, protocol=pickle_protocol)
def decode(s):
return pickle_loads(str_to_bytes(s), load=dill_load)
registry.register(
name='dill',
encoder=encode,
decoder=decode,
content_type='application/x-python-serialize',
content_encoding='binary'
)
# the same as upstream, but we need to copy it here so we can access it
def __autoset(self, key, value):
if value:
self._preconf[key] = value
self._preconf_set_by_auto.add(key)