其实我只是想把邮件发送这个动作移到Celery中执行。
既然用到了Celery,那么每次发邮件都单独开一个线程似乎有点多余,异步任务还是交给Celery吧。
在Flask应用中集成Celery
Celery和Flask一起使用并没有什么不和谐的地方,都可以不用定制的Flask扩展,按照网上随处可见的示例也很简单:
- from flask import Flask
- from celery import Celery
-
- app = Flask(__name__)
- app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
- app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
-
- celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
- celery.conf.update(app.config)
-
- @celery.task
- def send_email():
- ....
然而,稍微上点规模的Flask应用都会使用Factory模式(中文叫工厂函数,我听着特别扭),即只有在创建Flask实例时,才会初始化各种扩展,这样可以动态的修改扩展程序的配置。比如你有一套线上部署的配置和一套本地开发测试的配置,希望通过不同的启动入口,就使用不同的配置。
使用Factory模式的话,上面的代码大概要修改成这个样:
- from flask import Flask
- from celery import Celery
-
- app = Flask(__name__)
- celery = Celery()
-
- def create_app(config_name):
- app.config.from_object(config[config_name])
- celery.conf.update(app.config)
通过config_name
,来动态调整celery的配置。然而,这样子是不行的!
Celery的__init__()
函数会调用celery._state._register_app()
直接就通过传入的配置生成了Celery实例,上面的代码中,celery = Celery()
直接使用默认的amqp作为了broker,随后通过celery.conf.update(app.config)
是更改不了broker的。这也就是为什么网上的示例代码中,在定义Celery实例时,就传入了broker=app.config['CELERY_BROKER_URL']
,而不是之后通过celery.conf.update(app.config)
传入。当你的多套配置文件中,broker设置的不同时,就悲剧了。
当然不用自己造轮子,Flask-Celery-Helper就是解决以上问题的FLask扩展。
看看它的__init__()
函数:
- def __init__(self, app=None):
- """If app argument provided then initialize celery using application config values.
- If no app argument provided you should do initialization later with init_app method.
- :param app: Flask application instance.
- """
- self.original_register_app = _state._register_app # Backup Celery app registration function.
- _state._register_app = lambda _: None # Upon Celery app registration attempt, do nothing.
- super(Celery, self).__init__()
- if app is not None:
- self.init_app(app)
将_state._register_app
函数备份,再置为空。这样__init__()
就不会创建Celery实例了。但如果指定了app
,那么进入init_app
,嗯,大多数Flask扩展都有这个函数,用来动态生成扩展实例。
- def init_app(self, app):
- """Actual method to read celery settings from app configuration and initialize the celery instance.
- :param app: Flask application instance.
- """
- _state._register_app = self.original_register_app # Restore Celery app registration function.
- if not hasattr(app, 'extensions'):
- app.extensions = dict()
- if 'celery' in app.extensions:
- raise ValueError('Already registered extension CELERY.')
- app.extensions['celery'] = _CeleryState(self, app)
-
- # Instantiate celery and read config.
- super(Celery, self).__init__(app.import_name, broker=app.config['CELERY_BROKER_URL'])
- ...
将_state._register_app
函数还原,再执行Celery原本的__init__
。这样就达到动态生成实例的目的了。接着往下看:
- task_base = self.Task
-
- # Add Flask app context to celery instance.
- class ContextTask(task_base):
- """Celery instance wrapped within the Flask app context."""
- def __call__(self, *_args, **_kwargs):
- with app.app_context():
- return task_base.__call__(self, *_args, **_kwargs)
- setattr(ContextTask, 'abstract', True)
- setattr(self, 'Task', ContextTask)
这里重载了celery.Task
类,通过with app.app_context():
,在app.app_context()
的上下文环境下执行Task。对于一个已生成的Flask实例,应用上下文不会随便改变。所以这就现实了在Celery中使用Flask的应用上下文。
下面是官方的示例代码:
- # extensions.py
- from flask_celery import Celery
- celery = Celery()
-
- # application.py
- from flask import Flask
- from extensions import celery
-
- def create_app():
- app = Flask(__name__)
- app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
- app.config['CELERY_BROKER_URL'] = 'redis://localhost'
- app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
- celery.init_app(app)
- return app
-
- # tasks.py
- from extensions import celery
-
- @celery.task()
- def add_together(a, b):
- return a + b
-
- # manage.py
- from application import create_app
- app = create_app()
- app.run()
跟普通的Flask扩展一样了。
Celery中使用Flask上下文
在Flask的view函数中调用task.delay()
时,这个task相当于一个离线的异步任务,它对Flask的应用上下文和请求上下文一无所知。但是这都可能是异步任务需要用到的。比如发送邮件要用到的render_template
和url_for
就分别要用到应用上下文和请求上下文。不在celery中引入它们的话,就是Running code outside of a request
。
引入应用上下文的工作Flask-Celery-Helper已经帮我们做好了,在Flask的文档中也有相关介绍。实现方法和上面Flask-Celery-Helper的一样。然而,不管是Flask-Celery-Helper还是Flask文档,都没有提及如何在Celery中使用请求上下文。
要引入请求上下文,需要考虑这两个问题:
如何在Celery中产生请求上下文。Flask中有
request_context
和test_request_context
可以产生请求上下文。区别是request_context
需要WSGI环境变量environ
,而test_request_context
根据传入的参数生成请求上下文。我没有找到如何在Celery中获取到WSGI环境变量的方法,所以只能自己传入相关参数生成请求上下文了。请求上下文是随HTTP请求产生的,要获取请求上下文,就必须在view函数中处理,view函数通过
task.delay()
发送Celery任务。所以需要重载task.delay()
,以获取请求上下文。
具体的思路还是在init_app
中重载celery.Task
类,通过with app.test_request_context():
,在app.test_request_context()
的上下文环境下执行Task。
首先获取request,从中整理出test_request_context()需要的参数。根据test_request_context
的函数注释,它需要的参数和werkzeug.test.EnvironBuilder类的参数一样。
- CONTEXT_ARG_NAME = '_flask_request_context'
- def _include_request_context(self, kwargs):
- """Includes all the information about current Flask request context
- as an additional argument to the task.
- """
- if not has_request_context():
- return
-
- # keys correspond to arguments of :meth:`Flask.test_request_context`
- context = {
- 'path': request.path,
- 'base_url': request.url_root,
- 'method': request.method,
- 'headers': dict(request.headers),
- 'data': request.form
- }
- if '?' in request.url:
- context['query_string'] = request.url[(request.url.find('?') + 1):]
-
- kwargs[self.CONTEXT_ARG_NAME] = context
_include_request_context
函数从request中提取path
,base_url
,method
,headers
,data
,query_string
。将他们传入test_request_context
,生成伪造的请求上下文可以覆盖大多数的使用情况。
Celery通过apply_async
,apply
,retry
调用异步任务(delay
是apply_async
的简化方法)。这里需要重载它们,让这些函数获取request:
- def apply_async(self, args=None, kwargs=None, **rest):
- self._include_request_context(kwargs)
- return super(ContextTask, self).apply_async(args, kwargs, **rest)
-
- def apply(self, args=None, kwargs=None, **rest):
- self._include_request_context(kwargs)
- return super(ContextTask, self).apply(args, kwargs, **rest)
-
- def retry(self, args=None, kwargs=None, **rest):
- self._include_request_context(kwargs)
- return super(ContextTask, self).retry(args, kwargs, **rest)
最后重载celery.Task
的__call__
方法:
- def __call__(self, *args, **kwargs):
- """Execute task code with given arguments."""
- call = lambda: super(ContextTask, self).__call__(*args, **kwargs)
-
- context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
- if context is None or has_request_context():
- return call()
-
- with app.test_request_context(**context):
- result = call()
-
- # process a fake "Response" so that
- # ``@after_request`` hooks are executed
- app.process_response(make_response(result or ''))
-
- return result
context是我们从request中获取的参数,将它传给test_request_context,伪造请求上下文,并在这个上下文环境中执行task。既然伪造了请求,那也得为这个假请求生成响应,万一你定义了after_request
这个在响应后执行的钩子呢?通过process_response
就可以激活after_request
。
注意这里并没有传入应用上下文,因为Flask在创建请求上下文时,会判断应用上下文是否为空,为空就先创建应用上下文,再创建请求上下文。
完整代码在这里。celery = CeleryWithContext()
创建的Celery实例就可以给各种task使用了。
另外创建一个celery_worker.py文件,生成一个Flask实例,供Celery的worker使用。
- # celery_worker.py
-
- #!/usr/bin/env python
- from app import create_app
- from app.extensions import celery
-
- app = create_app()
启动worker:celery -A celery_worker.celery worker -l info
这下就可以使用Celery发邮件了。唉,还真是麻烦。
reference
http://xion.io/post/code/celery-include-flask-request-context.html