当前位置:   article > 正文

Celery框架从入门到精通

celery框架
Celery介绍、安装、基本使用

一、Celery服务

什么是Celery:

Celery是一个简单、灵活且可靠的,处理信息的分布式系统

  • Celery可以用来做什么:

  • 非同步任务

  • 延迟任务

Celery的执行原理:

  • 可以不依赖任何服务,通过自身命令,启动服务

  • celery服务为其他项目服务提供异步解决任务需求

  1. # 注:會有兩個服務同時執行
  2. - 專案服務
  3. - celery服務
  4. 專案服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成專案的需求
  5. '''
  6. 人是一個獨立執行的服務 | 醫院也是一個獨立執行的服務
  7. 正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
  8. 人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立執行,人生病時,醫院就來解決人生病的需求
  9. '''

1、celery架构

  • 讯息中介软件:broker

  • 提交的任务【函数】都放在这里, celery本身不能提供信息中介软件

  • 需要借助于第三方: redis或rabbitmq

  • 任务执行单元:worker

  • 真正执行任务的的地方,一个个程序中执行函数

  • 结果储存:backend

  • 函数return的结果都储存在这里, celery本身不提供结果储存

  • 需要借助于第三方: redis或rabbitmq

使用场景 :

  • 异步执行:解决耗时任务

  • 延迟执行:解决延迟任务

  • 定时执行:解决周期任务

2、celery快速使用

Celery不支持在windows上直接执行,通过eventlet支持在win上执行

安装 :

  1. pip install celery
  2. pip install eventlet # windows需要安裝

快速使用:

  1. - 1、第一步:建立一個py檔案(main.py),用於範例化celery物件,編寫需要執行的函數
  2. # 1、匯入模組
  3. from celery import Celery
  4. # 2、指定briker,用於存放提交的非同步任務
  5. broker = 'redis://127.0.0.1:6379/1'
  6. # 3、指定backend,用於存放函數執行結束的結果
  7. backend = 'redis://127.0.0.1:6379/2'
  8. # 範例化celery物件
  9. app = Celery('test', broker=broker, backend=backend)
  10. # 編寫一個函數,裝飾上celery物件
  11. @app.task
  12. def add(a, b):
  13. import time
  14. time.sleep(3)
  15. print('add函數執行完成')
  16. return a + b
  17. - 2、第二步:再次建立一個py檔案(run.py),用於將函數提交給celery
  18. # 1、匯入剛才編寫的函數
  19. from main import add
  20. # 2、將任務提交給broker,函數需要的引數需要傳入
  21. res = add.delay(1, 2)
  22. # 3、提交後可以獲得該任務的ID,可通過ID可以查詢任務執行結果
  23. print(res) # 0213d2c2-453e-41a8-a171-e31f1f2f4883
  24. - 3、第三步:使用命令開啟worker (也可以提前開啟,任務提交後就會直接執行)
  25. # 啟動worker命令,win需要安裝eventlet
  26. # 啟動需要進入main.py檔案的目錄下
  27. win:
  28. -4.x之前版本
  29. celery worker -A main -l info -P eventlet
  30. -4.x之後
  31. celery -A main worker -l info -P eventlet
  32. mac:
  33. celery -A main worker -l info
  34. - 4、第四步:worker會將執行的結果存在之前指定的broker目錄下(指定的redis資料庫)
  35. - 5、第五步:通過程式碼檢視執行結果(建立新的py檔案,專門用於檢視執行結果)
  36. # 1、匯入celery範例的物件
  37. from main import app
  38. # 2、匯入該模組用於檢視結果
  39. from celery.result import AsyncResult
  40. # 3、將提交的任務編號拿過來,用於查詢結果
  41. id = '0213d2c2-453e-41a8-a171-e31f1f2f4883'
  42. # 4、指定該檔案為啟動檔案
  43. if __name__ == '__main__':
  44. # 範例化物件,將任務的ID和celery範例化物件當作引數傳入
  45. a = AsyncResult(id=id, app=app)
  46. # 判斷執行結果
  47. if a.successful(): # 執行完了
  48. result = a.get()
  49. print(result)
  50. elif a.failed():
  51. print('任務失敗')
  52. elif a.status == 'PENDING':
  53. print('任務等待中被執行')
  54. elif a.status == 'RETRY':
  55. print('任務異常後正在重試')
  56. elif a.status == 'STARTED':
  57. print('任務已經開始被執行')

二、Celer包结构

1、建立clery包结构

什么是包结构:通过将celery服务封装成包的形式,放在项目需要使用的时候汇入即可

  1. project
  2. ├── celery_task # celery包
  3. │ ├── __init__.py # 包檔案
  4. │ ├── celery.py # celery連線和設定相關檔案,且名字必須交celery.py
  5. │ └── tasks.py # 所有任務函數
  6. ├── add_task.py # 新增任務
  7. └── get_result.py # 獲取結果

