当前位置:   article > 正文

异步框架Celery在Django中的运用

异步框架Celery在Django中的运用

参考博客:https://www.cnblogs.com/pyedu/p/12461819.html

参考视频:01 celery的工作机制_哔哩哔哩_bilibili

官方文档:First steps with Django — Celery 5.4.0rc1 documentation


目录

定义

主要架构

同步请求和异步请求

使用场景

主要优点

Celery 分布式任务队列

谈谈对Celery的理解

特点

工作原理

实践案例


定义

简单灵活、处理大量消息的分布式系统,专注于实时处理异步队列,支持任务调度

主要架构
  1. 消息中间件:message broker 可以集成第三方消息中间件如Redis、RabbitMQ
  2. 任务执行单元:worker 是celery提供的执行的任务执行的单元,并发分布在分布式的系统节点中
  3. 任务执行结果存储:task result store来存储执行任务的结果,支持方式 redis、AMQP
同步请求和异步请求

同步请求: 顺序进行IO操作等待阻塞进程依次执行

异步请求:异步进行,当IO操作阻塞时放到执行单元中完成放到数据库中而不影响其他单元的执行,当主进程需要阻塞的进程结果时会向是数据库中取出该数据(即将耗时操作放到异步队列中不影响主进程的执行),继续向下进行

使用场景
  1. 异步任务:将耗时操作任务提交到celery异步执行,如:发送短信、消息推送、音视频处理
  2. 定时任务:定时执行某件事情,如:每日数据统计
主要优点
  • 简单:使用和维护不要配置文件,只需添加基本信息的配置
  • 高可用:在work和client网络连接丢失或失败时会自动进行重试
  • 快速:单个celery进程可每分钟处理百万级任务,只需要毫秒级的往返延迟
  • 灵活:可以扩展使用,自定义池的实现、序列化、日志记录、消费者、broker消息传输
Celery 分布式任务队列

情景:用户发起 request,并等待 response 返回。在本些 views 中,可能需要执行一段耗时的程序,那么用户就会等待很长时间,造成不好的用户体验,比如发送邮件、手机验证码等。

解决 :将多个耗时的任务添加到队列 queue 中,也就是用 redis 实现 broker 中间人,然后用多个 worker 去监听队列里的任务去执行。

组成:

任务 task:就是一个 Python 函数。 worker:在一个新进程中,负责执行队列中的任务,也就是执行单元。 broker:负责调度,在布置环境中使用Redis,并且负责worker和服务器之间的信息传递。 backend:存储消息以及celery执行结果。

谈谈对Celery的理解

    Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。

    Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。

特点

    简单:熟悉celery的工作流程后,配置使用简单 高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务 快速:一个单进程的celery每分钟可处理上百万个任务 灵活:几乎celery的各组件都可以被扩展及自定制

工作原理

Celery由以下三部分构成:消息中间件(Broker)、任务执行单元Worker、结果存储(Backend)

工作原理:任务模块Task包含异步任务和定时任务。 其中,异步任务通常在业务逻辑中被触发并发往消息队列,而定时任务由Celery Beat进程周期性地将任务发往消息队列; 任务执行单元Worker实时监视消息队列获取队列中的任务执行; Woker执行完任务后将结果保存在Backend中;

  • 消息中间件Broker,消息中间件Broker官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ。
  • 任务执行单元Worker,Worker是任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心。
  • 结果存储Backend,Backend结果存储官方也提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apach

安装:

pip install celery
实践案例
  1. """
  2. 异步任务执行文件:celery_task.py
  3. 消费者模型
  4. """
  5. import celery
  6. import time
  7. # task.py
  8. import os
  9. os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
  10. backend='redis://127.0.0.1:6379/1'
  11. broker='redis://127.0.0.1:6379/2'
  12. cel=celery.Celery('test',backend=backend,broker=broker)
  13. @cel.task
  14. def send_email(name):
  15. print("向%s发送邮件..."%name)
  16. time.sleep(5)
  17. print("向%s发送邮件完成"%name)
  18. return "ok"
  19. @cel.task
  20. def send_msg(name):
  21. print("向%s发送短信..."%name)
  22. time.sleep(5)
  23. print("向%s发送短信完成"%name)
  24. return "ok"
  25. """"
  26. 执行任务文件: produce_task.py
  27. 生成者模型
  28. """
  29. from celery_task import send_email,send_msg
  30. result = send_email.delay("yuan") # 当执行delay函数时会自动调用消息中间件的任务执行队列,放到任务执行单元中
  31. print(result.id)
  32. result = send_msg.delay("alex")
  33. print(result.id)

