当前位置:   article > 正文

Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置_celery flask

celery flask

一、概述

1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。

2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的

3、使用celery是需要使用到中间件的,简单点就使用redis做中间件

注意:

在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。

使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。

二、项目结构

依赖环境:

  1. celery==4.4.7
  2. eventlet==0.33.3
  3. Flask==2.1.3
  4. Flask-Caching==1.10.1
  5. Flask-Cors==3.0.10
  6. Flask-Migrate==2.7.0
  7. Flask-RESTful==0.3.9
  8. Flask-SocketIO==5.1.1
  9. Flask-SQLAlchemy==2.5.1
  10. PyMySQL==1.0.2
  11. redis==3.5.3
  12. SQLAlchemy==1.4.0
  13. Werkzeug==2.0.2

目录结构:

flask-project

        |--apps

                |-- user

                        |-- models

                        |--views.py

                        |--urls.py

                |--__init__.py

        |--ext

                |--__init__.py

                |--config.py

        |--celery_task

                |--__init__.py

                |--async_task.py

                |--celery.py

                |--celeryconfig.py

                |--check_task.py

                |--scheduler_task.py

        app.py

三、flask工厂模式下各模块功能

1、apps/user/models.py : 写了一个user表

2、apps/user/views.py:写了测试调用celery异步任务的接口

3、apps/user/urls.py: 注册路由的

4、ext/__init__.py:cache、db、cors的拓展

5、ext/config.py : cache和cors使用到的配置

6、apps/__init__.py: 一个函数create_app,生成flask应用对象

7、app.py: 启动flask应用对象的模块

本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。

在视图中在执行异步任务,并获取异步任务的id:

  1. from celery_task.async_task import send_email_task,cache_user_task
  2. #用户资源:get\put\delete, 对单个进行操作
  3. class UserOneResource(ResourceBase):
  4. def put(self,id):
  5. #测试异步发邮件
  6. email = request.args.get('email')
  7. code = request.args.get('code')
  8. res = send_email_task.delay(email,code)
  9. print(res.id)
  10. return NewResponse(msg='put',data={'task_id':res.id})
  11. def patch(self,id):
  12. #测试异步操作flask的orm和cache
  13. p = request.args.get('p')
  14. if p=='set':
  15. res = cache_user_task.delay()
  16. print(res,type(res))
  17. return NewResponse(msg='patch',data={'task_id':res.id})
  18. else:
  19. from ext import cache
  20. data = cache.get('all-user-data')
  21. return NewResponse(msg='patch',data=data)

res = 异步函数.delay(函数需要的参数)

task_id = res.id

注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。

四、celery项目的配置

1、celery的配置

将celery的配置都放到一个py文件中,方便后期的维护和使用

celeryconfig.py

  1. from celery.schedules import crontab
  2. from datetime import timedelta
  3. '''
  4. 参数解析:
  5. accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
  6. task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
  7. result_serializer:结果序列化格式,默认值为json;
  8. timezone:配置Celery以使用自定义时区;
  9. enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
  10. result_expires: 异步任务结果存活时长
  11. beat_schedule:设置定时任务
  12. '''
  13. #手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
  14. task_module = [
  15. 'celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
  16. 'celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
  17. ]
  18. #celery的配置
  19. config = {
  20. "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
  21. "result_backend" : 'redis://127.0.0.1:6379/1',
  22. "task_serializer" : 'json',
  23. "result_serializer" : 'json',
  24. "accept_content" : ['json'],
  25. "timezone" : 'Asia/Shanghai',
  26. "enable_utc" : False,
  27. "result_expires" : 1*60*60,
  28. "beat_schedule" : { #定时任务配置
  29. # 名字随意命名
  30. 'add-func-30-seconds': {
  31. # 执行add_task下的addy函数
  32. 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
  33. # 每10秒执行一次
  34. 'schedule': timedelta(seconds=30),
  35. # add函数传递的参数
  36. 'args': (10, 21)
  37. },
  38. # 名字随意起
  39. 'add-func-5-minutes': {
  40. 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
  41. # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
  42. 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
  43. 'args': (19, 22) # 定时任务需要的参数
  44. },
  45. # 缓存用户数据到cache中
  46. 'cache-user-func': {
  47. 'task': 'celery_task.scheduler_task.cache_user_func',
  48. # 导入任务函数:from celery_task.scheduler_task import cache_user_func
  49. 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
  50. }
  51. }
  52. }

2、创建celery对象

celery.py

  1. from celery import Celery,Task
  2. from .celeryconfig import config,task_module
  3. import sys
  4. import os
  5. '1、把flask项目路径添加到系统环境变量中'
  6. project_path = os.path.dirname(os.path.dirname(__file__))
  7. sys.path.append(project_path)
  8. '''
  9. 2、创建celery应用对象
  10. 'task'可以任务是该celery对象名字,用于区分celery对象
  11. broker是指定消息中间件
  12. backend是指定任务结果存储位置
  13. include是手动指定异步任务所在的模块的位置
  14. '''
  15. #创建celery异步对象
  16. celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
  17. #导入一些基本配置
  18. celery.conf.update(**config)
  19. '3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
  20. class ContextTask(celery.Task):
  21. def __call__(self, *args, **kwargs):
  22. from apps import create_app
  23. app = create_app()
  24. with app.app_context():
  25. return self.run(*args, **kwargs)
  26. celery.Task = ContextTask

注意:

1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。

2、第二步是创建celery通用的方法了,没什么好说的。

3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)

3、异步任务模块

将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。