创建包:

建立一个包,名为:celery_task

  1. - 1、第一步:在包下建立py檔案(名字必須為celery.py)
  2. # 匯入celery模組
  3. from celery import Celery
  4. # 匯入設定broker和backend
  5. from .settings import BACKEND, BROKER
  6. # 範例化celery物件
  7. app = Celery('test',
  8. broker=BROKER,
  9. backend=BACKEND,
  10. include=['celery_task.order_task',
  11. 'celery_task.user_task'])
  12. - 2、第二步:建立settings.py,用於存放設定
  13. BROKER = 'redis://127.0.0.1:6379/1'
  14. BACKEND = 'redis://127.0.0.1:6379/2'
  15. - 3、第三步,建立py檔案(task.py),用於存放需要執行的非同步任務
  16. # 匯入celery範例物件
  17. from .celery import app
  18. # 計算函數
  19. @app.task()
  20. def add(a, b):
  21. print('計算結果為:', a + b)
  22. return True
  23. # 模擬傳送簡訊
  24. @app.task()
  25. def send_sms(mobile, code):
  26. print('已向手機號:%s 傳送簡訊,驗證碼為:%s' % (mobile, code))
  27. return True
  28. - 4、第四步:開啟worker
  29. 切換到celery所在的目錄下,開啟worker命令
  30. celery -A celery_task worker -l info -P eventlet
  31. - 5、第五步:提橋任務: # add_task.py 檔案下
  32. # 提交任務,這裡模擬的是非同步任務的提交
  33. res = add.delay(a, b) # 提交後可以接收任務的ID
  34. res1 = send_sms.delay(mobile, code)
  35. - 6、第六步:檢視任務執行結果: # get_result.py 檔案下
  36. # 匯入celery範例
  37. from celery_task.celery import app
  38. from celery.result import AsyncResult
  39. id = res
  40. id1 = res1
  41. # 通過傳入任務的ID就可以查詢到任務的執行結果
  42. def res_func(id):
  43. id = id
  44. a = AsyncResult(id=id, app=app)
  45. if a.successful(): # 執行完了
  46. result = a.get()
  47. if result: return '執行完成'
  48. elif a.failed():
  49. return '任務失敗,失敗的原因可能是未開啟worker'
  50. elif a.status == 'PENDING':
  51. return '任務等待中被執行,當前任務較多或未開啟worker'
  52. elif a.status == 'RETRY':
  53. return '任務異常後正在重試'
  54. elif a.status == 'STARTED':
  55. return '任務已經開始被執行,請稍後查詢'

2、Celery执行异步任务、延迟任务、定时任务

执行非同步任务:

  1. # 程式碼用法:
  2. 函數名.delay('函數執行需要的引數')
  3. res = func.delay(*args,**kwargs) # res 用於接收提交任務的ID

运行延迟任务 :

  1. # 程式碼用法:
  2. # 1、執行延遲任務
  3. from datetime import datetime, timedelta
  4. # 設定延遲後的時間,一分鐘後執行
  5. eat = datetime.utcnow() + timedelta(minutes=1)
  6. # 提交任務
  7. res = send_sms.apply_async(args=['13855411111', '123'], eta=eta)

执行定时任务:

执行定时任务需要启动beat和worker

  • beat:定时提交任务的程序---》设定在app.conf.beat_schedule的任务

  • worker:执行任务

  1. - 第一步:在celery的py檔案中寫入
  2. # 匯入定時需要的模組
  3. from celery.schedules import crontab
  4. # 第一步:在celery的py檔案中寫入
  5. app.conf.timezone = 'Asia/Shanghai'
  6. # 是否使用UTC
  7. app.conf.enable_utc = False
  8. # celery的組態檔#####
  9. # 任務的定時設定
  10. app.conf.beat_schedule = {
  11. 'send_sms': { # 設定執行函數的名字
  12. 'task': 'celery_task.task.send_sms', # 匯入任務的位置
  13. # 'schedule': timedelta(seconds=3), # 時間物件
  14. # 'schedule': crontab(hour=8, day_of_week=1), # 每週一早八點
  15. 'schedule': crontab(hour=9, minute=43), # 每天9點43
  16. 'args': ('18888888', '6666'), # 設定執行函數需要的引數
  17. },
  18. }
  19. - 第二步:啟動beat # 啟動後配設定的任務會自動提交
  20. celery -A celery_task beat -l info
  21. - 第三步:啟動worker # beat提交的任務被被執行
  22. celery -A celery_task worker -l info -P eventlet

三、Django中使用celery

补充:

