赞
踩
Celery本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的消息服务有RabbitMQ、Redis、Amazon SQS;Celery 是用 Python 编写的,但协议可以用任何语言实现。除了 Python 语言实现之外,还有Node.js的 node-celery 和php的 celery-php 。
需要注意的是:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
大概意思就是Celery 是一个资金最少的项目,因此我们不支持 Microsoft Windows,请不要提出与该平台相关的任何问题。官方不支持windows,可以通过插件eventlet
在windows中使用,功能有部分缩减。
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。
启动celery服务(配置好broker和backend),手动或自动添加任务到broker中,broker将任务分发给worker,worker后台异步执行任务,执行完成后将结果存放到backend中。
通过pip安装
# 本机Python环境为Python3.7.4
pip install celery
# 验证安装 查看版本
celery --version
---console---
5.2.3 (dawn-chorus)
使用redis作为backend和broker,需要本机安装redis,并启动redis-server,还需要安装Python第三方模块redis
pip install redis
由于官方不支持windows安装和使用,需要借助Python第三方模块eventlet
,此模块主要作用通过协程实现并发。
pip install celery
pip install eventlet
# 验证安装 查看版本
celery --version
---console---
5.2.3 (dawn-chorus)
目录结构
celery对目录要求严格,如果不在目录下加入__init__.py,worker执行任务可能会出现NotRegistered的情况
demo1
- __init__.py
- tasks.py
- run.py
tasks.py
from celery import Celery
# 通过使用本机redis且没有密码,使用远程redis有密码格式为
# 'redis://:密码@ip:6379/1'
broker = 'redis://127.0.0.1:6379/1' # 任务储存
backend = 'redis://127.0.0.1:6379/2' # 结果存储,执行完之后结果放在这
# 创建出app对象
app = Celery(__name__, broker=broker, backend=backend)
# 任务通过装饰器@app.task进行装饰
@app.task
def add(x, y):
return x + y
run.py
from tasks import add
# 添加任务
# 返回一个 AsyncResult 实例,可以用于进行跟踪任务状况
result = add.delay(3,4)
print(result)
# 需要cd到tasks文件对应的路径 # 启动一个worker,日志打印级别为info # windows平台启动 # celery -A tasks worker -l info -P eventlet # linux启动 # 格式为:celery -A app对象所在的文件 worker -l 日志级别 celery -A tasks worker -l info ---console--- /usr/local/lib/python3.7/site-packages/celery/platforms.py:841: SecurityWarning: You're running the worker with superuser privileges: this is absolutely not recommended! Please specify a different user using the --uid option. User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@6ae5fd398c10 v5.2.3 (dawn-chorus) --- ***** ----- -- ******* ---- Linux-3.10.0-1160.59.1.el7.x86_64-x86_64-with-debian-9.11 2022-03-31 02:52:29 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f00470ca550 - ** ---------- .> transport: redis://:**@127.0.0.1:6379/1 - ** ---------- .> results: redis://:**@127.0.0.1:6379/2 - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.add [2022-03-31 02:52:29,388: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/1 [2022-03-31 02:52:29,391: INFO/MainProcess] mingle: searching for neighbors [2022-03-31 02:52:30,497: INFO/MainProcess] mingle: sync with 4 nodes [2022-03-31 02:52:30,497: INFO/MainProcess] mingle: sync complete [2022-03-31 02:52:30,511: INFO/MainProcess] celery@6ae5fd398c10 ready. # 从控制台中可以看到一些celery的一些配置信息和运行信息
# 需要cd到run.py同路径,且需要新打开一个bash窗口
python run.py
459b9b9f-de0e-4caf-bd23-f49ef89c797c
---console---
# 此时查看worker所在的bash窗口,窗口打印出了新的log信息
---console---
[2022-03-31 03:20:09,985: INFO/MainProcess] Task tasks.add[459b9b9f-de0e-4caf-bd23-f49ef89c797c] received
[2022-03-31 03:20:10,011: INFO/ForkPoolWorker-1] Task tasks.add[459b9b9f-de0e-4caf-bd23-f49ef89c797c] succeeded in 0.02414786597364582s: 7
连接redis查看结果存储
[root@centos-01 demo1]# redis-cli
127.0.0.1:6379> select 2
OK
127.0.0.1:6379[2]> keys *
1) "celery-task-meta-459b9b9f-de0e-4caf-bd23-f49ef89c797c"
127.0.0.1:6379[2]> get celery-task-meta-459b9b9f-de0e-4caf-bd23-f49ef89c797c
"{\"status\": \"SUCCESS\", \"result\": 7, \"traceback\": null, \"children\": [], \"date_done\": \"2022-03-31T03:20:09.987307\", \"task_id\": \"459b9b9f-de0e-4caf-bd23-f49ef89c797c\"}"
127.0.0.1:6379[2]>
# celery中backend使用的库为2,切换到对应的库中,查看所有的key,发现只有一个key,且key中包含AsyncResult 实例,查看该key
# 对应的value,正是我们执行的一些信息
在run.py中添加
# 查看任务执行状态,执行完成(不管是否发生异常还是正常执行)返回True,未执行返回False print(result.ready()) # 如果任务出现异常,可以通过以下命令进行回溯 result.traceback from tasks import app from celery.result import AsyncResult asy = AsyncResult(id=result, app=app) # 是否正常执行完成,正常执行成功返回True,否则返回False print(asy.successful()) # 执行成功打印结果 print(asy.get()) # 是否正常执行失败,失败返回True,否则返回False print(asy.failed()) # 查看任务状态 print(asy.status) # 执行状态一般有三种PENDING、RETRY、STARTED
直接通过Ctrl + C停止即可
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。