当前位置:   article > 正文

基于FastAPI和peewee的项目踩坑_fastapi peewee

fastapi peewee

FastAPI结合peewee_async的项目踩坑

1、运行过程中经常报错:peewee.OperationalError: Connection already open

Peewee不是为异步框架设计的,必须添加一些代码才能将Peewee与FastAPI一起使用,详情可查看官方文档:使peewee异步兼容PeeweeConnectionState

from contextvars import ContextVar
import peewee

DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())

class PeeweeConnectionState(peewee._ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]

# MySQL数据库
db = MySQLDatabase(
    os.getenv('MYSQL_DATABASE'),
    user=os.getenv('MYSQL_USERNAME'),
    host=os.getenv('MYSQL_HOST'),
    password=os.getenv('MYSQL_PASSWORD'),
    port=os.getenv('MYSQL_PORT')
)

db._state = PeeweeConnectionState()


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

在官方文档中,所有接口都添加了get_db的依赖:
在这里插入图片描述

创建依赖项

创建一个依赖项,该依赖项将在请求开始时连接数据库,并在请求结束时断开连接:

async def reset_db_state():
    # print(("reset_db_state()"))
    db._state._state.set(db_state_default.copy())
    db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
    try:
        db.connect()
        yield
    finally:
        if not db.is_closed():
            db.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

它连接到数据库并将连接数据存储在独立于每个请求的内部变量中
由于数据库连接可能是 I/O 阻塞,因此将使用正常函数def创建此依赖项。
然后,在每个需要访问数据库的路径操作函数中,我们将其添加为依赖项。

