当前位置:   article > 正文

flask工厂函数中celery的应用_flask create_app 使用celery 循环引用

flask create_app 使用celery 循环引用

flask工厂函数中celery的应用

一定结合自己的项目文件对应的模块来看, 关于项目名下方会有标注,本篇只介绍celery作为异步任务调用功能的调用,定时任务我用的Flask-APScheduler库

先介绍下我的模块划分

apps                		# 应用包
- app						# 单个应用
	- tasks.py				# celery函数放在对应应用下
app.py						# create_app函数放置点
manage.py					# 使用flask-script启动, 启动文件
config.py					# app配置文件
mycelery.py					# celery继承,集成flask上下文等等
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

需要安装的库

Flask-Celery-Helper
celery==3.1.17
  • 1
  • 2

上代码,本篇不介绍celery原理/使用方法, 直接告诉你怎么写

# mycelery.py
from flask_celery import Celery
from flask import has_request_context, make_response, request


class CeleryWithContext(Celery):
    def init_app(self, app):
        super(CeleryWithContext, self).init_app(app)

        task_base = self.Task

        class ContextTask(task_base):

            #: Name of the additional parameter passed to tasks
            #: that contains information about the original Flask request context.
            CONTEXT_ARG_NAME = '_flask_request_context'

            def __call__(self, *args, **kwargs):
                """Execute task code with given arguments."""
                call = lambda: super(ContextTask, self).__call__(*args, **kwargs)

                context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
                if context is None or has_request_context():
                    return call()

                with app.test_request_context(**context):
                    result = call()

                    # process a fake "Response" so that
                    # ``@after_request`` hooks are executed
                    app.process_response(make_response(result or ''))

                return result

            def apply_async(self, args=None, kwargs=None, **rest):
                self._include_request_context(kwargs)
                return super(ContextTask, self).apply_async(args, kwargs, **rest)

            def apply(self, args=None, kwargs=None, **rest):
                self._include_request_context(kwargs)
                return super(ContextTask, self).apply(args, kwargs, **rest)

            def retry(self, args=None, kwargs=None, **rest):
                self._include_request_context(kwargs)
                return super(ContextTask, self).retry(args, kwargs, **rest)

            def _include_request_context(self, kwargs):
                """Includes all the information about current Flask request context
                as an additional argument to the task.
                """
                if not has_request_context():
                    return

                # keys correspond to arguments of :meth:`Flask.test_request_context`
                context = {
                    'path': request.path,
                    'base_url': request.url_root,
                    'method': request.method,
                    'headers': dict(request.headers),
                    'data': request.form
                }
                if '?' in request.url:
                    context['query_string'] = request.url[(request.url.find('?') + 1):]

                kwargs[self.CONTEXT_ARG_NAME] = context

        setattr(ContextTask, 'abstract', True)
        setattr(self, 'Task', ContextTask)


celery = CeleryWithContext()


# app.py
from flask import Flask
from mycelery import celery

def create_app(config_name):
    app = Flask(config_name)
    app.config.from_object(config[config_name])
    # 中间部分是你的其他库的初始化部分,省略
    ...
    celery.init_app(app)
    return app
  
# manage.py
from mycelery import celery
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
apps = os.path.join(BASE_DIR, 'apps')
sys.path.insert(0, apps)
from flask_script import Manager
from app import create_app

app = create_app(ENV)
manager = Manager(app)

if __name__ == '__main__':
    manager.run() 
    
    
# config.py
class Config(object):
  ...
  # 这块注意看 app.tasks app就对应着我的单个应用,tasks就是下面的文件.
  # 文件名也只能是tasks
  CELERY_IMPORTS = ('app.tasks',)
  CELERY_BROKER_URL = 'redis://localhost'
  CELERY_RESULT_BACKEND = 'redis://localhost'
  
  @staticmethod
    def init_app(app):
        pass
  
config = {
  'dev': Config
}


# tasks.py
from mycelery import celery

@celery.task()
def add(a, b):
  # 并不是一定要写add,就是你封装的耗时比较久的任务
  ... do something
  return str(a+b)

# 调用
from app.tasks import add
def func(a,b):
  ...
  add.delay(a,b)
  return
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133

配置好后启动, 命令行输入

# manage 应用启动文件 就是初始化app的文件, celery是manage中的方法
celery -A manage.celery worker -l info
  • 1
  • 2

启动成功界面

参考资料

想知道mycelery文件干了什么事戳它➡️celery中使用flask的上下文

想看celery原理更多应用方式戳它➡️异步神器celery

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/414347
推荐阅读
相关标签
  

闽ICP备14008679号