当前位置:   article > 正文

celery介绍

celery

目录

官方

架构: 

 celery 是独立的服务

celery包结构

 celery执行异步任务,延时任务,定时任务

异步任务

 延时任务

 定时任务

 django 使用celery

秒杀逻辑

双写一致性


官方

Celery 官网:Celery - Distributed Task Queue — Celery 5.2.7 documentation

Celery 官方文档英文版:Celery - Distributed Task Queue — Celery 5.3.0b1 documentation

Celery 官方文档中文版:Celery - 分布式任务队列 — Celery 3.1.7 文档

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

1. 完成异步任务:可以提高项目的并发量,以前用多线程实现项目的并发量,现在可以使用celery来做

2. 完成延时任务

3. 完成定时任务

架构: 

消息中间件:  broker 提交的任务【函数】都放在这里, celery本身不能提供消息中间件,需要借助于第三方: redis, rabbitmq

任务执行单元: worker,是真正执行任务的的地方, 一个个进程中执行函数

结果存储: backend, 函数return的结果都存储在这里, celery本身不提供结果存储,需要借助于第三方: redis 数据库, rabbitmq

 celery 是独立的服务

1, 可以不依赖任何服务器,通过自身命令来启动服务

2, celery服务为为其他项目服务提供一部解决任务的需求’  ## 会有两个服务同时运行,一个是项目服务,一个是celery服务, 项目服务讲需要异步处理的任务交给celery服务 , celery就会在需要时一步完成项目的需求

安装

psi3 install celery

使用步骤

1, 写一个main.py: 实例化得到app对象, 写函数,任务,注册成celery的任务, 

2, 在别的程序中提交任务》》》提交到broker中去  add.delay(3,4)

 执行异步任务

  1. add.apply_asyn()
  2. add.delay()

 main.py

  1. import time
  2. from celery import Celery
  3. backend = 'redis://127.0.0.1:6379/1'
  4. broker = 'redis://127.0.0.1:6379/0'
  5. app = Celery('test',backend=backend,broker=broker)
  6. # 以上实例化得到对象
  7. # 写任务
  8. @app.task
  9. def add(a,b):
  10. time.sleep(3)
  11. print(a+b)
  12. return a+b

s1.py

  1. from main import add
  2. print('good evening')
  3. # 执行的时同步任务
  4. res = add(3,4)
  5. print(res)
  6. # 3 执行异步任务
  7. # add.apply_async()
  8. print(add.delay(2, 7)) # 74523b46-68a5-4429-b061-723ccf3f9b82
  9. print(add.delay(7, 5)) # 25e9b218-55ff-4a0a-a41f-76cde3ff8833

s2.py

  1. def outer(func):
  2. def inner(*args,**kwargs):
  3. res = func(*args,**kwargs)
  4. return res
  5. inner.delay='xxx'
  6. return inner
  7. @outer
  8. def add():
  9. print('aaa')
  10. #
  11. # add.name='yietong'
  12. print(add.delay)
'
运行

3, 启动worker, 从broker中去任务执行,执行完放到backend中

  1. win:
  2. celery worker -A main -l info -P eventlet # 4.x及之前用这个
  3. celery -A main worker -l info -P eventlet # 5.x及之后用这个
  4. lin,mac:
  5. celery worker -A main -l info
  6. celery -A main worker -l info

 eventlet模块需要安装  pip3 install eventlet

4, 在backend 中查看任务执行的结果

直接看或者通过代码查看

  1. from main import app
  2. from celery.result import AsyncResult
  3. id = '5f7bbf70-9946-4085-b993-f5b8a8d0bd11'
  4. if __name__ == '__main__':
  5. res = AsyncResult(id=id,app=app)
  6. if res.successful():
  7. result = res.get()
  8. print(result) # 12
  9. elif res.failed():
  10. print('任务失败')
  11. elif res.status == 'PENDING':
  12. print('任务正在等待中')
  13. elif res.status== 'STATED':
  14. print('任务已经开始被执行')

停掉worker后

 重启服务后

celery包结构

项目  

 celery_task【包】
            -__init__.py
            -celery.py
            -user_task.py
            -home_task.py
        add_task.py
        get_result.py