但是我们没有使用此依赖项给出的值(它实际上没有给出任何值,因为它有一个空)。因此,我们不会将其添加到路径操作函数中,而是添加到参数中的路径操作装饰器中:
即:
@app.post("/users/show", response_model=schemas.User, dependencies=[Depends(get_db)])
每个接口都添加依赖会有点麻烦,可以添加全局依赖:

	debug = bool(os.getenv('DEBUG'))
    app = FastAPI(
        debug=settings.DEBUG,
        title=settings.TITLE if debug else None,
        description=settings.DESCRIPTION if debug else None,
        docs_url=settings.DOCS_URL if debug else None,
        openapi_url=settings.OPENAPI_URL if debug else None,
        redoc_url=settings.REDOC_URL if debug else None,
        dependencies=[Depends(get_db)]
    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2、部署后出现请求后端接口无响应,一直在加载的情况

类似问题情况
分析觉得是出现了IO线程阻塞问题

tornado是一个异步网络IO非阻塞框架,这意味着涉及到IO阻塞操作,我们都应该以异步的形式去进行。而peewee本身并不是异步的,因此我们还需要引入另外一些库才能更好的契合
peewee与异步操作

FastAPI也是异步框架,所以考虑将peewee换成peewee-async,具体修改及使用参考:peewee:精致小巧的orm,某些场景下可以成为sqlalchemy的一个很好的替代品
为了使用peewee-async,连接数据库完整代码:

import math
import os

from peewee import _ConnectionState, Model, ModelSelect, SQL, DateTimeField, MySQLDatabase
from contextvars import ContextVar

# 异步数据库
from peewee_async import Manager, MySQLDatabase as AsyncMySQLDatabase
from playhouse.shortcuts import ReconnectMixin
# 异步数据库 # 连接池
from peewee_async import PooledMySQLDatabase as AsyncPooledMySQLDatabase
from fastapi import Depends

db_state_default = {"closed": None, "conn": None,
                    "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())

async def reset_db_state():
    # print(("reset_db_state()"))
    db._state._state.set(db_state_default.copy())
    db._state.reset()
    
def get_db(db_state=Depends(reset_db_state)):
    try:
        db.connect()
        yield
    finally:
        if not db.is_closed():
            db.close()


class PeeweeConnectionState(_ConnectionState):
    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]


# # MySQL数据库
# db = MySQLDatabase(
#     os.getenv('MYSQL_DATABASE'),
#     user=os.getenv('MYSQL_USERNAME'),
#     host=os.getenv('MYSQL_HOST'),
#     password=os.getenv('MYSQL_PASSWORD'),
#     port=os.getenv('MYSQL_PORT')
# )

# 超时后将回收连接。max_connections=8,
# 打开的连接数的上限。stale_timeout=300,


# peewee 同步、异步、断线重连、连接池 https://www.cnblogs.com/gcxblogs/p/14969019.html
# # 断线重连+连接池
class ReconnectAsyncPooledMySQLDatabase(ReconnectMixin, AsyncPooledMySQLDatabase):
    _instance = None
    @classmethod
    def get_db_instance(cls):
        db_config = {
            'host': os.getenv('MYSQL_HOST'),
            'port': int(os.getenv('MYSQL_PORT')) if os.getenv('MYSQL_PORT') else os.getenv('MYSQL_PORT'),
            'user': os.getenv('MYSQL_USERNAME'),
            'password': os.getenv('MYSQL_PASSWORD'),
            'database': os.getenv('MYSQL_DATABASE'),
        }
        if not cls._instance:
            cls._instance = cls(**db_config, max_connections=100)
        return cls._instance

db = ReconnectAsyncPooledMySQLDatabase.get_db_instance()

# 然后我们需要将db传入到Manager中得到一个async_db
async_db = Manager(db)
async_db._state = PeeweeConnectionState()
db._state = PeeweeConnectionState()


# # No need for sync anymore!
# db.set_allow_sync()

 class BaseModel(Model):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        """将事务改成atomic_async"""
        self.trans = db.atomic_async
        """添加一个Manager类"""
        self.object = Manager(db)

    class Meta:
        database = db
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94

问题测试也可参考:peewee与异步操作

打开命令控制台,mysql -u root -p,输入密码,启动mysql交互界面,锁住user表的读写操作,模拟硬盘IO阻塞。

use database_name;
lock table user write;
  • 1
  • 2

然后访问操作锁住的表的接口和未被锁住的表的接口,同步状态下,当一个接口阻塞,则其他接口均不能访问,异步操作下一个接口阻塞不会影响其他接口。

最后是timeout的设置,peewee_async连接PooledMySQLDatabase设置timeout
peewee_async官方文档只找到函数

connect_async(loop=None, timeout=None)
Set up async connection on specified event loop or on default event loop.
在指定的事件循环上设置异步连接或 默认事件循环。

或者也可以在FastAPI中间件中处理连接超时的问题:

    @app.middleware("http")
    async def logger_request(request: Request, call_next) -> Response:
        # https://stackoverflow.com/questions/60098005/fastapi-starlette-get-client-real-ip
        account = 'login'
        hs = request.headers
        token = hs.get("token")
        host = hs.get("host")
        if token != None:
            # 解析token。将id从token中解析出来去。
            try:
                payload = jwt.decode(
                    token,
                    # settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
                    os.getenv('SECRET_KEY'), algorithms=[os.getenv('ALGORITHM')]
                )
                account = payload.get("sub")

            except jwt.ExpiredSignatureError as e:
                # TODO token续期
                print('登录已经过期')
                return resp.fail(resp.Unauthorized.set_msg('登录已经过期.'))
            except Exception as e:
                return resp.fail(resp.Unauthorized.set_msg('登录失败,请重试。'), detail=str(e))
        logger.info(
            f"访问记录:{request.method} url:{request.url} account:{account} IP:{host} data:{data}")
        try:
            response = await asyncio.wait_for(call_next(request),10)
            return response
        except asyncio.TimeoutError:
            print('Timeout error')
            if not db.is_closed():
                db.close()
                db.close_async()
            return resp.fail(resp.ServerError.set_msg('请求超时.'))
            
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/172718
推荐阅读
  

闽ICP备14008679号