赞
踩
1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。
2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的
3、使用celery是需要使用到中间件的,简单点就使用redis做中间件
注意:
在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。
使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。
依赖环境:
- celery==4.4.7
- eventlet==0.33.3
- Flask==2.1.3
- Flask-Caching==1.10.1
- Flask-Cors==3.0.10
- Flask-Migrate==2.7.0
- Flask-RESTful==0.3.9
- Flask-SocketIO==5.1.1
- Flask-SQLAlchemy==2.5.1
- PyMySQL==1.0.2
- redis==3.5.3
- SQLAlchemy==1.4.0
- Werkzeug==2.0.2
目录结构:
flask-project
|--apps
|-- user
|-- models
|--views.py
|--urls.py
|--__init__.py
|--ext
|--__init__.py
|--config.py
|--celery_task
|--__init__.py
|--async_task.py
|--celery.py
|--celeryconfig.py
|--check_task.py
|--scheduler_task.py
app.py
1、apps/user/models.py : 写了一个user表
2、apps/user/views.py:写了测试调用celery异步任务的接口
3、apps/user/urls.py: 注册路由的
4、ext/__init__.py:cache、db、cors的拓展
5、ext/config.py : cache和cors使用到的配置
6、apps/__init__.py: 一个函数create_app,生成flask应用对象
7、app.py: 启动flask应用对象的模块
本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。
在视图中在执行异步任务,并获取异步任务的id:
- from celery_task.async_task import send_email_task,cache_user_task
- #用户资源:get\put\delete, 对单个进行操作
- class UserOneResource(ResourceBase):
- def put(self,id):
- #测试异步发邮件
- email = request.args.get('email')
- code = request.args.get('code')
- res = send_email_task.delay(email,code)
- print(res.id)
- return NewResponse(msg='put',data={'task_id':res.id})
-
- def patch(self,id):
- #测试异步操作flask的orm和cache
- p = request.args.get('p')
- if p=='set':
- res = cache_user_task.delay()
- print(res,type(res))
- return NewResponse(msg='patch',data={'task_id':res.id})
- else:
- from ext import cache
- data = cache.get('all-user-data')
- return NewResponse(msg='patch',data=data)
res = 异步函数.delay(函数需要的参数)
task_id = res.id
注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。
将celery的配置都放到一个py文件中,方便后期的维护和使用
celeryconfig.py
- from celery.schedules import crontab
- from datetime import timedelta
- '''
- 参数解析:
- accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
- task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
- result_serializer:结果序列化格式,默认值为json;
- timezone:配置Celery以使用自定义时区;
- enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
- result_expires: 异步任务结果存活时长
- beat_schedule:设置定时任务
- '''
- #手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
- task_module = [
- 'celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
- 'celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
- ]
-
- #celery的配置
- config = {
- "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
- "result_backend" : 'redis://127.0.0.1:6379/1',
- "task_serializer" : 'json',
- "result_serializer" : 'json',
- "accept_content" : ['json'],
- "timezone" : 'Asia/Shanghai',
- "enable_utc" : False,
- "result_expires" : 1*60*60,
- "beat_schedule" : { #定时任务配置
- # 名字随意命名
- 'add-func-30-seconds': {
- # 执行add_task下的addy函数
- 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
- # 每10秒执行一次
- 'schedule': timedelta(seconds=30),
- # add函数传递的参数
- 'args': (10, 21)
- },
- # 名字随意起
- 'add-func-5-minutes': {
- 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
- # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
- 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
- 'args': (19, 22) # 定时任务需要的参数
- },
- # 缓存用户数据到cache中
- 'cache-user-func': {
- 'task': 'celery_task.scheduler_task.cache_user_func',
- # 导入任务函数:from celery_task.scheduler_task import cache_user_func
- 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
- }
- }
- }
celery.py
- from celery import Celery,Task
- from .celeryconfig import config,task_module
- import sys
- import os
- '1、把flask项目路径添加到系统环境变量中'
- project_path = os.path.dirname(os.path.dirname(__file__))
- sys.path.append(project_path)
-
- '''
- 2、创建celery应用对象
- 'task'可以任务是该celery对象名字,用于区分celery对象
- broker是指定消息中间件
- backend是指定任务结果存储位置
- include是手动指定异步任务所在的模块的位置
- '''
- #创建celery异步对象
- celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
- #导入一些基本配置
- celery.conf.update(**config)
-
- '3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
- class ContextTask(celery.Task):
- def __call__(self, *args, **kwargs):
- from apps import create_app
- app = create_app()
- with app.app_context():
- return self.run(*args, **kwargs)
- celery.Task = ContextTask
注意:
1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。
2、第二步是创建celery通用的方法了,没什么好说的。
3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)
将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。
async_task.py
- # 导入celery对象app
- from celery_task.celery import celery
- from ext import cache
- import time
-
-
- '''
- 1、没有返回值的,@app.task(ignore_result=True)
- 2、有返回值的任务,@app.task 默认就是(ignore_result=False)
- '''
-
-
- # 没有返回值,禁用掉结果后端
- @celery.task
- def send_email_task(receiver_email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作
- '''
- :param email: 接收消息的邮箱,用户的邮箱
- :return:
- '''
- # 模拟邮件发送验证码
- time.sleep(5)
- return {'result':'邮件已经发送',receiver_email:'2356'}
-
- @celery.task
- def cache_user_task():
- #orm查询数据,放到cache中
- from apps.user.models import UserModel
- user = UserModel.query.all()
- lis = []
- for u in user:
- id = u.id
- name = u.name
- dic = {'id':id,'name':name}
- lis.append(dic)
- print(dic)
- cache.set('all-user-data',lis)
- return {'code':200,'msg':'查询数据成功'}
将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。
schedulser_task.py
- from celery_task.celery import celery
- import time
-
-
- # 有返回值,返回值可以从结果后端中获取
- @celery.task
- def add_func(a, b):
- print('执行了加法函数',a+b)
- return a + b
-
-
- # 不需要返回值,禁用掉结果后端
- @celery.task(ignore_result=True)
- def cache_user_func():
- print('all')
-
-
check_task.py:
- from celery.result import AsyncResult
- from celery_task.celery import celery
-
- '''验证任务的执行状态的'''
-
-
- def check_task_status(task_id):
- '''
- 任务的执行状态:
- PENDING :等待执行
- STARTED :开始执行
- RETRY :重新尝试执行
- SUCCESS :执行成功
- FAILURE :执行失败
- :param task_id:
- :return:
- '''
- result = AsyncResult(id=task_id, app=celery)
- dic = {
- 'type': result.status,
- 'msg': '',
- 'data': None,
- 'code': 400
- }
- if result.status == 'PENDING':
- dic['msg'] = '任务等待中'
- elif result.status == 'STARTED':
- dic['msg'] = '任务开始执行'
- elif result.status == 'RETRY':
- dic['msg'] = '任务重新尝试执行'
- elif result.status == 'FAILURE':
- dic['msg'] = '任务执行失败了'
- elif result.status == 'SUCCESS':
- result = result.get()
- dic['msg'] = '任务执行成功'
- dic['data'] = result
- dic['code'] = 200
- # result.forget() # 将结果删除
- # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
- # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
- return dic
-
在视图函数中调用该方法,通过task_id ,返回任务的运行结果。
flask项目(在项目根目录下执行):
flask run --host 0.0.0.0 --port 5000
celery项目(在项目根目录下执行):
启动celery进程:
windows系统:
celery -A celery_task.celery worker -l info -P eventlet
linux系统:
celery -A celery_task.celery worker -l info
启动定时任务(先启动celery进程在启动定时任务):
celery -A celery_task.celery beat -l info
1、执行异步任务中,将orm数据存到cache中
2、执行定时任务了
1、在系统中要先安装好redis和mysql,并都启动了
2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。
3、当前的配置下,celery的目录必须是在flask根目录下
如果你想将celery_task包移动到apps包下,此时你需要修改什么?
1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变
- '1、把flask项目路径添加到系统环境变量中'
- project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化
- '1、加上apps.'
- task_module = [
- 'apps.celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
- 'apps.celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
- ]
-
-
- '2、task参数对应的字符串,加上apps.'
- config = {
- "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
- "result_backend" : 'redis://127.0.0.1:6379/1',
- "task_serializer" : 'json',
- "result_serializer" : 'json',
- "accept_content" : ['json'],
- "timezone" : 'Asia/Shanghai',
- "enable_utc" : False,
- "result_expires" : 1*60*60,
- "beat_schedule" : { #定时任务配置
- # 名字随意命名
- 'add-func-30-seconds': {
- # 执行add_task下的addy函数
- 'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
- # 每10秒执行一次
- 'schedule': timedelta(seconds=30),
- # add函数传递的参数
- 'args': (10, 21)
- },
- # 名字随意起
- 'add-func-5-minutes': {
- 'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
- # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
- 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
- 'args': (19, 22) # 定时任务需要的参数
- },
- # 缓存用户数据到cache中
- 'cache-user-func': {
- 'task': 'apps.celery_task.scheduler_task.cache_user_func',
- # 导入任务函数:from celery_task.scheduler_task import cache_user_func
- 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
- }
- }
- }
3、在视图函数导入异步任务的路径也变了
- #异步任务
- from apps.celery_task.async_task import send_email_task,cache_user_task
4、启动celery和定时任务的命令变量【在项目根目录下执行命令】
启动celery:
windows启动命令: celery -A apps.celery_task.celery worker -l info -P eventlet
linux启动命令: celery -A apps.celery_task.celery worker -l info
启动定时任务:
celery -A apps.celery_task beat -l info
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。