赞
踩
# -*- coding: utf8 -*-
# Celery application module: builds a Flask application used only by the
# Celery workers and exposes the configured Celery instance as `celery_app`.

# package import
from celery import Celery
from flask import Flask

from config import celery_config, mysql_config
from dao.mysql.a1_and_cloud.connect import MySQLConnection
from services.helpler.data_operate import LogDataOperate
from util.exts import db

# A fresh Flask instance is created here; reusing the one from app.py would
# cause a circular import. Its name must not clash with the Flask application
# in app.py: for example, app.py calls its Flask application `app`, while this
# file calls its own `application`.
application = Flask(__name__)

# Load the Celery settings and the MySQL settings into the application config.
application.config.from_object(celery_config)
application.config.from_object(mysql_config)

# Bind the shared database extension to this application.
db.init_app(application)


def make_celery(input_app):
    """Build a Celery instance whose tasks run inside a Flask app context.

    :param input_app: the Flask application providing the import name and the
        configuration; 'RESULT_BACKEND' and 'BROKER_URL' are read from it
    :return: the configured Celery instance
    """
    app_name = input_app.import_name
    settings = input_app.config
    result_backend = settings['RESULT_BACKEND']
    broker_url = settings['BROKER_URL']
    # Modules that are handed to Celery through its `include` argument.
    task_modules = [
        'celery_tasks.globals.home',
        'celery_tasks.user.tool.semantic_analysis_tasks',
    ]

    worker = Celery(
        app_name,
        backend=result_backend,
        broker=broker_url,
        include=task_modules,
    )
    # Copy the whole Flask configuration into the Celery configuration.
    worker.conf.update(settings)

    base_task = worker.Task

    class ContextTask(base_task):
        # Every call of the task is wrapped in `input_app.app_context()`
        # before being delegated to the original task class.
        abstract = True

        def __call__(self, *args, **kwargs):
            with input_app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    # Replace the default task class with the context-aware one.
    worker.Task = ContextTask
    return worker


celery_app = make_celery(application)

# If you don't want to use the `include` parameter when creating the Celery
# instance, you can use the following code instead; it has the same effect as
# `include`.
# celery_app.autodiscover_tasks(['celery_tasks.user.tool.semantic_analysis_tasks', 'celery_tasks.globals.home'])
""" # -*- coding:utf-8 -*- 用命令对数据库表进行迁移 这是Flask项目进行迁移的命令 python manage.py db init python manage.py db migrate python manage.py db upgrade 如果想回退到上一级迁移文件的话,要用到下面这个命令 python manage.py db downgrade """ # package import from flask_script import Manager from flask_migrate import Migrate, MigrateCommand # import app and celery_app from app import app from celery_tasks.celery_app import celery_app from models.models import * from util.exts import db """ tips: You must define two manager if you want to use celery, because Flask basic app and Celery app is different app This is important ! If you don't create celery_manager, celery task will except a lot of exception, such as SqlAlchemy don't callback """ # define the manager for flask basic app manager = Manager(app) # use Migrate band db and flask basic app migrate = Migrate(app, db) # add migrate command to flask basic app manager manager.add_command('db', MigrateCommand) # define the manager for celery app celery_manager = Manager(celery_app) # use Migrate band db and celery app celery_migrate = Migrate(celery_app, db) # add migrate command to celery basic app manager celery_manager.add_command('db', MigrateCommand) # run flask basic app manager and celery app manager if __name__ == "__main__": # run manager and celery_manager manager.run() celery_manager.run()
# start the Celery worker
celery -A celery_tasks.celery_app.celery_app worker -l info -P eventlet -c 10
# -*- coding: utf8 -*-
# package import
from celery.result import AsyncResult

from celery_tasks.celery_app import celery_app


class CeleryResultGet:
    """Fetch one piece of information about a Celery task by its id."""

    def __init__(self, task_id, data_type):
        """
        :param task_id: celery task id; stored as a string
        :param data_type: which piece of task data to fetch; 'status' and
            'result' are the values handled by get_task_data
        """
        self.task_id = str(task_id)
        self.data_type = data_type
        # Holds the most recently fetched value; None until a fetch succeeds.
        self.data = None

    def get_task_data(self):
        """Fetch the requested task attribute, remember it and return it.

        For a data_type other than 'status' or 'result' no lookup is made and
        the currently stored value is returned unchanged.
        """
        if self.data_type == 'status':
            attribute = 'status'
        elif self.data_type == 'result':
            attribute = 'result'
        else:
            # Unsupported data type: nothing to look up.
            return self.data

        task = AsyncResult(id=self.task_id, app=celery_app)
        self.data = getattr(task, attribute)
        return self.data
目前就遇到这些问题,有新的坑再更新
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。