如果在公司中,只做定时任务有一个框架更简单一点

  • APSchedule:https://blog.csdn.net/qq_41341757/article/details/118759836

使用步骤 :

  1. -1 把咱們寫的包,複製到專案目錄下
  2. -luffy_api
  3. -celery_task #celery的包路徑
  4. -luffy_api #原始碼路徑
  5. -2 在使用提交非同步任務的位置,匯入使用即可
  6. -檢視函數中使用,匯入任務
  7. -任務.delay() # 提交任務
  8. -3 啟動worker,如果有定時任務,啟動beat
  9. -4 等待任務被worker執行
  10. -5 在檢視函數中,查詢任務執行的結果

1、模拟写一个异步秒杀任务

后端

view.py

  1. from celery.result import AsyncResult
  2. from celery_task.celery import app
  3. from celery_task.task import sckill_task
  4. # 秒殺介面
  5. class SeckillView(ViewSet):
  6. # 開啟秒殺
  7. @action(methods=['GET'], detail=False)
  8. def seckill(self, request):
  9. # 獲取商品連結
  10. goods_id = request.query_params.get('goods_id')
  11. # 將任務提交給worker
  12. res = sckill_task.delay(goods_id)
  13. # 將任務的ID反饋給前端
  14. return APIResponse(task_id=str(res))
  15. # 查詢秒殺結果
  16. @action(methods=['GET'], detail=False)
  17. def get_result(self, request):
  18. # 前端將任務ID產過來,用於接收結果
  19. task_id = request.query_params.get('task_id')
  20. # 呼叫介面,查詢結果
  21. a = AsyncResult(id=task_id, app=app)
  22. if a.successful():
  23. result = a.get()
  24. if result:
  25. return APIResponse(msg='秒殺成功')
  26. else:
  27. return APIResponse(code=101, msg='手速滿了,秒殺失敗')
  28. elif a.status == 'PENDING':
  29. return APIResponse(code=666, msg='加速秒殺中')
  30. return APIResponse(msg='錯誤')

celery.py ---->秒杀任务

  1. import random
  2. # 秒殺函數
  3. @app.task()
  4. def sckill_task(goods_id):
  5. print('商品正在秒殺中')
  6. time.sleep(random.choice([6, 7, 8, 9]))
  7. print('商品秒殺結束')
  8. return random.choice([True, False])

前端:

  1. <template>
  2. <div>
  3. <button @click="clickHandle">點選秒殺</button>
  4. </div>
  5. </template>
  6. <script>
  7. export default {
  8. name: "Template",
  9. data() {
  10. return {
  11. // 用於接收任務ID
  12. task_id: '',
  13. // 使用者存放定時任務
  14. t: ''
  15. }
  16. },
  17. methods: {
  18. // 使用者點選秒殺後傳送請求
  19. clickHandle() {
  20. // 向厚點提交秒殺任務
  21. this.$axios.get(this.$settings.BASE_URL + '/user/seckill/seckill/?goods_id=1').then(res => {
  22. // 判斷任務是否提交成功
  23. if (res.data.code == 100) {
  24. // 提交成功會獲取到任務ID
  25. this.task_id = res.data.task_id
  26. // 告知使用者商品正在秒殺中
  27. this.$message('正在秒殺中')
  28. // 啟動一個定時任務,每隔3秒向後端傳送請求,獲取任務是否提交成功
  29. this.t = setInterval(res => {
  30. // 定時向後端傳送請求,判斷秒殺結果
  31. this.$axios.get(this.$settings.BASE_URL + '/user/seckill/get_result/?task_id=' + this.task_id).then(res => {
  32. // 判斷任務是否結束
  33. if (res.data.code == 666) {
  34. this.$message(res.data.msg)
  35. // 任務結束反饋結果,關閉定時器
  36. } else {
  37. this.$message(res.data.msg)
  38. // 關閉定時器
  39. clearInterval(this.t)
  40. this.t = ''
  41. }
  42. })
  43. }, 3000)
  44. }
  45. })
  46. }
  47. }
  48. }
  49. </script>

2、总结

  • 第一步:将celery包复制到项目路径下

  1. -luffy_api
  2. -celery_task #celery的包路徑
  3. celery.py # 一定不要忘了一句話
  4. import os
  5. # 重點:celery中使用djagno,任務中可能會使用django的orm,快取,表模型。。。。一定要加
  6. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
  7. -luffy_api #原始碼路徑
  • 第二步:在需要使用异步的地方汇入celery示例即可使用

  1. -檢視函數中使用,匯入任務
  2. -任務.delay() # 提交任務
  • 第三步:启动worker,如果有定时任务,启动beat

  • 第四步: 等待任务被worker执行

  • 第五步:在检视函数中,查询任务结果

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号