当前位置:   article > 正文

Flask项目中使用Celery遇到的一些坑整理_celery + flask 版本问题

celery + flask 版本问题

Flask项目中Celery做异步任务遇到的一些坑

项目目录结构

在这里插入图片描述

一、celery_app.py 创建celery_app

  • 我选择celery_app放在celery_tasks文件夹目录下
    在这里插入图片描述
  • celery异步任务中若要用到Sqlalchemy做ORM的话,需要让application导入mysql_config
  • 例如有两个异步任务分别放在了"user/tool/semantic_analysis_tasks.py"以及"globals/home.py"两个py文件里,那么需要在celery_server中用include参数进行路径的导入才能正常使用任务
  • 下面是celery_app.py文件的示例代码
# -*- coding: utf8 -*-

# package import
from celery import Celery
from flask import Flask
from config import celery_config, mysql_config
from dao.mysql.a1_and_cloud.connect import MySQLConnection
from services.helpler.data_operate import LogDataOperate
from util.exts import db

"""
新初始化一个Flask,否则会造成循环引用。
名字一定不能与app.py中的Flask应用重名
例如app.py中的Flask应用的名称为app,tasks.py文件中的Flask应用名称为application
"""

application = Flask(__name__)

# import celery_config and mysql_config for application
application.config.from_object(celery_config)
application.config.from_object(mysql_config)

# init db
db.init_app(application)


# make celery function
def make_celery(input_app):
    celery_server = Celery(
        input_app.import_name, backend=input_app.config['RESULT_BACKEND'], broker=input_app.config['BROKER_URL'],
        include=['celery_tasks.globals.home', 'celery_tasks.user.tool.semantic_analysis_tasks']
    )
    celery_server.conf.update(input_app.config)

    task_base = celery_server.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with input_app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery_server.Task = ContextTask
    return celery_server


celery_app = make_celery(application)

"""
if you don't want to use include params in celery_server, you can use following code, it's same to include.  
"""
# celery_app.autodiscover_tasks(['celery_tasks.user.tool.semantic_analysis_tasks', 'celery_tasks.globals.home'])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

二、manage.py 数据迁移

  • Flask中若要使用ORM的话,需要自己写个manage.py文件进行迁移。而项目中要用到celery做异步任务的话,manage.py与纯Flask项目有些区别。下面是用到celery的manage.py的示例代码
"""
# -*- coding:utf-8 -*-
用命令对数据库表进行迁移

这是Flask项目进行迁移的命令
python manage.py db init
python manage.py db migrate
python manage.py db upgrade

如果想回退到上一级迁移文件的话,要用到下面这个命令
python manage.py db downgrade
"""

# package import
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand

# import app and celery_app
from app import app
from celery_tasks.celery_app import celery_app

from models.models import *
from util.exts import db


"""
tips:
    You must define two manager if you want to use celery, because Flask basic app and Celery app is different app
    This is important !
    If you don't create celery_manager, celery task will except a lot of exception, such as SqlAlchemy don't callback
"""

# define the manager for flask basic app
manager = Manager(app)

# use Migrate band db and flask basic app
migrate = Migrate(app, db)

# add migrate command to flask basic app manager
manager.add_command('db', MigrateCommand)

# define the manager for celery app
celery_manager = Manager(celery_app)

# use Migrate band db and celery app
celery_migrate = Migrate(celery_app, db)

# add migrate command to celery basic app manager
celery_manager.add_command('db', MigrateCommand)


# run flask basic app manager and celery app manager
if __name__ == "__main__":

	# run manager and celery_manager
    manager.run()

    celery_manager.run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

三、start_celery.sh 启动celery服务

  • celery_app我放在了celery_tasks目录下,所以需要用下面的命令启动celery服务
# start celery serve
celery -A celery_tasks.celery_app.celery_app worker -l info -P eventlet  -c 10
  • 1
  • 2

四、get_celery_result.py 获取任务结果

  • 如果要获取任务结果的话,AsyncResult中传入的id一定要是字符串,负责会报错,下面是获取celery任务结果的示例代码
# -*- coding: utf8 -*-

# package import
from celery.result import AsyncResult
from celery_tasks.celery_app import celery_app


# create class CeleryResultGet
class CeleryResultGet:
    def __init__(self, task_id, data_type):
        """
        :param task_id: celery task id
        :param data_type: the type of you want to get (status, result...)
        """

        self.task_id = str(task_id)
        self.data_type = data_type
        self.data = None

    # get celery task data function
    def get_task_data(self):

        if self.data_type == 'status':

            self.data = AsyncResult(id=self.task_id, app=celery_app).status

        elif self.data_type == 'result':

            self.data = AsyncResult(id=self.task_id, app=celery_app).result

        return self.data
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

目前就遇到这些问题,有新的坑再更新

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/414314
推荐阅读
相关标签
  

闽ICP备14008679号