赞
踩
from flask import Flask, g from celery import Celery def make_celery(app): celery = Celery(app.import_name) celery.conf.update(app.config["CELERY_CONFIG"]) return celery flask_app = Flask(__name__) flask_app.config.update(CELERY_CONFIG={ 'broker_url': 'redis://localhost:6379', 'result_backend': 'redis://localhost:6379', }) celery = make_celery(flask_app) @celery.task() def add_together(a, b): # 在这里调用g会出现上下文错误 print(f'g.result in celery is: {g.result}') return a + b @flask_app.route("/calc_sum") def calc_sum(): g.result = 3 print(f'g.result in view is: {g.result}') task = add_together.apply_async((1, 2)) return {'task_id': task.id}
上述代码调用calc_sum接口时就会出现如下错误, 其原因是函数视图中的g并不会在其调用了异步任务后继续沿用
RuntimeError: Working outside of application context.
This typically means that you attempted to use functionality that needed
to interface with the current application object in some way. To solve
this, set up an application context with app.app_context(). See the
documentation for more information.
假设你已定义好你的flask_app & celery_app 并在视图中调用异步方法(以Flask官网给的flask+celery代码为例(可点击跳转)) ,下面例子中可以获取g对象,但是会产生AttributeError,原因是异步中的g并没有result这个属性
from flask import Flask, g from celery import Celery def make_celery(app): celery = Celery(app.import_name) celery.conf.update(app.config["CELERY_CONFIG"]) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery flask_app = Flask(__name__) flask_app.config.update(CELERY_CONFIG={ 'broker_url': 'redis://localhost:6379', 'result_backend': 'redis://localhost:6379', }) celery = make_celery(flask_app) @celery.task() def add_together(a, b): # 在这里调用g会出现属性错误 print(f'g.result in celery is: {g.result}') return a + b @flask_app.route("/calc_sum") def calc_sum(): g.result = 3 print(f'g.result in view is: {g.result}') task = add_together.apply_async((1, 2)) return {'task_id': task.id}
其实正常情况如果没有 with app.app_context() 来引用flask_app上下文的话,异步任务中连g都不存在,而引用上下文后视图和异步任务中并不是同一个g, 而在视图中执行异步任务时(apply_async或者delay)是拥有视图的上下文对象的,我们就可以在其调用异步任务时把g对象中所需信息传递到异步任务中,代码如下
def make_celery(app): _celery = Celery("rqams2") celery_conf = config.pop("celery", {}) _celery.conf.update(celery_conf) class ContextTask(_celery.Task): def __init__(self, *args, **kwargs): # 强制typing=False来跳过参数检测,也可以不在init中转而在调用处加上 self.typing = False super(ContextTask, self).__init__(*args, **kwargs) def apply_async(self, *args, **kwargs): # 此时还有视图中g的值,将其同步值调用方法的参数中 arguments = inspect.signature(super(ContextTask, self).apply_async).bind(*args, **kwargs).arguments.copy() arguments.setdefault('kwargs', {})["__flask_g"] = jsonpickle.dumps(g.__dict__) return super(ContextTask, self).apply_async(**arguments) def __call__(self, *args, **kwargs): with app.app_context(): # task运行时将参数中g的值重新加载到任务上下文的g对象中 for k, v in jsonpickle.loads(kwargs.pop("__flask_g")).items(): setattr(g, k, v) return self.run(*args, **kwargs) # 用自定义任务覆盖原生任务 _celery.Task = ContextTask return _celery
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。