当前位置:   article > 正文

Fastapi+docker+tortoise-orm+celery

Fastapi+docker+tortoise-orm+celery

因为项目是后期引入celery,所以导致构建docker的时候只有fastapi的项目,celery的重启比较麻烦
1.docker安装celery

pip install celery
  • 1

安装celery的时候注意python版本与celery版本的适配,有些celery的版本不支持python的版本,具体的版本请看celery官网里面的版本信息

2.在工程目录中创建celery的启动文件,这里我创建的是tasks.py 文件

from celery import Celery


celery_app = Celery("worker",
                    broker="redis://:frasergen2022@192.168.2.189:26379/0",
                    backend="redis://:frasergen2022@192.168.2.189:26379/0",
                    include=["apps.tools.my_celery.__init__"]
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这是celery的启动文件,里面最好不要引入你的项目里面的变量,如果引入你项目里的变了,后期去定义后台任务的时候,引入celery_app会陷入到循环引用的深坑

3.测试celery

celery -A tasks.celery_app worker --loglevel=info
  • 1

在这里插入图片描述
4.可以后台启动celery

celery multi start w1 -A tasks.celery_app -l info --logfile=celerylog.log
  • 1

在这里插入图片描述
这里不用启动,后续会用看门狗(watchdog)监控任务文件,如果文件修改,会重启celery

5.安装看门狗(watchdog)

pip install watchdog
  • 1

参考链接:https://whoosy.cn/2019/08/01/Celery/celery使用/
后台启动watchdog

nohup watchmedo auto-restart --directory=/data/cloud_platform/apps/tools/my_celery/ --pattern=*.py --recursive -- celery -A tasks.celery_app worker --loglevel=info --logfile=celerylog.log > watchmedo.log 2> watchmedo.elog &
  • 1

–directory : 监控路径
–pattern: 监控文件后缀
在这里插入图片描述
6.配置celery后台任务
1.配置tortoise-orm数据库连接

import asyncio

from tortoise import Tortoise
from celery.signals import worker_process_init, worker_process_shutdown

from apps.models import User
from tasks import celery_app
from apps.tools.db_config import ORM_LINK_CONF


async def init_db():
    await Tortoise.init(
        config=ORM_LINK_CONF
    )


@worker_process_init.connect
def on_worker_init(*args, **kwargs):
    print('初始化数据库')
    from celery._state import _task_stack
    if _task_stack.top is not None:
        loop = _task_stack.top.request.loop
    else:
        loop = asyncio.get_event_loop()
    loop.run_until_complete(init_db())


@worker_process_shutdown.connect
def on_worker_shutdown(*args, **kwargs):
    print('关闭数据库')
    from celery._state import _task_stack
    if _task_stack.top is not None:
        loop = _task_stack.top.request.loop
    else:
        loop = asyncio.get_event_loop()
    loop.run_until_complete(Tortoise.close_connections())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

2.配置后台任务

@celery_app.task(name="get_user_task")
def get_user_task(*args, **kwargs):
    asyncio.get_event_loop().run_until_complete(_get_user_task())


async def _get_user_task():
    user = await User.filter().all()
    for item in user:
        print(item.nickname, item.username)
        print(f"{item.phone=}")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

代码上传到docker后,celery会重启
在这里插入图片描述
7.测试接口+后台异步任务

@router.get("/get_user_info", summary="测试后台任务获取用户信息")
async def get_user_info():
    get_user_task.delay()
    return res()
  • 1
  • 2
  • 3
  • 4

结果:
在这里插入图片描述

注意:
1.我这里的数据库用的是mysql数据库,他有一个断联机制,就是连接长时间没有请求会将这个链接断开,如果你的celery长时间没有任务,会导致mysql链接断开链接,报错如下:
在这里插入图片描述
我用的是 tortoise-orm 的版本是0.17.7,这个版本的orm没有保活机制,所以要隔一段时间去定时请求数据库,可以用celery的定时任务处理,但是我没这边的小项目的后台任务只是用来下载大数据,没有定时任务需求,所以换别的方法去处理,有几种方法:
1.设置数据库的超时时间:
show GLOBAL variables like “wait_timeout”
set wait_timeout = 1814400;
2.运用tortoise-orm 的保活机制,具体的情况请参考tortoise-orm官方文档
3.运用celery定时任务,定时想mysql发送请求
4.运用 apscheduler的定时任务去调用celery的任务(我这里用的这种方式)

  1. celery代码
async def func_get_user_task():
    user = await User.filter().limit(1).all()
    for item in user:
        print(item.nickname, item.username)
        print(f"{item.phone=}")


@celery_app.task(name="get_user_task")
def get_user_task(*args, **kwargs):
    asyncio.get_event_loop().run_until_complete(func_get_user_task())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. apscheduler代码
async def tortoise_orm_survival():
    """celery 的 tortoise_orm 保活机制,用于保持celery定时对mysql的请求"""
    get_user_task.delay()
  • 1
  • 2
  • 3
  1. apscheduler启动代码
import os
from uvicorn import run
from starlette.responses import HTMLResponse
from aioredis import create_redis_pool, Redis
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from apps import create_app
from apps.configs import config
from apps.tools.scheduler_tools import *

app = create_app()


async def get_redis_pool() -> Redis:
    redis = await create_redis_pool(
        f"redis://:{config.REDIS_PASSWORD}@{config.REDIS_HOST}:26379/{config.REDIS_DB}?encoding=utf-8")
    return redis


@app.on_event("startup")
async def startup_event():
    # redis
    app.state.redis = await get_redis_pool()
    scheduler = AsyncIOScheduler()
    # 每 2分钟运行一次, 下载文件
    scheduler.add_job(down_file_tasks, "interval", seconds=60 * 2, max_instances=5, misfire_grace_time=3600)
    # 每一个小时运行一次,调用celery异步任务,保持mysqlde连接
    scheduler.add_job(tortoise_orm_survival, "interval", seconds=60 * 60, max_instances=5, misfire_grace_time=3600)
    scheduler.start()


@app.on_event("shutdown")
async def shutdown_event():
    app.state.redis.close()
    await app.state.redis.wait_closed()


@app.get("/", tags=['index'])
async def index():
    """首页 """
    data = open('static/index.html', 'r', encoding='utf8').read()
    return HTMLResponse(content=data)


if __name__ == "__main__":
    print("启动")
    print("文档地址:http://127.0.0.1/docs")
    run("main:app", host="0.0.0.0", port=80, debug=True, reload=True, lifespan="on", workers=4)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/584018
推荐阅读
相关标签
  

闽ICP备14008679号