写一个celery包,以后在任意项目中需要使用的时候把包copy进去,导入使用即可。

使用步骤;

  •  新建包: celery_task
  • 在包里先新建一个celery.py
  • 初始化app
  1. import celery
  2. from celery import Celery
  3. app = celery.Celery()
  4. backend = 'redis://127.0.0.1:6379/1'
  5. broker = 'redis://127.0.0.1:6379/0'
  6. # 一定不要忘了include
  7. app = Celery(__name__,broker=broker,backend=backend,include=['celery_task.home_task','celery_task.user_task'])
  • 在包里新建user_task.py 编写用户相关任务
  1. import time
  2. from .celery import app
  3. @app.task
  4. def send_sms(mobile,code):
  5. time.sleep(1)
  6. print('短信发送成功:%s,验证码是%s'%(mobile,code))
  7. return True
  • 在包里新建home_task.py 编写首页相关任务
  1. import time
  2. from .celery import app
  3. @app.task
  4. def add(a,b):
  5. time.sleep(3)
  6. print('计算结果时%s'%(a+b))
  7. return a+b

  • 其他程序提交任务

  • 启动worker》》》它可以先启动【在提交任务之前】》》》cd 到包所在的目录下
celery -A celery_task worker -l info -P eventlet
  • 查看任务执行的结果

 get_result.py

  1. from celery_task.celery import app
  2. from celery.result import AsyncResult
  3. def get(task_id):
  4. asy = AsyncResult(id=task_id, app=app)
  5. if asy.successful():
  6. res = asy.get()
  7. print('任务执行结果:', res)
  8. elif asy.failed():
  9. print('任务失败')
  10. elif asy.status == 'PENDING':
  11. print('任务等待中被执行')
  12. elif asy.status == 'RETRY':
  13. print('任务异常后正在重试')
  14. elif asy.status == 'STARTED':
  15. print('任务已经开始被执行')
  16. if __name__ == '__main__':
  17. # 任务id,提交任务时返回的结果
  18. task_id = 'bb52fd1a-43e6-4c36-852c-9b1c940a1ad7'
  19. get(task_id)

 celery执行异步任务,延时任务,定时任务

异步任务

task.delay(*args,**kwargs)

 延时任务

  1. task.apply_async(args=[参数,参数],eta=时间对象(utc时间))
  2. """
  3. 参数
  4. args:任务需要的参数
  5. countdown:几秒后执行
  6. retry:任务失败是否重试,默认为True
  7. 其他参数:
  8. eta:时间对象
  9. """
  1. from celery_task.home_task import add
  2. # 提交一个add的异步任务
  3. #eta 是一个时间任务。 要写一个5秒后的时间对象
  4. from datetime import datetime,timedelta
  5. # 得到10miao后的时间,celery 默认使用utc时间
  6. eta = datetime.utcnow()+timedelta(seconds=10)
  7. res = add.apply_async(args=(200,20),eta=eta)
  8. print(res) # 9d609f6f-4d08-4b62-999c-a9466d3819e5

 定时任务

  -1 app的配置文件中配置 ,写在celery.py中

  1. app.conf.beat_schedule = {
  2. 'send_sms_task': {
  3. 'task': 'celery_task.user_task.send_sms',
  4. 'schedule': timedelta(seconds=5),
  5. # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
  6. 'args': ('1897334444', '7777'),
  7. },
  8. 'add_task': {
  9. 'task': 'celery_task.home_task.add',
  10. 'schedule': crontab(hour=12, minute=10, day_of_week=3), # 每周一早八点
  11. 'args': (10, 20),
  12. }
  13. }

    -2 启动worker :干活的人
          

 celery -A celery_task worker -l info -P eventlet


        -3 启动beat :提交任务的人
         

   celery -A celery_task beat -l info
  1. # 设置时区
  2. app.conf.timezone ='Asia/Shanghai'
  3. # 是否使用utc时间
  4. app.conf.enable_utc = False
  5. from celery.schedules import crontab
  6. # app的配置信息
  7. app.conf.beat_schedule = {
  8. 'send_sms_task': {
  9. 'task': 'celery_task.user_task.send_sms',
  10. 'schedule': timedelta(seconds=5),
  11. # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
  12. 'args': ('18595992917', '7777'),
  13. },
  14. 'add_task': {
  15. 'task': 'celery_task.home_task.add',
  16. 'schedule': crontab(hour=22, minute=10, day_of_week=3), # 每周一早八点
  17. 'args': (10, 20),
  18. }
  19. }