先启动redis进程

 使用特定命令下发指令执行celery任务:

(注意celery5.0之前的命令是不一样的:celery worker -A celery_task -l info)

 先执行produce_task.py

返回ID: 

 fd27bc20-ccac-4855-9b3d-150708bad2a6
c07cb5b1-845a-44c4-963b-7ce3f92b98c8

 检查celery的异步队列查看执行结果

 注:当遇到以下情况

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "D:\python3\lib\site-packages\billiard\pool.py", line 361, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "D:\python3\lib\site-packages\celery\app\trace.py", line 664, in fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
[2024-02-24 15:31:20,394: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')

解决方法:

在消费者模型中添加以下代码

import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

 查看异步执行的结果:

  1. """
  2. 查看任务执行结果: result.py
  3. """
  4. from celery.result import AsyncResult
  5. from celery_task import cel
  6. async_result=AsyncResult(id="fd27bc20-ccac-4855-9b3d-150708bad2a6", app=cel)
  7. if async_result.successful():
  8. result = async_result.get()
  9. print(result)
  10. # result.forget() # 将结果删除
  11. elif async_result.failed():
  12. print('执行失败')
  13. elif async_result.status == 'PENDING':
  14. print('任务等待中被执行')
  15. elif async_result.status == 'RETRY':
  16. print('任务异常后正在重试')
  17. elif async_result.status == 'STARTED':
  18. print('任务已经开始被执行')
  19. # 运行结果是上面执行返回的结果:
  20. ok

celery多任务结构下异步执行:注意celery_tasks的celery名字是固定,不然会报错

  1. # celery
  2. from celery import Celery
  3. cel = Celery('celery_demo',
  4. broker='redis://127.0.0.1:6379/1',
  5. backend='redis://127.0.0.1:6379/2',
  6. # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
  7. include=['celery_tasks.task01',
  8. 'celery_tasks.task02'
  9. ])
  10. # 时区
  11. cel.conf.timezone = 'Asia/Shanghai'
  12. # 是否使用UTC
  13. cel.conf.enable_utc = False
  14. # task01
  15. import time
  16. from .celery import cel
  17. @cel.task
  18. def send_email(res):
  19. time.sleep(5)
  20. return "完成向%s发送邮件任务"%res
  21. # task02
  22. import time
  23. from .celery import cel
  24. @cel.task
  25. def send_msg(name):
  26. time.sleep(5)
  27. return "完成向%s发送短信任务"%name
  28. # """"
  29. 执行任务文件: produce_task.py 和上面的celery_task保持在同一级目录
  30. 生成者模型
  31. """
  32. from celery_tasks.task01 import send_email
  33. from celery_tasks.task02 import send_msg
  34. # 立即告知celery去执行test_celery任务,并传入一个参数
  35. result = send_email.delay('yuan')
  36. print(result.id)
  37. result = send_msg.delay('yuan')
  38. print(result.id)
E:\desktop\my_drf\celerypro>celery -A celery_tasks worker -l info -P eventlet

运行结果:

 定时任务的配置:

  1. # 更新produce_task 文件,增加定时任务
  2. from celery_task import send_email
  3. from datetime import datetime
  4. # 方式一
  5. # v1 = datetime(2020, 3, 11, 16, 19, 00)
  6. # print(v1)
  7. # v2 = datetime.utcfromtimestamp(v1.timestamp())
  8. # print(v2)
  9. # result = send_email.apply_async(args=["egon",], eta=v2) # 定时任务
  10. # print(result.id)
  11. # 方式二
  12. ctime = datetime.now()
  13. # 默认用utc时间
  14. utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
  15. from datetime import timedelta
  16. time_delay = timedelta(seconds=10) # 当时时间10s后执行任务
  17. task_time = utc_ctime + time_delay
  18. # 使用apply_async并设定时间
  19. result = send_email.apply_async(args=["egon"], eta=task_time)
  20. print(result.id)
  21. # 更新setting
  22. cel.conf.beat_schedule = {
  23. # 名字随意命名
  24. 'add-every-10-seconds': {
  25. # 执行tasks1下的test_celery函数
  26. 'task': 'celery_tasks.task01.send_email',
  27. # 每隔2秒执行一次
  28. # 'schedule': 1.0,
  29. # 'schedule': crontab(minute="*/1"),
  30. 'schedule': timedelta(seconds=6),
  31. # 传递参数
  32. 'args': ('张三',)
  33. },
  34. # 'add-every-12-seconds': {
  35. # 'task': 'celery_tasks.task01.send_email',
  36. # 每年411号,842分执行
  37. # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
  38. # 'args': ('张三',)
  39. # },
  40. }

运行结果:

 根据上述配置每6s执行task01发送邮件任务

注意:

  1. # 周期性执行任务单元,要注意先启动beat进程而后执行worker单元
  2. E:\desktop\my_drf\celerypro>celery -A celery_tasks beat
  3. E:\desktop\my_drf\celerypro>celery -A celery_tasks worker -l info -P eventlet
  4. 注意: 当打开beat后而若没有打开worker执行单元会导致beat进程不断向数据库中加入数据

 

  查看redis堆积的数据方法:cmd命令如下

 python脚本实现:

 celery结合django中集成的运用

  1. # tasks
  2. # celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
  3. from mycelery.main import app
  4. import time
  5. import logging
  6. log = logging.getLogger("django")
  7. @app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
  8. def send_sms(mobile):
  9. """发送短信"""
  10. print("向手机号%s发送短信成功!"%mobile)
  11. time.sleep(5)
  12. return "send_sms OK"
  13. @app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
  14. def send_sms2(mobile):
  15. print("向手机号%s发送短信成功!" % mobile)
  16. time.sleep(5)
  17. return "send_sms2 OK"
  18. # config
  19. broker_url = 'redis://127.0.0.1:6379/15'
  20. result_backend = 'redis://127.0.0.1:6379/14'
  21. # main
  22. # 主程序
  23. import os
  24. from celery import Celery
  25. # 创建celery实例对象
  26. app = Celery("sms")
  27. # import os
  28. os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1') # 注意: 默认配置要这样配置,下列的配置会找不到组件导致失败
  29. # 把celery和django进行组合,识别和加载django的配置文件
  30. # os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celerypro.settings.dev')
  31. # os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")
  32. # 通过app对象加载配置
  33. app.config_from_object("mycelery.config")
  34. # 加载任务
  35. # 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
  36. # app.autodiscover_tasks(["任务1","任务2"])
  37. app.autodiscover_tasks(["mycelery.sms",])
  38. # view
  39. from django.shortcuts import render,HttpResponse
  40. from mycelery.sms.tasks import send_sms,send_sms2
  41. from datetime import timedelta
  42. from datetime import datetime
  43. def test(request):
  44. ################################# 异步任务
  45. # 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决
  46. send_sms.delay("110")
  47. send_sms2.delay("119")
  48. # send_sms.delay() # 如果调用的任务函数没有参数,则不需要填写任何内容
  49. ################################# 定时任务
  50. ctime = datetime.now()
  51. # 默认用utc时间
  52. utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
  53. time_delay = timedelta(seconds=3) # 3s 发送消息
  54. task_time = utc_ctime + time_delay
  55. result = send_sms.apply_async(["911", ], eta=task_time)
  56. print(result.id)
  57. return HttpResponse('ok')

启动Celery的命令

  1. # 强烈建议切换目录到mycelery根目录下启动
  2. # E:\desktop\my_drf\celerypro>celery -A mycelery.main worker --loglevel=info

运行结果:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/169950
推荐阅读
相关标签
  

闽ICP备14008679号