赞
踩
1、运行过程中经常报错:peewee.OperationalError: Connection already open
Peewee不是为异步框架设计的,必须添加一些代码才能将Peewee与FastAPI一起使用,详情可查看官方文档:使peewee异步兼容PeeweeConnectionState,
from contextvars import ContextVar import peewee DATABASE_NAME = "test.db" db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None} db_state = ContextVar("db_state", default=db_state_default.copy()) class PeeweeConnectionState(peewee._ConnectionState): def __init__(self, **kwargs): super().__setattr__("_state", db_state) super().__init__(**kwargs) def __setattr__(self, name, value): self._state.get()[name] = value def __getattr__(self, name): return self._state.get()[name] # MySQL数据库 db = MySQLDatabase( os.getenv('MYSQL_DATABASE'), user=os.getenv('MYSQL_USERNAME'), host=os.getenv('MYSQL_HOST'), password=os.getenv('MYSQL_PASSWORD'), port=os.getenv('MYSQL_PORT') ) db._state = PeeweeConnectionState()
在官方文档中,所有接口都添加了get_db的依赖:
创建一个依赖项,该依赖项将在请求开始时连接数据库,并在请求结束时断开连接:
async def reset_db_state():
# print(("reset_db_state()"))
db._state._state.set(db_state_default.copy())
db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
db.connect()
yield
finally:
if not db.is_closed():
db.close()
它连接到数据库并将连接数据存储在独立于每个请求的内部变量中
由于数据库连接可能是 I/O 阻塞,因此将使用正常函数def创建此依赖项。
然后,在每个需要访问数据库的路径操作函数中,我们将其添加为依赖项。
但是我们没有使用此依赖项给出的值(它实际上没有给出任何值,因为它有一个空)。因此,我们不会将其添加到路径操作函数中,而是添加到参数中的路径操作装饰器中:
即:
@app.post("/users/show", response_model=schemas.User, dependencies=[Depends(get_db)])
每个接口都添加依赖会有点麻烦,可以添加全局依赖:
debug = bool(os.getenv('DEBUG'))
app = FastAPI(
debug=settings.DEBUG,
title=settings.TITLE if debug else None,
description=settings.DESCRIPTION if debug else None,
docs_url=settings.DOCS_URL if debug else None,
openapi_url=settings.OPENAPI_URL if debug else None,
redoc_url=settings.REDOC_URL if debug else None,
dependencies=[Depends(get_db)]
)
2、部署后出现请求后端接口无响应,一直在加载的情况
类似问题情况
分析觉得是出现了IO线程阻塞问题
tornado是一个异步网络IO非阻塞框架,这意味着涉及到IO阻塞操作,我们都应该以异步的形式去进行。而peewee本身并不是异步的,因此我们还需要引入另外一些库才能更好的契合
peewee与异步操作
FastAPI也是异步框架,所以考虑将peewee换成peewee-async,具体修改及使用参考:peewee:精致小巧的orm,某些场景下可以成为sqlalchemy的一个很好的替代品
为了使用peewee-async,连接数据库完整代码:
import math import os from peewee import _ConnectionState, Model, ModelSelect, SQL, DateTimeField, MySQLDatabase from contextvars import ContextVar # 异步数据库 from peewee_async import Manager, MySQLDatabase as AsyncMySQLDatabase from playhouse.shortcuts import ReconnectMixin # 异步数据库 # 连接池 from peewee_async import PooledMySQLDatabase as AsyncPooledMySQLDatabase from fastapi import Depends db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None} db_state = ContextVar("db_state", default=db_state_default.copy()) async def reset_db_state(): # print(("reset_db_state()")) db._state._state.set(db_state_default.copy()) db._state.reset() def get_db(db_state=Depends(reset_db_state)): try: db.connect() yield finally: if not db.is_closed(): db.close() class PeeweeConnectionState(_ConnectionState): def __init__(self, **kwargs): super().__setattr__("_state", db_state) super().__init__(**kwargs) def __setattr__(self, name, value): self._state.get()[name] = value def __getattr__(self, name): return self._state.get()[name] # # MySQL数据库 # db = MySQLDatabase( # os.getenv('MYSQL_DATABASE'), # user=os.getenv('MYSQL_USERNAME'), # host=os.getenv('MYSQL_HOST'), # password=os.getenv('MYSQL_PASSWORD'), # port=os.getenv('MYSQL_PORT') # ) # 超时后将回收连接。max_connections=8, # 打开的连接数的上限。stale_timeout=300, # peewee 同步、异步、断线重连、连接池 https://www.cnblogs.com/gcxblogs/p/14969019.html # # 断线重连+连接池 class ReconnectAsyncPooledMySQLDatabase(ReconnectMixin, AsyncPooledMySQLDatabase): _instance = None @classmethod def get_db_instance(cls): db_config = { 'host': os.getenv('MYSQL_HOST'), 'port': int(os.getenv('MYSQL_PORT')) if os.getenv('MYSQL_PORT') else os.getenv('MYSQL_PORT'), 'user': os.getenv('MYSQL_USERNAME'), 'password': os.getenv('MYSQL_PASSWORD'), 'database': os.getenv('MYSQL_DATABASE'), } if not cls._instance: cls._instance = cls(**db_config, max_connections=100) return cls._instance db = ReconnectAsyncPooledMySQLDatabase.get_db_instance() # 然后我们需要将db传入到Manager中得到一个async_db async_db = Manager(db) async_db._state = PeeweeConnectionState() db._state = PeeweeConnectionState() # # No need for sync anymore! # db.set_allow_sync() class BaseModel(Model): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) """将事务改成atomic_async""" self.trans = db.atomic_async """添加一个Manager类""" self.object = Manager(db) class Meta: database = db
问题测试也可参考:peewee与异步操作
打开命令控制台,mysql -u root -p
,输入密码,启动mysql交互界面,锁住user表的读写操作,模拟硬盘IO阻塞。
use database_name;
lock table user write;
然后访问操作锁住的表的接口和未被锁住的表的接口,同步状态下,当一个接口阻塞,则其他接口均不能访问,异步操作下一个接口阻塞不会影响其他接口。
最后是timeout的设置,peewee_async连接PooledMySQLDatabase设置timeout
peewee_async官方文档只找到函数
connect_async(loop=None, timeout=None)
Set up async connection on specified event loop or on default event loop.
在指定的事件循环上设置异步连接或 默认事件循环。
或者也可以在FastAPI中间件中处理连接超时的问题:
@app.middleware("http") async def logger_request(request: Request, call_next) -> Response: # https://stackoverflow.com/questions/60098005/fastapi-starlette-get-client-real-ip account = 'login' hs = request.headers token = hs.get("token") host = hs.get("host") if token != None: # 解析token。将id从token中解析出来去。 try: payload = jwt.decode( token, # settings.SECRET_KEY, algorithms=[settings.ALGORITHM] os.getenv('SECRET_KEY'), algorithms=[os.getenv('ALGORITHM')] ) account = payload.get("sub") except jwt.ExpiredSignatureError as e: # TODO token续期 print('登录已经过期') return resp.fail(resp.Unauthorized.set_msg('登录已经过期.')) except Exception as e: return resp.fail(resp.Unauthorized.set_msg('登录失败,请重试。'), detail=str(e)) logger.info( f"访问记录:{request.method} url:{request.url} account:{account} IP:{host} data:{data}") try: response = await asyncio.wait_for(call_next(request),10) return response except asyncio.TimeoutError: print('Timeout error') if not db.is_closed(): db.close() db.close_async() return resp.fail(resp.ServerError.set_msg('请求超时.'))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。