补充:

如果公司只想做定时任务, celery比较大,比较麻烦,一般公司会使用 pip install apscheduler

  1. # 每隔多长事件
  2. import time
  3. from apscheduler.schedulers.blocking import BlockingScheduler
  4. # 任务
  5. def my_job(i):
  6. print (i)
  7. sched = BlockingScheduler()
  8. sched.add_job(my_job, 'interval', seconds=5,values=['学会了'])
  9. ## 按年月日
  10. import datetime
  11. from apscheduler.schedulers.blocking import BlockingScheduler
  12. scheduler = BlockingScheduler()
  13. def my_job(text):
  14. print(text)
  15. # datetime类型(用于精确时间)
  16. scheduler.add_job(my_job, 'date', run_date=datetime(2022, 4, 25, 17, 30, 5), args=['测试任务'])
  17. ## 按corn
  18. import datetime
  19. from apscheduler.schedulers.background import BackgroundScheduler
  20. def job_func(text):
  21. print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
  22. scheduler = BackgroundScheduler()
  23. # 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务
  24. scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')

 django 使用celery

使用步骤:

   1 把写好的包(celery_task)复制到项目路径下
    2 在包内的celery.py 的上面加入代码

  1.     import os
  2.         os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
  3.         import django
  4.         django.setup()


   3 在django的视图类中,导入,提交任务

  1. 任务.delay()
  2. 任务.apply_async()


    4 启动worker,beat

  1. celery -A celery_task worker -l info -P eventlet
  2. celery -A celery_task beat -l info

celery 实现定时更新缓存

双写一致性

redis双写一致性指的是redis和数据库的数据要同时更新

我们都知道把数据库的数据暂存与redis,之后取数据都去redis中去, 这样就可以减少时间消耗,但是会出现一个问题, 数据库更i轩尼诗,redis没有更新,使用取数据取到的还是原来的值。

首先要了解数据库的数据是什么时候存到redis中的

一般来说, 前端发送Ajax请求,会先从redis中取,如果有值,则直接返回,如果没有值,就从数据库中取值并保存到redis中。

使用根据以上流程有以下解决办法

1. 先更新数据库,在更新缓存

2, 先删除缓存,在更新数据库

3,先更新数据库,在删除缓存(这种比较多)

4,定时更新缓存(隔5分钟更新一次)

定时更新缓存

视图类

  1. class BannerView(GenericViewSet, ListModelMixin):
  2. queryset = models.Banner.objects.all()
  3. serializer_class = serializer.BannerSerializer
  4. def list(self, request, *args, **kwargs):
  5. banner_list = cache.get('banner_list')
  6. if banner_list:
  7. # redis中有值直接返回
  8. return Response(banner_list)
  9. else:
  10. # redis中没有值,获取数据再存入redis
  11. res = super(BannerView, self).list(request, *args, **kwargs)
  12. cache.set('banner_list', res.data)
  13. return res

celery.py

  1. from celery import Celery
  2. import os
  3. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
  4. import django
  5. django.setup()
  6. broker = 'redis://127.0.0.1:6379/2'
  7. backend = 'redis://127.0.0.1:6379/3'
  8. include = [
  9. 'celery_task.tasks'
  10. ]
  11. app = Celery('main', broker=broker, backend=backend, include=include)
  12. from datetime import timedelta
  13. app.conf.beat_schedule = {
  14. 'banner_update': {
  15. 'task': 'celery_task.tasks.banner_update', # 任务路径
  16. 'schedule': timedelta(seconds=10), # 定时
  17. 'args': (), # 任务参数
  18. }
  19. }

任务

  1. @app.task
  2. def banner_update():
  3. query_set = models.Banner.objects.all()
  4. ser = serializer.BannerSerializer(instance=query_set, many=True)
  5. cache.set('banner_list', ser.data)
  6. return True

秒杀逻辑

