Flask+Celery
安装
- 模块
- pip install celery
- pip install eventlet
- pip install -U "celery[redis]"
- pip install redis
在Windows操作系统上,还需要安装另外一个东西,eventlet
- redis数据库
- redis desktop manager
相关文档
关系
- task,任务
- broker(中间人),存储任务的队列(借助redis实现)
- worker:真正执行任务的工作者
- backend:用来存储任务执行后的结果
配置
想要在flask中使用celery,可以配合redis数据库,redis配置如下:
app.conf.broker_url = 'redis://localhost:6379/0'
其中redis数据库链接格式为:
redis://:password@hostname:port/db_number
若没有密码,则为:
redis://hostname:port/db_number
多个redis链接
app.conf.broker_url = 'redis://localhost:6379/0;redis://localhost:6378/0'
URL 的所有配置都可以自定义配置的,默认使用的是 localhost 的 6379 端口中 0 数据库。( Redis 默认有 16 个数据库)
可见性超时
可见性超时为将消息重新下发给另外一个程序之前等待确认的任务秒数。请注意查看下面的。
可以通过 broker_transport_options
选项进行修改:
app.conf.broker_transport_options = {'visibility_timeout': 3600} # 一个小时
默认的可见性超时时间为1个小时。
结果
如果想保存任务执行返回结果保存到Redis,需要进行以下配置:
app.conf.result_backend = 'redis://localhost:6379/0'
简单测试
- 用法
- celery = Celery("tasks",
- broker="redis://118.24.128.18:6379/0",
- backend="redis://118.24.128.18:6379/0")
存储结果的位置:backend
- 第一个参数为当前模块的名称
- 第二个参数为中间人(Broker)的链接 URL
- 第三个参数backend为存储结果的位置
- 创建task.py
- from celery import Celery
- import time
-
- celery = Celery("tasks",
- broker="redis://localhost:6379/0",
- backend="redis://localhost:6379/0")
-
- @celery.task
- def send_mail(a,b):
- time.sleep(5000)
- return a+b
可以看见send_mail()
中设置等待5000秒,通常这个函数会运行很久,毫无体验感,但是为了获取体验感使用celery进行加速。
运行 Celery 职程(Worker)服务
现在可以使用 worker 参数进行执行刚刚创建职程(Worker):
celery -A task worker --loglevel=info
格式:
celery -A 文件名称 worker --loglevel=info
调用任务
需要调用我们创建的实例任务,可以通过 delay()
进行调用。
delay()
是 apply_async()
的快捷方法,可以更好的控制任务的执行:
- >>> from task import *
- >>> r = add.delay(4,5)
- >>> r
- <AsyncResult: 1d616754-2447-43df-a1a4-1368ce01a86b>
如果出现以下报错:
ValueError: not enough values to unpack (expected 3, got 0)
解决
celery -A your_app_name worker --pool=solo -l info
其中mymodule为文件名称
ready()
可以检测是否已经处理完毕:
- >>> r.ready()
- True
整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:
- >>> r.get()
- 9
>>> r.get(timeout=1)
如果运行出现如下错误:
如果任务出现异常,get()
会再次引发异常,可以通过 propagate 参数进行覆盖:
>>> r.get(propagate=False)
如果任务出现异常,可以通过以下命令进行回溯:
>>> r.traceback
flask-celery
最简示例
通过上面的步骤,下面即是在 Flask 中使用 Celery 的最简示例:
- from flask import Flask
- from celery import Celery
-
- app = Flask(__name__)
- app.config.update(
- CELERY_BROKER_URL='redis://localhost:6379',
- CELERY_RESULT_BACKEND='redis://localhost:6379'
- )
-
- def make_celery(app):
- celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
- celery.conf.update(app.config)
- TaskBase = celery.Task
- class ContextTask(TaskBase):
- abstract = True
- def __call__(self, *args, **kwargs):
- with app.app_context():
- return TaskBase.__call__(self, *args, **kwargs)
- celery.Task = ContextTask
- return celery
-
- celery = make_celery()
-
- @celery.task()
- def add(a, b):
- return a + b
-
- if __name__ == "__main__":
- app.run()
在后台调用:
- >>> from app import *
- >>> result = add.delay(4,5)
- >>> result.wait()
运行 Celery 职程
通过动手得知如上.wait()
永远不会实际地返回。这是因为需要运行 Celery。可以这样把 Celery 以职程运行:
celery -A your_app_name.celery worker --pool=solo --loglevel=info
实例:
celery -A app.celery worker --pool=solo --loglevel=info
而在linux中,这样使用
celery -A app.celery worker --pool=eventlet --loglevel=info
这次结果就有了。
- >>> from app import *
- >>> result = add.delay(4,5)
- >>> result.wait()
- 9
常见用途
- 邮件发送,由于可能在发送邮件时,由于网络等其他原因,导致发送时间过长,从而时用户体验感低,因此配合celery使用,就能够让等待时间降低,从而提高用户体验感。