赞
踩
从本教程开篇,我们就说FastAPI这个web框架是异步框架,那它到底是如何体现异步的呢?
想要学习使如何使用FastAPI的异步功能,那就必须要先了解什么是异步,什么是asyncio、async/await
【基础补充】
关于异步编程、协程实行的异步编程的基础知识
【重要结论】
本质上,实现异步的方式有三种:多进程、多线程和协程,FastAPI实现异步使用了多线程(线程池)和协程的方式。
def
定义路径函数,FastAPI内部帮我们使用多线程(线程池)实现异步并发async def
定义路径函数,FastAPI内部使用协程的方式实现异步并发。在FastAPI中可以使用普通函数定义的接口,也可以使用async def 实行定义的接口。但是使用是需要注意,否则会导致程序极慢。
示例1:普通函数形式定义的接口,会按照多线程(线程池)的方式异步执行
time.sleep(5)
模拟耗时5simport time
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def index():
time.sleep(5)
return "index"
示例2:async def 形式的定义的接口,使用单线程协程的形式异步执行
因为使用了协程的方式,当有两个客户端发请求时,单线程内代码块级别的切换,最终5秒后两个客户端都会得到响应。
注意:协程时不能使用同步阻塞的time模块,需要使用asyncio.sleep()。又因为它是协程对象,所以需要使用await
才能被执行。
另外,await
必须使用在async
定义的函数内,否则报错。
同时注意,在async def 内部不能使用同步模块,否则就会编程单线程同步执行的方式。
import asyncio
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def index():
await asyncio.sleep(5)
return "index"
示例3:在协程函数内错误使用同步模块
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def index():
time.sleep(5) # 不能在async def 函数内使用同步阻塞模块。
return "index"
【注意】
FastAPI非常灵活,支持你写普通的函数,也支持你写async def 形式的函数,那到底该如何选择?
下面有几个原则,可以帮助大家做选择:
def
@app.get('/')
def results():
results = some_library()
return results
await
调用时,此时使用 async def
@app.get('/')
async def read_results():
results = await some_library()
return results
await
await
, 该函数必须使用 async def
定义@app.get('/')
async def read_results():
results1 = some_library1()
results2 = await some_library()
return results
当你对协程/async/await/asyncio这些概念不清楚的时候,就使用普通函数。
你需要使用的一个第三方库是同步库,但是你需要它支持协程异步,那需要自己使用线程池的方式运行,参考视频:
传统python代码中发请求(如爬虫)我们一般使用requests模块,但是这个模块是同步阻塞的。
所以在异步asyncio体系中,我们不再使用requests模块,一般会使用异步的 aiohttp
模块。(httpx)
pip3 install aiohttp
示例1:基本使用
import asyncio import aiohttp # 协程函数 async def aiohttp_demo(): # 获取一个连接session async with aiohttp.ClientSession() as session: # 基于连接发送一个get请求并获取像一个response async with session.get('http://www.baidu.com') as response: # 从response中获取响应的各种结果 # 因为基于上下文管理器,所以出自动关闭连接 print("Status:", response.status) print("Content-type:", response.headers['content-type']) html = await response.text() print("Body:", html[:15], "...") # 获取事件循环,在事件循环中执行协程函数 # loop = asyncio.get_event_loop() # loop.run_until_complete(aiohttp_demo()) # 上面两行代码的简写,Python3.7以后的版本才可以使用 asyncio.run(aiohttp_demo())
示例2:在fastapi中使用
import aiohttp from fastapi import FastAPI app = FastAPI(title="使用aiohttp") @app.get("/") async def baidu_index(): async with aiohttp.ClientSession() as session: async with session.get('http://www.baidu.com') as response: return { "status": response.status, "content-type": response.headers['content-type'], "body": await response.text() }
import aiohttp from fastapi import FastAPI app = FastAPI(title="使用aiohttp") async def baidu_html(): async with aiohttp.ClientSession() as session: async with session.get('http://www.baidu.com') as response: return { "status": response.status, "content-type": response.headers['content-type'], "body": await response.text() } @app.get("/") async def index(): return await baidu_html() # 需要使用await 才能执行baidu_html 这个协程函数
在python中操作mysql我们通常使用pymysql作为数据库驱动,但是在异步的世界中我们使用aiomysql当驱动。
官网:https://aiomysql.readthedocs.io/en/latest/index.html
aiomysql
pip3 install aiomysql
示例1:aiomysql简单使用
import aiomysql from aiomysql.cursors import DictCursor from fastapi import FastAPI app = FastAPI(title="使用aiomysql") async def aiomysql_demo(): # 获取连接对象 conn = await aiomysql.connect( host="127.0.0.1", port=3306, user="root", password="12345", db="db", cursorclass=DictCursor # 返回字典格式的数据 ) # 创建游标 cur = await conn.cursor() # 执行SQL await cur.execute("SELECT * from users;") # 获取SQL结果 result = await cur.fetchall() # 关闭CURSOR await cur.close() # 关闭连接 conn.close() return result @app.get("/") async def index(): return await aiomysql_demo()
在异步世界中操作数据库,比如MySQL,我们需要只用 aiomysql,且需要自己手写SQL语句。
使用其他类型的数据库,比如PostgreSQL,则需要基于asyncpg或aiopg。
那样的就有一个问题,当我们的应用需要换一个数据库时,就需要调整基础代码,使用不灵活。
此时出现了一个工具,它封装了不同类型的数据库,我们只需要在使用它提供的接口操作数据库就行了,而不用关心底层的数据库驱动,它就是encode出品的 databases
。
官网简介
Databases gives you simple asyncio support for a range of databases.
It allows you to make queries using the powerful SQLAlchemy Core expression language, and provides support for PostgreSQL, MySQL, and SQLite.
Databases is suitable for integrating against any async Web framework, such as Starlette, Sanic, Responder, Quart, aiohttp, Tornado, or FastAPI.
Documentation: https://www.encode.io/databases/
Requirements: Python 3.7+
下载: pip install databases
示例:在fastapi中使用databases操作MySQL(依赖aiomysql)
from fastapi import FastAPI from databases import Database app = FastAPI(title="使用databases") async def databases_demo(): # 实例化一个db连接并建立连接 database = Database('mysql://root:12345@localhost:3306/db') await database.connect() # Run a database query. query = "SELECT * FROM users" rows = await database.fetch_all(query=query) return rows @app.get("/") async def index(): return await databases_demo()
前面在第8章,我们给大家介绍了在同步代码中,想要使用ORM操作数据,使用了SQLAlchemy,
同样的在基于协程的异步代码中,操作数据库时也可以使用ORM,但此时就不能在使用SQLAlchemy(因为它不支持异步)
asyncio世界中,我们也有可以选择的ORM,比如: tortoise-orm 翻译过来就乌龟ORM。
乌龟ORM简介
官网:https://tortoise-orm.readthedocs.io/en/latest/index.html
Tortoise ORM is an easy-to-use asyncio
ORM (Object Relational Mapper) inspired by Django.
Tortoise ORM was build with relations in mind and admiration for the excellent and popular Django ORM.
Tortoise ORM is supported on CPython >= 3.7 for SQLite, MySQL and PostgreSQL.
下载安装:pip3 install tortoise-orm
示例:fastapi简单集成乌龟ORM
# main.py from fastapi import FastAPI from tortoise import fields from tortoise.models import Model from tortoise.contrib.fastapi import register_tortoise app = FastAPI(title="使用tortoise orm") # 定义模型表 class User(Model): id = fields.IntField(pk=True) username = fields.CharField(max_length=255) password = fields.CharField(max_length=255) email = fields.CharField(max_length=255) class Meta: table = "users" # 表示这个表对应数据库中的表名 # 使用register_tortoise 注册数据库信息 register_tortoise( app, db_url="mysql://root:12345@127.0.0.1:3306/db", modules={"models": ["main"]}, # 指定模型表所在的文本,"main" 表示mian.py中定义了User模型表 ) @app.get("/") async def index(): user = await User.filter(username="liuxu").first() user.email = "1111" await user.save() # 常用的CRUD方法 # fake_user = await User.create(username="111", password="111", email="111") # await User.filter(id=fake_user.id).update(username="Updated name") # await User.filter(id=1).delete() return user
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。