赞
踩
Celery介绍、安装、基本使用
什么是Celery:
Celery是一个简单、灵活且可靠的,处理信息的分布式系统
Celery可以用来做什么:
非同步任务
延迟任务
Celery的执行原理:
可以不依赖任何服务,通过自身命令,启动服务
celery服务为其他项目服务提供异步解决任务需求
- # 注:會有兩個服務同時執行
- - 專案服務
- - celery服務
- 專案服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成專案的需求
-
-
- '''
- 人是一個獨立執行的服務 | 醫院也是一個獨立執行的服務
- 正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
- 人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立執行,人生病時,醫院就來解決人生病的需求
-
- '''
讯息中介软件:broker
提交的任务【函数】都放在这里, celery本身不能提供信息中介软件
需要借助于第三方: redis或rabbitmq
任务执行单元:worker
真正执行任务的的地方,一个个程序中执行函数
结果储存:backend
函数return的结果都储存在这里, celery本身不提供结果储存
需要借助于第三方: redis或rabbitmq
使用场景 :
异步执行:解决耗时任务
延迟执行:解决延迟任务
定时执行:解决周期任务
Celery不支持在windows上直接执行,通过eventlet支持在win上执行
安装 :
- pip install celery
- pip install eventlet # windows需要安裝
快速使用:
- 1、第一步:建立一個py檔案(main.py),用於範例化celery物件,編寫需要執行的函數 # 1、匯入模組 from celery import Celery # 2、指定briker,用於存放提交的非同步任務 broker = 'redis://127.0.0.1:6379/1' # 3、指定backend,用於存放函數執行結束的結果 backend = 'redis://127.0.0.1:6379/2' # 範例化celery物件 app = Celery('test', broker=broker, backend=backend) # 編寫一個函數,裝飾上celery物件 @app.task def add(a, b): import time time.sleep(3) print('add函數執行完成') return a + b - 2、第二步:再次建立一個py檔案(run.py),用於將函數提交給celery # 1、匯入剛才編寫的函數 from main import add # 2、將任務提交給broker,函數需要的引數需要傳入 res = add.delay(1, 2) # 3、提交後可以獲得該任務的ID,可通過ID可以查詢任務執行結果 print(res) # 0213d2c2-453e-41a8-a171-e31f1f2f4883 - 3、第三步:使用命令開啟worker (也可以提前開啟,任務提交後就會直接執行) # 啟動worker命令,win需要安裝eventlet # 啟動需要進入main.py檔案的目錄下 win: -4.x之前版本 celery worker -A main -l info -P eventlet -4.x之後 celery -A main worker -l info -P eventlet mac: celery -A main worker -l info - 4、第四步:worker會將執行的結果存在之前指定的broker目錄下(指定的redis資料庫) - 5、第五步:通過程式碼檢視執行結果(建立新的py檔案,專門用於檢視執行結果) # 1、匯入celery範例的物件 from main import app # 2、匯入該模組用於檢視結果 from celery.result import AsyncResult # 3、將提交的任務編號拿過來,用於查詢結果 id = '0213d2c2-453e-41a8-a171-e31f1f2f4883' # 4、指定該檔案為啟動檔案 if __name__ == '__main__': # 範例化物件,將任務的ID和celery範例化物件當作引數傳入 a = AsyncResult(id=id, app=app) # 判斷執行結果 if a.successful(): # 執行完了 result = a.get() print(result) elif a.failed(): print('任務失敗') elif a.status == 'PENDING': print('任務等待中被執行') elif a.status == 'RETRY': print('任務異常後正在重試') elif a.status == 'STARTED': print('任務已經開始被執行')
什么是包结构:通过将celery服务封装成包的形式,放在项目需要使用的时候汇入即可
- project
- ├── celery_task # celery包
- │ ├── __init__.py # 包檔案
- │ ├── celery.py # celery連線和設定相關檔案,且名字必須交celery.py
- │ └── tasks.py # 所有任務函數
- ├── add_task.py # 新增任務
- └── get_result.py # 獲取結果
创建包:
建立一个包,名为:celery_task
- 1、第一步:在包下建立py檔案(名字必須為celery.py) # 匯入celery模組 from celery import Celery # 匯入設定broker和backend from .settings import BACKEND, BROKER # 範例化celery物件 app = Celery('test', broker=BROKER, backend=BACKEND, include=['celery_task.order_task', 'celery_task.user_task']) - 2、第二步:建立settings.py,用於存放設定 BROKER = 'redis://127.0.0.1:6379/1' BACKEND = 'redis://127.0.0.1:6379/2' - 3、第三步,建立py檔案(task.py),用於存放需要執行的非同步任務 # 匯入celery範例物件 from .celery import app # 計算函數 @app.task() def add(a, b): print('計算結果為:', a + b) return True # 模擬傳送簡訊 @app.task() def send_sms(mobile, code): print('已向手機號:%s 傳送簡訊,驗證碼為:%s' % (mobile, code)) return True - 4、第四步:開啟worker 切換到celery所在的目錄下,開啟worker命令 celery -A celery_task worker -l info -P eventlet - 5、第五步:提橋任務: # add_task.py 檔案下 # 提交任務,這裡模擬的是非同步任務的提交 res = add.delay(a, b) # 提交後可以接收任務的ID res1 = send_sms.delay(mobile, code) - 6、第六步:檢視任務執行結果: # get_result.py 檔案下 # 匯入celery範例 from celery_task.celery import app from celery.result import AsyncResult id = res id1 = res1 # 通過傳入任務的ID就可以查詢到任務的執行結果 def res_func(id): id = id a = AsyncResult(id=id, app=app) if a.successful(): # 執行完了 result = a.get() if result: return '執行完成' elif a.failed(): return '任務失敗,失敗的原因可能是未開啟worker' elif a.status == 'PENDING': return '任務等待中被執行,當前任務較多或未開啟worker' elif a.status == 'RETRY': return '任務異常後正在重試' elif a.status == 'STARTED': return '任務已經開始被執行,請稍後查詢'
执行非同步任务:
- # 程式碼用法:
- 函數名.delay('函數執行需要的引數')
- res = func.delay(*args,**kwargs) # res 用於接收提交任務的ID
运行延迟任务 :
- # 程式碼用法:
- # 1、執行延遲任務
- from datetime import datetime, timedelta
-
- # 設定延遲後的時間,一分鐘後執行
- eat = datetime.utcnow() + timedelta(minutes=1)
-
- # 提交任務
- res = send_sms.apply_async(args=['13855411111', '123'], eta=eta)
执行定时任务:
执行定时任务需要启动beat和worker
beat:定时提交任务的程序---》设定在app.conf.beat_schedule的任务
worker:执行任务
- 第一步:在celery的py檔案中寫入 # 匯入定時需要的模組 from celery.schedules import crontab # 第一步:在celery的py檔案中寫入 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # celery的組態檔##### # 任務的定時設定 app.conf.beat_schedule = { 'send_sms': { # 設定執行函數的名字 'task': 'celery_task.task.send_sms', # 匯入任務的位置 # 'schedule': timedelta(seconds=3), # 時間物件 # 'schedule': crontab(hour=8, day_of_week=1), # 每週一早八點 'schedule': crontab(hour=9, minute=43), # 每天9點43 'args': ('18888888', '6666'), # 設定執行函數需要的引數 }, } - 第二步:啟動beat # 啟動後配設定的任務會自動提交 celery -A celery_task beat -l info - 第三步:啟動worker # beat提交的任務被被執行 celery -A celery_task worker -l info -P eventlet
补充:
如果在公司中,只做定时任务有一个框架更简单一点
APSchedule:https://blog.csdn.net/qq_41341757/article/details/118759836
使用步骤 :
- -1 把咱們寫的包,複製到專案目錄下
- -luffy_api
- -celery_task #celery的包路徑
- -luffy_api #原始碼路徑
-
- -2 在使用提交非同步任務的位置,匯入使用即可
- -檢視函數中使用,匯入任務
- -任務.delay() # 提交任務
-
-
- -3 啟動worker,如果有定時任務,啟動beat
-
- -4 等待任務被worker執行
-
- -5 在檢視函數中,查詢任務執行的結果
后端
view.py
from celery.result import AsyncResult from celery_task.celery import app from celery_task.task import sckill_task # 秒殺介面 class SeckillView(ViewSet): # 開啟秒殺 @action(methods=['GET'], detail=False) def seckill(self, request): # 獲取商品連結 goods_id = request.query_params.get('goods_id') # 將任務提交給worker res = sckill_task.delay(goods_id) # 將任務的ID反饋給前端 return APIResponse(task_id=str(res)) # 查詢秒殺結果 @action(methods=['GET'], detail=False) def get_result(self, request): # 前端將任務ID產過來,用於接收結果 task_id = request.query_params.get('task_id') # 呼叫介面,查詢結果 a = AsyncResult(id=task_id, app=app) if a.successful(): result = a.get() if result: return APIResponse(msg='秒殺成功') else: return APIResponse(code=101, msg='手速滿了,秒殺失敗') elif a.status == 'PENDING': return APIResponse(code=666, msg='加速秒殺中') return APIResponse(msg='錯誤')
celery.py ---->秒杀任务
- import random
-
-
- # 秒殺函數
- @app.task()
- def sckill_task(goods_id):
- print('商品正在秒殺中')
- time.sleep(random.choice([6, 7, 8, 9]))
- print('商品秒殺結束')
- return random.choice([True, False])
前端:
<template> <div> <button @click="clickHandle">點選秒殺</button> </div> </template> <script> export default { name: "Template", data() { return { // 用於接收任務ID task_id: '', // 使用者存放定時任務 t: '' } }, methods: { // 使用者點選秒殺後傳送請求 clickHandle() { // 向厚點提交秒殺任務 this.$axios.get(this.$settings.BASE_URL + '/user/seckill/seckill/?goods_id=1').then(res => { // 判斷任務是否提交成功 if (res.data.code == 100) { // 提交成功會獲取到任務ID this.task_id = res.data.task_id // 告知使用者商品正在秒殺中 this.$message('正在秒殺中') // 啟動一個定時任務,每隔3秒向後端傳送請求,獲取任務是否提交成功 this.t = setInterval(res => { // 定時向後端傳送請求,判斷秒殺結果 this.$axios.get(this.$settings.BASE_URL + '/user/seckill/get_result/?task_id=' + this.task_id).then(res => { // 判斷任務是否結束 if (res.data.code == 666) { this.$message(res.data.msg) // 任務結束反饋結果,關閉定時器 } else { this.$message(res.data.msg) // 關閉定時器 clearInterval(this.t) this.t = '' } }) }, 3000) } }) } } } </script>
第一步:将celery包复制到项目路径下
- -luffy_api
- -celery_task #celery的包路徑
- celery.py # 一定不要忘了一句話
- import os
- # 重點:celery中使用djagno,任務中可能會使用django的orm,快取,表模型。。。。一定要加
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
- -luffy_api #原始碼路徑
第二步:在需要使用异步的地方汇入celery示例即可使用
- -檢視函數中使用,匯入任務
- -任務.delay() # 提交任務
第三步:启动worker,如果有定时任务,启动beat
第四步: 等待任务被worker执行
第五步:在检视函数中,查询任务结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。