赞
踩
Celery:https://github.com/celery/celery
官网文档
Celery 不支持微软Windows。
Celery是一个异步任务的调度工具,也可以叫做 "分布式任务队列(Distributed Task Queue)"。
在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。
这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。
broker、backend
- broker:是一个消息传输的中间件,其实就是 "消息队列" ,用来发送和接受消息。
- backend,就是数据库。通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。Backend是在Celery的配置中的一个配置项 CELERY_RESULT_BACKEND ,作用是保存结果和状态,如果你需要跟踪任务的状态,那么需要设置这一项,可以是Database backend,也可以是Cache backend
Celery 的架构由三部分组成,
Celery 架构图
可以看到,Celery 主要包含以下几个模块:
Celery 支持
中间人
- RabbitMQ
- Redis
- Amazon SQS
结果存储
- AMQP, Redis
- Memcached,
- SQLAlchemy, Django ORM
- Apache Cassandra, Elasticsearch, Riak
- MongoDB, CouchDB, Couchbase, ArangoDB
- Amazon DynamoDB, Amazon S3
- Microsoft Azure Block Blob, Microsoft Azure Cosmos DB
- File system
并发
- prefork (multiprocessing),
- Eventlet , gevent
- thread (multithreaded)
- solo (single threaded)
序列化
- pickle、json、yaml、msgpack
- zlib、bzip2 compression
- Cryptographic message signing
- 监控。可以针对整个流程进行监控,内置的工具或可以实时说明当前集群的概况。
- 调度。可以通过调度功能在一段时间内指定任务的执行时间 datetime,也可以根据简单每隔一段时间进行执行重复的任务,支持分钟、小时、星期几,也支持某一天或某一年的Crontab表达式。
- 工作流。可以通过“canvas“进行组成工作流,其中包含分组、链接、分块等等。简单和复杂的工作流程可以使用一组“canvas“组成,其中包含分组、链接、分块等。
- 资源(内存)泄漏保护。--max-tasks-per-child 参数适用于可能会出现资源泄漏(例如:内存泄漏)的任务。
- 时间和速率的限制。您可以控制每秒/分钟/小时执行任务的次数,或者任务执行的最长时间,也将这些设置为默认值,针对特定的任务或程序进行定制化配置。
- 自定义组件。开发者可以定制化每一个职程(Worker)以及额外的组件。职程(Worker)是用 “bootsteps” 构建的-一个依赖关系图,可以对职程(Worker)的内部进行细粒度控制。
Celery 很容易与web框架集成,其中一些甚至有集成包:
不需要 | |
|
|
pip install celery
pip install redis
使用celery包含三个方面:
创建一个文件 tasks.py
输入下列代码:
- from celery import Celery
-
- broker = 'redis://127.0.0.1:6379/5'
- backend = 'redis://127.0.0.1:6379/6'
-
-
- app = Celery('tasks', broker=broker, backend=backend)
-
- @app.task
- def add(x, y):
- return x + y
Celery 参数解释
上述代码导入了celery,然后创建了celery 实例 app,实例化的过程中指定了任务名tasks
(和文件名一致),传入了broker和backend。然后创建了一个任务函数add
。
下面启动 celery 服务。在当前命令行终端运行(分别在 env1 和 env2 下执行):
celery -A tasks worker --loglevel=info
目录结构 (celery -A tasks worker --loglevel=info 这条命令当前工作目录必须和 tasks.py 所在的目录相同。即 进入tasks.py所在目录执行这条命令。)
使用 python 虚拟环境 模拟两个不同的 主机。
此时会看见一对输出。包括注册的任务啦。
交互式客户端程序调用方法
打开一个命令行,进入Python环境。
In [0]:from tasks import add
In [1]: r = add.delay(2, 2)
In [2]: add.delay(2, 2)
Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d>In [3]: r = add.delay(3, 3)
In [4]: r.re
r.ready r.result r.revokeIn [4]: r.ready()
Out[4]: TrueIn [6]: r.result
Out[6]: 6In [7]: r.get()
Out[7]: 6
调用 delay 函数即可启动 add 这个任务。这个函数的效果是发送一条消息到broker中去,这个消息包括要执行的函数、函数的参数以及其他信息,具体的可以看 Celery官方文档。这个时候 worker 会等待 broker 中的消息,一旦收到消息就会立刻执行消息。
启动了一个任务之后,可以看到之前启动的worker已经开始执行任务了。
现在是在python环境中调用的add函数,实际上通常在应用程序中调用这个方法。
注意:如果把返回值赋值给一个变量,那么原来的应用程序也会被阻塞,需要等待异步任务返回的结果。因此,实际使用中,不需要把结果赋值。
应用程序中调用方法
新建一个 main.py 文件 代码如下:
- from tasks import add
-
- r = add.delay(2, 2)
- r = add.delay(3, 3)
- print r.ready()
- print r.result
- print r.get()
在celery命令行可以看见celery执行的日志。打开 backend的redis,也可以看见celery执行的信息。
使用 Redis Desktop Manager 查看 Redis 数据库内容如图:
Celery 的配置比较多,可以在 官方配置文档:http://docs.celeryproject.org/en/latest/userguide/configuration.html 查询每个配置项的含义。
上述的使用是简单的配置,下面介绍一个更健壮的方式来使用celery。首先创建一个python包,celery服务,姑且命名为proj。目录文件如下:
-
- ☁ proj tree
- .
- ├── __init__.py
- ├── celery.py # 创建 celery 实例
- ├── config.py # 配置文件
- └── tasks.py # 任务函数
首先是 celery.py
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
-
- from __future__ import absolute_import
- from celery import Celery
-
- app = Celery('proj', include=['proj.tasks'])
-
- app.config_from_object('proj.config')
-
- if __name__ == '__main__':
- app.start()
这一次创建 app,并没有直接指定 broker 和 backend。而是在配置文件中。
config.py
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
-
- from __future__ import absolute_import
-
- CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
- BROKER_URL = 'redis://127.0.0.1:6379/6'
剩下的就是tasks.py
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
-
- from __future__ import absolute_import
- from proj.celery import app
-
- @app.task
- def add(x, y):
- return x + y
使用方法也很简单,在 proj 的同一级目录执行 celery:
celery -A proj worker -l info
现在使用任务也很简单,直接在客户端代码调用 proj.tasks 里的函数即可。
指定 路由 到的 队列
Celery的官方文档 。先看代码(tasks.py):
- from celery import Celery
-
- app = Celery()
- app.config_from_object("celeryconfig")
-
- @app.task
- def taskA(x,y):
- return x + y
-
- @app.task
- def taskB(x,y,z):
- return x + y + z
-
- @app.task
- def add(x,y):
- return x + y
上面的tasks.py中,首先定义了一个Celery对象,然后用celeryconfig.py对celery对象进行设置,之后再分别定义了三个task,分别是taskA,taskB和add。接下来看一下celeryconfig.py 文件
- from kombu import Exchange,Queue
-
- BROKER_URL = "redis://10.32.105.227:6379/0" CELERY_RESULT_BACKEND = "redis://10.32.105.227:6379/0"
-
- CELERY_QUEUES = (
- Queue("default",Exchange("default"),routing_key="default"),
- Queue("for_task_A",Exchange("for_task_A"),routing_key="task_a"),
- Queue("for_task_B",Exchange("for_task_B"),routing_key="task_a")
- )
-
- CELERY_ROUTES = {
- 'tasks.taskA':{"queue":"for_task_A","routing_key":"task_a"},
- 'tasks.taskB":{"queue":"for_task_B","routing_key:"task_b"}
- }
在 celeryconfig.py 文件中,首先设置了brokel以及result_backend,接下来定义了三个Message Queue,并且指明了Queue对应的Exchange(当使用Redis作为broker时,Exchange的名字必须和Queue的名字一样)以及routing_key的值。
现在在一台主机上面启动一个worker,这个worker只执行for_task_A队列中的消息,这是通过在启动worker是使用-Q Queue_Name参数指定的。
celery -A tasks worker -l info -n worker.%h -Q for_task_A
然后到另一台主机上面执行taskA任务。首先 切换当前目录到代码所在的工程下,启动python,执行下面代码启动taskA:
- from tasks import *
-
- task_A_re = taskA.delay(100,200)
执行完上面的代码之后,task_A消息会被立即发送到for_task_A队列中去。此时已经启动的worker.atsgxxx 会立即执行taskA任务。
重复上面的过程,在另外一台机器上启动一个worker专门执行for_task_B中的任务。修改上一步骤的代码,把 taskA 改成 taskB 并执行。
- from tasks import *
-
- task_B_re = taskB.delay(100,200)
在上面的 tasks.py 文件中还定义了add任务,但是在celeryconfig.py文件中没有指定这个任务route到那个Queue中去执行,此时执行add任务的时候,add会route到Celery默认的名字叫做celery的队列中去。
因为这个消息没有在celeryconfig.py文件中指定应该route到哪一个Queue中,所以会被发送到默认的名字为celery的Queue中,但是我们还没有启动worker执行celery中的任务。接下来我们在启动一个worker执行celery队列中的任务。
celery -A tasks worker -l info -n worker.%h -Q celery
然后再查看add的状态,会发现状态由PENDING变成了SUCCESS。
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
一种常见的需求是每隔一段时间执行一个任务。
在celery中执行定时任务非常简单,只需要设置celery对象的CELERYBEAT_SCHEDULE属性即可。
配置如下
config.py
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
-
- from __future__ import absolute_import
-
- CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
- BROKER_URL = 'redis://127.0.0.1:6379/6'
-
- CELERY_TIMEZONE = 'Asia/Shanghai'
-
- from datetime import timedelta
-
- CELERYBEAT_SCHEDULE = {
- 'add-every-30-seconds': {
- 'task': 'proj.tasks.add',
- 'schedule': timedelta(seconds=30),
- 'args': (16, 16)
- },
- }
注意配置文件需要指定时区。这段代码表示每隔30秒执行 add 函数。一旦使用了 scheduler, 启动 celery需要加上-B 参数。
celery -A proj worker -B -l info
设置多个定时任务
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
'taskA_schedule' : {
'task':'tasks.taskA',
'schedule':20,
'args':(5,6)
},
'taskB_scheduler' : {
'task':"tasks.taskB",
"schedule":200,
"args":(10,20,30)
},
'add_schedule': {
"task":"tasks.add",
"schedule":10,
"args":(1,2)
}
}
定义3个定时任务,即每隔20s执行taskA任务,参数为(5,6),每隔200s执行taskB任务,参数为(10,20,30),每隔10s执行add任务,参数为(1,2).通过下列命令启动一个定时任务: celery -A tasks beat。使用 beat 参数即可启动定时任务。
计划任务当然也可以用crontab实现,celery也有crontab模式。修改 config.py
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
-
- from __future__ import absolute_import
-
- CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
- BROKER_URL = 'redis://127.0.0.1:6379/6'
-
- CELERY_TIMEZONE = 'Asia/Shanghai'
-
- from celery.schedules import crontab
-
- CELERYBEAT_SCHEDULE = {
- # Executes every Monday morning at 7:30 A.M
- 'add-every-monday-morning': {
- 'task': 'tasks.add',
- 'schedule': crontab(hour=7, minute=30, day_of_week=1),
- 'args': (16, 16),
- },
- }
scheduler的切分度很细,可以精确到秒。crontab模式就不用说了。
当然celery还有更高级的用法,比如 多个机器 使用,启用多个 worker并发处理 等。
发送任务到队列中
apply_async(args[, kwargs[, …]])、delay(*args, **kwargs) :http://docs.celeryproject.org/en/master/userguide/calling.html
send_task :http://docs.celeryproject.org/en/master/reference/celery.html#celery.Celery.send_task
from celery import Celery
celery = Celery()
celery.config_from_object('celeryconfig')
send_task('tasks.test1', args=[hotplay_id, start_dt, end_dt], queue='hotplay_jy_queue')
帮助文档:https://docs.celeryq.dev/en/latest/userguide/monitoring.html
输入 celery -h 可以看到 celery 的命令和帮助
:https://docs.celeryq.dev/en/latest/getting-started/first-steps-with-celery.html
第一步
编写简单的纯python函数
- def say(x,y):
- return x+y
-
- if __name__ == '__main__':
- say('Hello','World')
第二步
如果这个函数不是简单的输出两个字符串相加,而是需要查询数据库或者进行复杂的处理。这种处理需要耗费大量的时间,还是这种方式执行会是多么糟糕的事情。为了演示这种现象,可以使用sleep函数来模拟高耗时任务。
- import time
-
- def say(x,y):
- time.sleep(5)
- return x+y
-
- if __name__ == '__main__':
- say('Hello','World')
第三步
这时候我们可能会思考怎么使用多进程或者多线程去实现这种任务。对于多进程与多线程的不足这里不做讨论。现在我们可以想想celery到底能不能解决这种问题。
- import time
- from celery import Celery
-
- app = Celery('sample',broker='amqp://guest@localhost//')
-
- @app.task
- def say(x,y):
- time.sleep(5)
- return x+y
-
- if __name__ == '__main__':
- say('Hello','World')
现在来解释一下新加入的几行代码,首先说明一下加入的新代码完全不需要改变原来的代码。导入celery模块就不用解释了,声明一个celery实例app的参数需要解释一下。
第四步
现在我们已经使用了celery框架了,我们需要让它找几个工人帮我们干活。好现在就让他们干活。
celery -A sample worker --loglevel=info
这条命令有些长,我来解释一下吧。
第五步
现在我们的任务已经被加载到了内存中,我们不能再像之前那样执行python sample.py来运行程序了。我们可以通过终端进入python然后通过下面的方式加载任务。输入python语句。
- from sample import say
- say.delay('hello','world')
我们的函数会立即返回,不需要等待。就那么简单celery解决了我们的问题。可以发现我们的say函数不是直接调用了,它被celery 的 task 装饰器 修饰过了。所以多了一些属性。目前我们只需要知道使用delay就行了。
确保你之前的RabbitMQ已经启动。还是官网的那个例子,在任意目录新建一个tasks.py的文件,内容如下:
- from celery import Celery
-
- app = Celery('tasks', broker='amqp://guest@localhost//')
-
- @app.task
- def add(x, y):
- return x + y
使用redis作为消息队列
- app = Celery('task', broker='redis://localhost:6379/4')
- app.conf.update(
- CELERY_TASK_SERIALIZER='json',
- CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
- CELERY_RESULT_SERIALIZER='json',
- CELERYD_CONCURRENCY = 8
- )
-
- @app.task
- def add(x, y):
- return x + y
在同级目录执行:
$ celery -A tasks.app worker --loglevel=info
该命令的意思是启动一个worker ( tasks文件中的app实例,默认实例名为app,-A 参数后也可直接加文件名,不需要 .app),把tasks中的任务(add(x,y))把任务放到队列中。保持窗口打开,新开一个窗口进入交互模式,python或者ipython:
- >>> from tasks import add
- >>> add.delay(4, 4)
到此为止,你已经可以使用celery执行任务了,上面的python交互模式下简单的调用了add任务,并传递 4,4 参数。
但此时有一个问题,你突然想知道这个任务的执行结果和状态,到底完了没有。因此就需要设置backend了
修改之前的tasks.py中的代码为:
- # coding:utf-8
- import subprocess
- from time import sleep
-
- from celery import Celery
-
- backend = 'db+mysql://root:@192.168.0.102/celery'
- broker = 'amqp://guest@192.168.0.102:5672'
-
- app = Celery('tasks', backend=backend, broker=broker)
-
-
- @app.task
- def add(x, y):
- sleep(10)
- return x + y
-
-
- @app.task
- def hostname():
- return subprocess.check_output(['hostname'])
除了添加backend之外,上面还添加了一个who的方法用来测试多服务器操作。修改完成之后,还按之前的方式启动。
同样进入python的交互模型:
- >>> from tasks import add, hostname
- >>> r = add.delay(4, 4)
- >>> r.ready() # 10s内执行,会输出False,因为add中sleep了10s
- >>>
- >>> r = hostname.delay()
- >>> r.result # 输出你的hostname
做完上面的测试之后,产生了一个疑惑,Celery叫做分布式任务管理,那它的分布式体现在哪?它的任务都是怎么执行的?在哪个机器上执行的?在当前服务器上的celery服务不关闭的情况下,按照同样的方式在另外一台服务器上安装Celery,并启动:
$ celery -A tasks worker --loglevel=info
发现前一个服务器的Celery服务中输出你刚启动的服务器的hostname,前提是那台服务器连上了你的rabbitmq。然后再进入python交互模式:
- >>> from tasks import hostname
- >>>
- >>> for i in range(10):
- ... r = hostname.delay()
- ... print r.result # 输出你的hostname
- >>>
看你输入的内容已经观察两台服务器上你启动celery服务的输出。
在实际的项目中我们需要明确前后台的分界线,因此我们的celery编写的时候就应该是分成前后台两个部分编写。在celery简单入门中的总结部分我们也提出了另外一个问题,就是需要分离celery的配置文件。
第一步
编写后台任务tasks.py脚本文件。在这个文件中我们不需要再声明celery的实例,我们只需要导入其task装饰器来注册我们的任务即可。后台处理业务逻辑完全独立于前台,这里只是简单的hello world程序需要多少个参数只需要告诉前台就可以了,在实际项目中可能你需要的是后台执行发送一封邮件的任务或者进行复杂的数据库查询任务等。
- import time
- from celery.task import task
-
-
- @task
- def say(x,y):
- time.sleep(5)
- return x+y
第二步
有了那么完美的后台,我们的前台编写肯定也轻松不少。到底简单到什么地步呢,来看看前台的代码吧!为了形象的表明其职能,我们将其命名为client.py脚本文件。
- from celery import Celery
-
- app = Celery()
-
- app.config_from_object('celeryconfig')
- app.send_task("tasks.say",['hello','world'])
可以看到只需要简单的几步:1. 声明一个celery实例。2. 加载配置文件。3. 发送任务。
第三步
继续完成celery的配置。官方的介绍使用celeryconfig.py作为配置文件名,这样可以防止与你现在的应用的配置同名。
CELERY_IMPORTS = ('tasks')
CELERY_IGNORE_RESULT = False
BROKER_HOST = '127.0.0.1'
BROKER_PORT = 5672
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp'
可以看到我们指定了CELERY_RESULT_BACKEND为amqp默认的队列!这样我们就可以查看处理后的运行状态了,后面将会介绍处理结果的查看。
第四步
启动celery后台服务,这里是测试与学习celery的教程。在实际生产环境中,如果是通过这种方式启动的后台进程是不行的。所谓后台进程通常是需要作为守护进程运行在后台的,在python的世界里总是有一些工具能够满足你的需要。这里可以使用supervisor作为进程管理工具。在后面的文章中将会介绍如何使用supervisor工具。
celery worker -l info --beat
注意现在运行worker的方式也与前面介绍的不一样了,下面简单介绍各个参数。
-l info 与--loglevel=info的作用是一样的。
--beat 周期性的运行。即设置 心跳。
第五步
前台的运行就比较简单了,与平时运行的python脚本一样。python client.py。
现在前台的任务是运行了,可是任务是被写死了。我们的任务大多数时候是动态的,为演示动态工作的情况我们可以使用终端发送任务。
- >>> from celery import Celery
- >>> app = Celery()
- >>> app.config_from_object('celeryconfig')
在python终端导入celery模块声明实例然后加载配置文件,完成了这些步骤后就可以动态的发送任务并且查看任务状态了。注意在配置文件celeryconfig.py中我们已经开启了处理的结果回应模式了CELERY_IGNORE_RESULT = False并且在回应方式配置中我们设置了CELERY_RESULT_BACKEND = 'amqp'这样我们就可以查看到处理的状态了。
- >>> x = app.send_task('task.say',['hello', 'lady'])
- >>> x.ready()
- False
- >>> x.status
- 'PENDING'
- >>> x.ready()
- TRUE
- >>> x.status
- u'SUCCESS'
可以看到任务发送给celery后马上查看任务状态会处于PENDING状态。稍等片刻就可以查看到SUCCESS状态了。这种效果真棒不是吗?在图像处理中或者其他的一些搞耗时的任务中,我们只需要把任务发送给后台就不用去管它了。当我们需要结果的时候只需要查看一些是否成功完成了,如果返回成功我们就可以去后台数据库去找处理后生成的数据了。
第一步
安装好mongodb了!就可以使用它了,首先让我们修改celeryconfig.py文件,使celery知道我们有一个新成员要加入我们的项目,它就是mongodb配置的方式如下。
ELERY_IMPORTS = ('tasks')
CELERY_IGNORE_RESULT = False
BROKER_HOST = '127.0.0.1'
BROKER_PORT = 5672
BROKER_URL = 'amqp://'
#CELERY_RESULT_BACKEND = 'amqp'
CELERY_RESULT_BACKEND = 'mongodb'
CELERY_RESULT_BACKEND_SETTINGS = {
"host":"127.0.0.1",
"port":27017,
"database":"jobs",
"taskmeta_collection":"stock_taskmeta_collection",
}
把#CELERY_RESULT_BACKEND = 'amp'注释掉了,但是没有删除目的是对比前后的改变。为了使用mongodb我们有简单了配置一下主机端口以及数据库名字等。显然你可以按照你喜欢的名字来配置它。
第二步
启动 mongodb 数据库:mongod。修改客户端client.py让他能够动态的传人我们的数据,非常简单代码如下。
- import sys
- from celery import Celery
-
- app = Celery()
-
- app.config_from_object('celeryconfig')
- app.send_task("tasks.say",[sys.argv[1],sys.argv[2]])
任务tasks.py不需要修改!
- import time
- from celery.task import task
-
-
- @task
- def say(x,y):
- time.sleep(5)
- return x+y
第三步
测试代码,先启动celery任务。
celery worker -l info --beat
再来启动我们的客户端,注意这次启动的时候需要给两个参数啦!
mongo
python client.py welcome landpack
等上5秒钟,我们的后台处理完成后我们就可以去查看数据库了。
第四步
查看mongodb,需要启动一个mongodb客户端,启动非常简单直接输入 mongo 。然后是输入一些简单的mongo查询语句。
最后查到的数据结果可能是你不想看到的,因为mongo已经进行了处理。想了解更多可以查看官方的文档。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。