赞
踩
一定结合自己的项目文件对应的模块来看, 关于项目名下方会有标注,本篇只介绍celery作为异步任务调用功能的调用,定时任务我用的Flask-APScheduler库
先介绍下我的模块划分
apps # 应用包
- app # 单个应用
- tasks.py # celery函数放在对应应用下
app.py # create_app函数放置点
manage.py # 使用flask-script启动, 启动文件
config.py # app配置文件
mycelery.py # celery继承,集成flask上下文等等
需要安装的库
Flask-Celery-Helper
celery==3.1.17
上代码,本篇不介绍celery原理/使用方法, 直接告诉你怎么写
# mycelery.py from flask_celery import Celery from flask import has_request_context, make_response, request class CeleryWithContext(Celery): def init_app(self, app): super(CeleryWithContext, self).init_app(app) task_base = self.Task class ContextTask(task_base): #: Name of the additional parameter passed to tasks #: that contains information about the original Flask request context. CONTEXT_ARG_NAME = '_flask_request_context' def __call__(self, *args, **kwargs): """Execute task code with given arguments.""" call = lambda: super(ContextTask, self).__call__(*args, **kwargs) context = kwargs.pop(self.CONTEXT_ARG_NAME, None) if context is None or has_request_context(): return call() with app.test_request_context(**context): result = call() # process a fake "Response" so that # ``@after_request`` hooks are executed app.process_response(make_response(result or '')) return result def apply_async(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply_async(args, kwargs, **rest) def apply(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply(args, kwargs, **rest) def retry(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).retry(args, kwargs, **rest) def _include_request_context(self, kwargs): """Includes all the information about current Flask request context as an additional argument to the task. """ if not has_request_context(): return # keys correspond to arguments of :meth:`Flask.test_request_context` context = { 'path': request.path, 'base_url': request.url_root, 'method': request.method, 'headers': dict(request.headers), 'data': request.form } if '?' in request.url: context['query_string'] = request.url[(request.url.find('?') + 1):] kwargs[self.CONTEXT_ARG_NAME] = context setattr(ContextTask, 'abstract', True) setattr(self, 'Task', ContextTask) celery = CeleryWithContext() # app.py from flask import Flask from mycelery import celery def create_app(config_name): app = Flask(config_name) app.config.from_object(config[config_name]) # 中间部分是你的其他库的初始化部分,省略 ... celery.init_app(app) return app # manage.py from mycelery import celery BASE_DIR = os.path.abspath(os.path.dirname(__file__)) apps = os.path.join(BASE_DIR, 'apps') sys.path.insert(0, apps) from flask_script import Manager from app import create_app app = create_app(ENV) manager = Manager(app) if __name__ == '__main__': manager.run() # config.py class Config(object): ... # 这块注意看 app.tasks app就对应着我的单个应用,tasks就是下面的文件. # 文件名也只能是tasks CELERY_IMPORTS = ('app.tasks',) CELERY_BROKER_URL = 'redis://localhost' CELERY_RESULT_BACKEND = 'redis://localhost' @staticmethod def init_app(app): pass config = { 'dev': Config } # tasks.py from mycelery import celery @celery.task() def add(a, b): # 并不是一定要写add,就是你封装的耗时比较久的任务 ... do something return str(a+b) # 调用 from app.tasks import add def func(a,b): ... add.delay(a,b) return
配置好后启动, 命令行输入
# manage 应用启动文件 就是初始化app的文件, celery是manage中的方法
celery -A manage.celery worker -l info
启动成功界面
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。