更改sqlite为mysql
- from tortoise import Tortoise
- import asyncio
- async def init():
- user = 'root'
- password = '123456'
- db_name = 'test'
- await Tortoise.init(
- #指定mysql信息
- db_url=f'mysql://{user}:{password}@127.0.0.1:3306/{db_name}',
- #指定models
- modules={'models': ['sanic_bp.models']}
- )
- #按照模型生成表
- # await Tortoise.generate_schemas()
- asyncio.run(init())
在sanic中使用:
- from sanic import Sanic
- from tortoise import Tortoise
-
- app = Sanic(__name__)
-
- @app.listener('after_server_start')
- async def notify_server_started(app, loop):
- print('sanic sanic服务启动后建立mysql连接')
- #实例化mysql连接
- await Tortoise.init(
- db_url='mysql://root:123456@127.0.0.1:3306/test?maxsize=50&minsize=3',
- modules={'models': ['sanic_bp.models']}
- )
基础使用
- from tortoise.contrib.sanic import register_tortoise
-
- register_tortoise(
- app, db_url="sqlite://:memory:", modules={"models": ["models"]}, generate_schemas=True
- )
sanic 和 fastapi中使用
tortoise-orm 连接多个库
- async def run():
- await Tortoise.init(
- {
- #这里面指定两个不同的数据库,虽然它填写的都是一样的
- "connections": {
- "first": {
- "engine": "tortoise.backends.sqlite",
- "credentials": {"file_path": "example.sqlite3"},
- },
- "second": {
- "engine": "tortoise.backends.sqlite",
- "credentials": {"file_path": "example1.sqlite3"},
- },
- },
- #可以指定多个不同的models, 嗯,它指定的也是一样的
- "apps": {
- "tournaments": {"models": ["__main__"], "default_connection": "first"},
- "events": {"models": ["__main__"], "default_connection": "second"},
- },
- }
- )
- #await Tortoise.generate_schemas()
- #选取数据库
- client = Tortoise.get_connection("first")
- #选取 models
- second_client = Tortoise.get_connection("second")
- #然后这下面就可以进行orm操作了
增删改查
- from typing import List
- from fastapi import APIRouter, HTTPException
- from tortoise.contrib.fastapi import HTTPNotFoundError
-
- from schemas.MaintainedData import MaintainedData_Pydantic, MaintainedDataIn_Pydantic
- from models.MaintainedData import MaintainedData
- from schemas.basic import Response404, Response200
-
- data = APIRouter(tags=['数据模块'])
-
-
- @data.get("/", summary="数据列表", response_model=List[MaintainedData_Pydantic])
- async def data_list(order:str,limit: int = 10, page: int = 1):
- # limit 是显示的条数 ,page是页数
-
- skip = (page - 1) * limit
- # select * from movie limit offset,limit
- # select * from movie limit 0,10
- return await MaintainedData_Pydantic.from_queryset(MaintainedData.all().offset(skip).limit(limit))
-
-
- @data.post("/data", summary="新增数据", )
- async def add_data(md_data: MaintainedData_Pydantic):
- movie_obj = await MaintainedData.create(**md_data.dict(exclude_unset=True))
- # movie_obj = await Movie.create(name="",year="",xx="")
- return await MaintainedData_Pydantic.from_tortoise_orm(movie_obj)
-
-
- @data.put("/data/{data_id}", summary="编辑数据", responses={404: {"model": HTTPNotFoundError}})
- async def update_data(site_id: str, md_data: MaintainedDataIn_Pydantic):
- updated_count = await MaintainedData.filter(site_id=site_id).update(**md_data.dict(exclude_unset=True))
- print(updated_count)
- if not updated_count:
- return Response404(msg=f"data {site_id} not found")
- return Response200(data=await MaintainedData_Pydantic.from_queryset_single(MaintainedData.get(site_id=site_id)))
-
-
- @data.get("/data/{data_id}", summary="查找数据", response_model=MaintainedData_Pydantic,
- responses={404: {"model": HTTPNotFoundError}})
- async def get_user(site_id: str):
- return await MaintainedData_Pydantic.from_queryset_single(MaintainedData.get(site_id=site_id))
-
-
- @data.delete("/data/{data_id}", summary="删除数据", responses={404: {"model": HTTPNotFoundError}})
- async def del_movie(site_id: str):
- deleted_count = await MaintainedData.filter(site_id=site_id).delete()
- if not deleted_count:
- raise HTTPException(status_code=404, detail=f"data {site_id} not found")
- return HTTPException(status_code=200, detail=f"Deleted Movie {site_id}")
models.py
- from datetime import datetime
- from typing import Optional, Iterable
-
- from tortoise import fields, models, BaseDBAsyncClient
-
-
- class MaintainedData(models.Model):
- """
- The User model
- """
- id = fields.SmallIntField(pk=True, index=True, description="ID", )
- site_id = fields.CharField(max_length=20, unique=True, description="网站ID")
- site_name = fields.CharField(max_length=128, null=False, description="网站名称")
- site_type = fields.CharField(max_length=16, null=False, description="网站类型")
- description = fields.CharField(max_length=255, default="",description="网站备注", )
- site_path_name = fields.CharField(max_length=128, null=True, description="目录名称")
- site_path_url = fields.CharField(max_length=128, null=True, description="目录链接")
- status = fields.CharField(max_length=128, null=True, description="脚本状态")
- run_computer = fields.CharField(max_length=255, null=True, description="运行电脑")
- run_directory = fields.CharField(max_length=255, null=True, description="运行目录")
- crawling_time :Optional[datetime]= fields.DatetimeField(auto_add=True, description="运行时间")
- err_message = fields.CharField(max_length=255, null=True, description="错误信息")
order by
db_Secc = MaintainedData.all().order_by("crawling_time")
Eum 枚举
- class order_byEnum(str, Enum):
- pear = ""
- banana = '-'
-
-
- @data.get("/", summary="数据列表", response_model=List[MaintainedData_Pydantic])
- async def data_list(
- order_by: order_byEnum = "",
- limit: int = 10,
- page: int = 1):
- # limit 是显示的条数 ,page是页数
- skip = (page - 1) * limit
- return await MaintainedData_Pydantic.from_queryset(MaintainedData.all().offset(
- skip).limit(
- limit).order_by(
- f"{order_by}crawling_time"))