1、队列介绍
任务队列用作跨线程或机器分配工作的机制。
任务队列的输入是称为任务的工作单元。
专用工作进程不断监视任务队列以执行新工作。
Celery通过消息进行通信,通常使用经纪人(brokers)在客户和工人之间进行调解。
为了启动任务,客户端向队列添加消息,然后经纪人(brokers)将该消息传递给工作者。
Celery系统可以由多个工作人员和经纪人组成,让位于高可用性和水平扩展。
Celery是用Python编写的,但协议可以用任何语言实现。
还可以实现语言互操作性,从而暴露HTTP端点并具有请求它的任务(webhooks)。
1.1如何安装
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#id6
1.2 环境 ,自定义包的引入
# 导入包,添加目录的时候只用添加它的父目录。
# 如果想要添加包中的文件,就必须,把包的目录放到path环境中
sys.path.append(r"C:\Users\Administrator\PycharmProjects\untitled1")
1.3启动task、
# 只要启动了celery服务,可以在任何地方添加任务,有两种方式启动
1.3.1 # KeyError: 'module.tasks.tasks.add'# 这是因为在tasks中没有这个任务名 # 我们需要正确的导入, # 如果你想要用源文件的方法记得只能导入那个方法 # from XXXX import tasks.add 这是错误的
1.3.2 # 在相应的文件中添加app的对象,然后重写想要的方法,记得@ # 因为只是校验包名,所以我们可以随便写个方法,但要名字对
2、celery使用
2.1 borkers
Celery需要消息传输(borkers)来发送和接收消息。
2.2 backend
选择并安装消息传输(borkers)。安装Celery并创建您的第一个任务。启动worker并调用任务。在任务转换到不同状态时跟踪任务,并检查返回值。
注意:如果您想跟踪任务的状态,Celery需要在某处存储或发送状态。(backend)(我们需要配置后端)
有几个内置的结果后端可供选择:SQLAlchemy / Django ORM,Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您可以定义自己的。
在本例中,我们使用rpc结果后端,它将状态作为瞬态消息发回。后端是通过Celery的后端参数指定的(或者如果您选择使用配置模块,则通过result_backend设置):
如果存储在redis中,celery表代表没有完成,celery-task-meta-f418abea-7827-4220-b72e-a0669e8b8a08表代表完成。
2.3 使用方式
2.3.1 单文件使用,模块形式
#创建task.py from celery import Celery
#您需要的第一件事是Celery实例。我们称之为Celery应用程序或简称app。
#由于此实例用作您希望在Celery中执行的所有操作的入口点,例如创建任务和管理工作程序,因此其他模块必须可以导入它。
#在本教程中,我们将所有内容保存在单个模块中,但对于较大的项目,您需要创建专用模块。
app = Celery('tasks', broker='pyamqp://guest@localhost//')
#Celery的第一个参数是当前模块的名称。只有在__main__模块中定义任务时才能自动生成名称。
#第二个参数是broker关键字参数,指定要使用的消息代理的URL。这里使用RabbitMQ(也是默认选项)。redis://localhost BROKER_URL = 'redis://localhost:6379/0' 0代表0数据库 @app.task def add(x, y): return x + y
运行的命令:(必须在tasks当前目录下)
$ celery -A tasks worker --loglevel=info
该命令中-A参数表示的是Celery APP的名称,这个实例中指的就是tasks.py,就是这个tasks文件,他会自动从中找到app然后执行,worker是一个执行任务角色,后面的
loglevel=info记录日志类型默认是info,这个命令启动了一个worker,用来执行程序中add这个加法任务(task)
回调任务:
from tasks import add add.delay(4, 4)
跟踪状态:
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
>>> result = add.delay(4, 4)ready()方法返回任务是否已完成处理:
>>> result.ready()Ture
>>> result.traceback
如果任务引发异常,您还可以访问原始回溯:
>>> result.get(propagate=False)
同步操作,timeout可以指定时间;可以直接打印异常;可以拒绝打印异常如上。
后端使用资源来存储和传输结果。要确保释放资源,最后必须在调用任务后返回的EVERY AsyncResult实例上调用get()或forget()。要不然它会一直存在数据库中
2.3.2 配置文件使用
与上面的单文件模块一起使用或者在包方式中使用
您可以通过调用app.config_from_object()方法告诉Celery实例使用配置模块在task.py:
app.config_from_object('celeryconfig')
此模块通常称为“celeryconfig”,但您可以使用任何模块名称。在上面的例子中,一个名为celeryconfig.py的模块必须可以从当前目录或Python路径加载。它可能看起来像这样:
broker_url = 'pyamqp://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True
与消费类应用一样,不需要太多配置即可运行。 它有一个输入和一个输出。输入必须连接到代理,输出可以选择连接到结果后端。 但是,如果仔细观察背面,会有一个盖子显示滑块,刻度盘和按钮的负载:这是配置。 对于大多数用例,默认配置应该足够好,但是可以配置许多选项以使Celery完全按照需要工作。 阅读可用选项是一个好主意,熟悉可配置的内容。您可以在“配置”和“默认值”参考中阅读有关选项的信息。可以直接在应用程序上或使用专用配置模块设置配置。
app.命令 都是可以直接放到tasks文件中的 例如,您可以通过更改task_serializer设置来配置用于序列化任务有效负载的默认序列化程序: app.conf.task_serializer = 'json'
如果您一次配置了许多设置,则可以使用update: app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Europe/Oslo', enable_utc=True, ) 对于大型项目,建议使用专用配置模块。 不鼓励硬编码周期性任务间隔和任务路由选项。将它们保存在集中位置要好得多。 对于库来说尤其如此,因为它使用户能够控制其任务的行为方式。集中配置还允许您的SysAdmin在发生系统故障时进行简单的更改。 为了演示配置文件的强大功能,您可以将行为不当的任务路由到专用队列: celeryconfig.py: task_routes = { 'tasks.add': 'low-priority', } Or instead of routing it you could rate limit the task instead, so that only 10 tasks of this type can be processed in a minute (10/m): celeryconfig.py: task_annotations = { 'tasks.add': {'rate_limit': '10/m'} } If you’re using RabbitMQ or Redis as the broker then you can also direct the workers to set a new rate limit for the task at runtime: $ celery -A tasks control rate_limit tasks.add 10/m worker
2.3.3 包形式使用
文件结构:(必须这样结构,与运行的命令有关)
proj/__init__.py /celery.py /tasks.py
celery.py
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
task.py
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
运行的命令(我们需要移到项目目录的上一级)
celery -A proj worker -l info
2.3.4 一些参数:
- transport:是您在celery模块的broker参数中指定的URL,您还可以使用-b选项在命令行上指定其他代理。 - concurrency:是用于同时处理任务的prefork工作进程的数量,当所有这些进程忙于工作时,新任务必须等待其中一个任务完成才能处理。默认并发数是该计算机上CPU的数量(包括核心),您可以使用celery worker -c选项指定自定义数字。没有建议值,因为最佳数量取决于许多因素,但如果您的任务主要是I / O限制,那么您可以尝试增加它,实验表明添加超过CPU数量的两倍很少有效,而且可能会降低性能。包括默认的prefork池,Celery还支持使用Eventlet,Gevent,并在单个线程中运行(请参阅并发)。 - events:是一个选项,启用后会导致Celery为工作中发生的操作发送监视消息(事件)。这些可用于监视程序,如芹菜事件和Flower - 实时Celery监视器,您可以在监视和管理指南中阅读。 -queues:是工作人员将从中使用任务的队列列表。可以告诉工作人员同时从多个队列中进行消费,这用于将消息路由到特定工作人员,作为服务质量,关注点分离和优先级排序的手段,所有这些都在“路由指南”中进行了描述。
2.3.5 后台运行
celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log
如果不指定 pid 和log的话会在当前生成 w1.pid,w1.log,w1-1.log,w1-2.log (2线程)
w1就是worker1的名称,线程会根据你的cpu自己定 。
十个进程的话 w1 换成10
在生产中,您将需要在后台运行worker,这在daemonization教程中有详细描述。守护程序脚本使用celery multi命令在后台启动一个或多个worker: $ celery multi start w1 -A proj -l info celery multi v4.0.0 (latentcall) > Starting nodes... > w1.halcyon.local: OK You can restart it too: $ celery multi restart w1 -A proj -l info celery multi v4.0.0 (latentcall) > Stopping nodes... > w1.halcyon.local: TERM -> 64024 > Waiting for 1 node..... > w1.halcyon.local: OK > Restarting node w1.halcyon.local: OK celery multi v4.0.0 (latentcall) > Stopping nodes... > w1.halcyon.local: TERM -> 64052 or stop it: $ celery multi stop w1 -A proj -l info The stop command is asynchronous so it won’t wait for the worker to shutdown. You’ll probably want to use the stopwait command instead, this ensures all currently executing tasks are completed before exiting: $ celery multi stopwait w1 -A proj -l info celery multi不存储有关worker的信息,因此在重新启动时需要使用相同的命令行参数。停止时,只能使用相同的pidfile和logfile参数。 默认情况下,它会在当前目录中创建pid和日志文件,以防止多个工作人员在彼此之上启动,鼓励您将这些文件放在专用目录中: $ mkdir -p /var/run/celery $ mkdir -p /var/log/celery $ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log 使用multi命令可以启动多个worker,并且还有一个强大的命令行语法来为不同的worker指定参数,例如: $ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \ -Q default -L:4,5 debug
2.3.6 定时器功能
使用定时器功能首先得配置一下schedule,刚刚我们是直接在Celery函数加入配置,现在我们专门用一个文件来放配置文件,schedule也会写在这里面。我们可以用celery的beat去周期的生成任务和执行任务
https://blog.csdn.net/freeking101/article/details/74707619
修改tasks.py
from celery import Celery from time import sleep import celeryconfig app = Celery('tasks')#, backend='amqp', broker='amqp://guest@localhost//') app.config_from_object('celeryconfig') @app.task def add(x, y): sleep(5) return x + y
增加配置文件celeryconfig.py
from celery.schedules import crontab BROKER_URL = 'amqp://guest@localhost//' CELERY_RESULT_BACKEND = 'amqp://' CELERYBEAT_SCHEDULE={ "every-1-minute": { 'task': 'tasks.add', 'schedule': crontab(minute='*/1'), #如果是一秒中直接写
'args': (5,6)# 执行的参数 } }
表示一分钟触发一次add的函数,args是传入的参数,表示一分钟执行一次add(5,6),注意如果再添加一个任务,不能与every-1-minute重复,不然只有最后一个生效了。
然后执行celery -A tasks worker -B --loglevel=info 就能够增加触发beat任务了,会在本地生成一个celerybeat-schedule文件。(linux可以直接运行 )
最好在-B后面加一个 -s /tmp/celerybeat-schedule ,不然很可能导致当前目录没有写权限而报permission refused
运行:(这是因为window中必须分离)
celery -A tasks worker --loglevel=info
celery -A tasks beat (beat就是领导发布定时任务)-i info
Celery ValueError: not enough values to unpack (expected 3, got 0)的解决方案
pip install eventlet
然后启动worker的时候加一个参数,如下:(总之在最后加上P)
celery -A <mymodule> worker -l info -P eventlet