前端使用秒杀按钮, 

事件: 像后端秒杀接口发送请求, 发送完立马起一个定时任务, 每隔5秒,像后端查看一下是否秒杀成功, 如果没成功,定时任务继续执行, 如果成功,清空定时任务。

  1. handleClick() {
  2. this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
  3. if (res.data.code == 100) {
  4. let task_id = res.data.id
  5. this.$message({
  6. message: res.data.msg,
  7. type: 'error'
  8. });
  9. // 起个定时任务,每隔5s向后端查询一下是否秒杀成功
  10. let t = setInterval(() => {
  11. this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
  12. res => {
  13. if (res.data.code == 100 || res.data.code == 101) { //秒杀结束了,要么成功,要么失败了
  14. alert(res.data.msg)
  15. // 销毁掉定时任务
  16. clearInterval(t)
  17. } else if (res.data.code == 102) {
  18. //什么事都不干
  19. }
  20. }
  21. )
  22. }, 5000)
  23. }
  24. })
  25. }

后端: 

秒杀接口

提交秒杀任务

  1. def seckill(request):
  2. # 提交秒杀任务
  3. res = seckill_task.delay()
  4. return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})
'
运行

查询是否秒杀成功的接口  【根据用户传入的id,查询任务是否成功】

  1. def get_result(request):
  2. task_id = request.GET.get('id')
  3. res = AsyncResult(id=task_id, app=app)
  4. if res.successful():
  5. result = res.get() # 7
  6. return JsonResponse({'code': 100, 'msg': str(result)})
  7. elif res.failed():
  8. print('任务失败')
  9. return JsonResponse({'code': 101, 'msg': '秒杀失败'})
  10. elif res.status == 'PENDING':
  11. print('任务等待中被执行')
  12. return JsonResponse({'code': 102, 'msg': '还在排队'})
'
运行

 

双写一致性

接口增加缓存

首页轮播图接口增加缓存, 提高了接口的响应速度,提高并发量

  1. class BannerView(GenericViewSet, CommonListModelMixin):
  2. queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
  3. serializer_class = BannerSerializer
  4. def list(self, request, *args, **kwargs):
  5. result = cache.get('banner_list')
  6. if result: # 缓存里有
  7. print('走了缓存,速度很快')
  8. return APIResponse(result=result)
  9. else:
  10. # 去数据库拿
  11. print('走了数据库,速度慢')
  12. res = super().list(request, *args, **kwargs)
  13. result = res.data.get('result') # {code:100,msg:成功,result:[{},{}]}
  14. cache.set('banner_list', result)
  15. return res

加了缓存,如果MySQL数据库变了,由于请求的都是缓存的数据,导致MySQL和redis的数据不一致, 这就涉及到了双写一致性的问题

1, 修改MySQL数据库,删除缓存,

2, 修改数据库,修改缓存

3, 定时更新缓存  》》  针对实时性不是很高的接口适合定时更新

给首页轮播图接口加了缓存,出现了双写一致性问题, 使用定时更新来解决双写一致性的问题【会存在不一致的情况,可以忽略】 使用celery定时任务

home_task.py

  1. @app.task
  2. def update_banner():
  3. # 更新缓存
  4. # 查询出现在轮播图的数据
  5. queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
  6. ser = BannerSerializer(instance=queryset, many=True)
  7. # ser 中得图片,没有前面地址
  8. for item in ser.data:
  9. item['image'] = settings.HOST_URL + item['image']
  10. cache.set('banner_list', ser.data)
  11. return True

 celery.py

  1. app.conf.beat_schedule = {
  2. 'update_banner': {
  3. 'task': 'celery_task.home_task.update_banner',
  4. 'schedule': timedelta(seconds=50),
  5. 'args': (),
  6. }
  7. }

启动django, worker,beat

第一次访问: 查的数据库放入了缓存,以后再访问就走缓存。 一旦MySQL数据改了,缓存可能不一致。 定时更新,保持了一致

补充:

@app.task 与@shared.task的区别

他俩的作用一样, 第一个需要执行app, 第二个直接导入使用, 直接从内存中取出来app对象

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/819400
推荐阅读
相关标签
  

闽ICP备14008679号