async_task.py

  1. # 导入celery对象app
  2. from celery_task.celery import celery
  3. from ext import cache
  4. import time
  5. '''
  6. 1、没有返回值的,@app.task(ignore_result=True)
  7. 2、有返回值的任务,@app.task 默认就是(ignore_result=False)
  8. '''
  9. # 没有返回值,禁用掉结果后端
  10. @celery.task
  11. def send_email_task(receiver_email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作
  12. '''
  13. :param email: 接收消息的邮箱,用户的邮箱
  14. :return:
  15. '''
  16. # 模拟邮件发送验证码
  17. time.sleep(5)
  18. return {'result':'邮件已经发送',receiver_email:'2356'}
  19. @celery.task
  20. def cache_user_task():
  21. #orm查询数据,放到cache中
  22. from apps.user.models import UserModel
  23. user = UserModel.query.all()
  24. lis = []
  25. for u in user:
  26. id = u.id
  27. name = u.name
  28. dic = {'id':id,'name':name}
  29. lis.append(dic)
  30. print(dic)
  31. cache.set('all-user-data',lis)
  32. return {'code':200,'msg':'查询数据成功'}

4、定时任务模块

将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。

schedulser_task.py

  1. from celery_task.celery import celery
  2. import time
  3. # 有返回值,返回值可以从结果后端中获取
  4. @celery.task
  5. def add_func(a, b):
  6. print('执行了加法函数',a+b)
  7. return a + b
  8. # 不需要返回值,禁用掉结果后端
  9. @celery.task(ignore_result=True)
  10. def cache_user_func():
  11. print('all')

5、检测任务id获取任务状态和返回值

check_task.py:

  1. from celery.result import AsyncResult
  2. from celery_task.celery import celery
  3. '''验证任务的执行状态的'''
  4. def check_task_status(task_id):
  5. '''
  6. 任务的执行状态:
  7. PENDING :等待执行
  8. STARTED :开始执行
  9. RETRY :重新尝试执行
  10. SUCCESS :执行成功
  11. FAILURE :执行失败
  12. :param task_id:
  13. :return:
  14. '''
  15. result = AsyncResult(id=task_id, app=celery)
  16. dic = {
  17. 'type': result.status,
  18. 'msg': '',
  19. 'data': None,
  20. 'code': 400
  21. }
  22. if result.status == 'PENDING':
  23. dic['msg'] = '任务等待中'
  24. elif result.status == 'STARTED':
  25. dic['msg'] = '任务开始执行'
  26. elif result.status == 'RETRY':
  27. dic['msg'] = '任务重新尝试执行'
  28. elif result.status == 'FAILURE':
  29. dic['msg'] = '任务执行失败了'
  30. elif result.status == 'SUCCESS':
  31. result = result.get()
  32. dic['msg'] = '任务执行成功'
  33. dic['data'] = result
  34. dic['code'] = 200
  35. # result.forget() # 将结果删除
  36. # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
  37. # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
  38. return dic

在视图函数中调用该方法,通过task_id ,返回任务的运行结果。

五、测试

1、运行项目

flask项目(在项目根目录下执行):

        flask run --host 0.0.0.0 --port 5000

celery项目(在项目根目录下执行):

启动celery进程:

windows系统:

        celery -A celery_task.celery worker -l info  -P  eventlet

linux系统:

        celery -A celery_task.celery worker -l info 

启动定时任务(先启动celery进程在启动定时任务):

celery -A celery_task.celery beat -l info

2、运行结果

1、执行异步任务中,将orm数据存到cache中

2、执行定时任务了

六、注意事项

1、在系统中要先安装好redis和mysql,并都启动了

2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。

3、当前的配置下,celery的目录必须是在flask根目录下

七、拓展-改变celery_task的位置

如果你想将celery_task包移动到apps包下,此时你需要修改什么?

1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变

  1. '1、把flask项目路径添加到系统环境变量中'
  2. project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化

  1. '1、加上apps.'
  2. task_module = [
  3. 'apps.celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
  4. 'apps.celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
  5. ]
  6. '2、task参数对应的字符串,加上apps.'
  7. config = {
  8. "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
  9. "result_backend" : 'redis://127.0.0.1:6379/1',
  10. "task_serializer" : 'json',
  11. "result_serializer" : 'json',
  12. "accept_content" : ['json'],
  13. "timezone" : 'Asia/Shanghai',
  14. "enable_utc" : False,
  15. "result_expires" : 1*60*60,
  16. "beat_schedule" : { #定时任务配置
  17. # 名字随意命名
  18. 'add-func-30-seconds': {
  19. # 执行add_task下的addy函数
  20. 'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
  21. # 每10秒执行一次
  22. 'schedule': timedelta(seconds=30),
  23. # add函数传递的参数
  24. 'args': (10, 21)
  25. },
  26. # 名字随意起
  27. 'add-func-5-minutes': {
  28. 'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
  29. # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
  30. 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
  31. 'args': (19, 22) # 定时任务需要的参数
  32. },
  33. # 缓存用户数据到cache中
  34. 'cache-user-func': {
  35. 'task': 'apps.celery_task.scheduler_task.cache_user_func',
  36. # 导入任务函数:from celery_task.scheduler_task import cache_user_func
  37. 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
  38. }
  39. }
  40. }

3、在视图函数导入异步任务的路径也变了

  1. #异步任务
  2. from apps.celery_task.async_task import send_email_task,cache_user_task

4、启动celery和定时任务的命令变量【在项目根目录下执行命令】

启动celery:

windows启动命令: celery  -A  apps.celery_task.celery worker -l info  -P  eventlet

linux启动命令: celery  -A  apps.celery_task.celery worker -l info 

启动定时任务:

celery -A apps.celery_task beat -l info

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/414256
推荐阅读
相关标签
  

闽ICP备14008679号