当前位置:   article > 正文

使用Python国产API框架开发REST接口_restapi接口教程

restapi接口教程

概述

本篇文档配套有完整的视频教程和源码,大家感兴趣的可以留言或者私信我。

学习目标:

  • 掌握REST风格的接口开发技能
  • 实现用户管理系统的后端接口
  • 掌握RestClient工具的使用技巧

目录:

01 概述
02 部署RestClient工具
03 发送GET请求
04 发送POST请求
05 发送PUT请求
06 发送DELETE请求
07 发送PATCH请求
08 Docker部署MySQL8
09 创建数据库和表
10 实现新增用户接口
11 实现查询所有用户接口
12 实现根据ID查询用户接口
13 实现根据ID修改用户接口
14 实现根据ID删除用户接口
15 总结
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

部署RestClient工具

第一步:加载镜像

docker load -i .\data\rest_client_v1.tar
  • 1

第二步:启动镜像

docker run -d --name rest_client --restart=always -p 10001:80 rest
  • 1

发送GET请求

第一步:安装zdppy_api框架

pip install .\data\zdppy_api-0.1.0.tar.gz
pip install uvicorn
  • 1
  • 2

第二步:编写接口

from api import resp, Api, middleware

app = Api(
    routes=[resp.json_route("/user", [{"id": 1, "name": "张三"}])],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

第三步:运行服务,使用RestClient发送请求

{
  "method":"get",
  "url": "http://localhost:8000/user"
}
  • 1
  • 2
  • 3
  • 4

发送POST请求

第一步:编写接口

from api import req, resp, Api, middleware


async def post_method(r):
    """新增用户"""
    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    # 响应
    return resp.success(user)


# 用户相关的路由
user_routes = [
    resp.post("/user", post_method)
]

app = Api(
    routes=user_routes,
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

第二步:使用RestClient进行测试

{
  "method":"post",
  "url": "http://localhost:8000/user",
  "data":{
    "name":"张三"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

发送PUT请求

第一步:编写接口

from api import req, resp, Api, middleware


async def add_user(r):
    user = await req.get_json(r)
    print("....", user)
    return resp.success(user)


async def update_user(r):
    uid = req.get_path(r, "uid")
    user = await req.get_json(r)
    user["id"] = uid
    return resp.success(user)


app = Api(
    routes=[
        resp.post("/user", add_user),
        resp.put("/user/{uid}", update_user),
    ],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

第二步:使用RestClient进行测试

{
  "method":"put",
  "url": "http://localhost:8000/user/1",
  "data":{
    "name":"张三"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

发送DELETE请求

第一步:编写接口

from api import req, resp, Api, middleware


async def add_user(r):
    user = await req.get_json(r)
    print("....", user)
    return resp.success(user)


async def update_user(r):
    uid = req.get_path(r, "uid")
    user = await req.get_json(r)
    user["id"] = uid
    return resp.success(user)


async def delete_user(r):
    uid = req.get_path(r, "uid")
    return resp.success({"id": uid})


app = Api(
    routes=[
        resp.post("/user", add_user),
        resp.put("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
    ],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

第二步:发送请求

{
  "method":"delete",
  "url": "http://localhost:8000/user/1"
}
  • 1
  • 2
  • 3
  • 4

发送PATCH请求

第一步:编写接口

from api import req, resp, Api, middleware


async def add_user(r):
    user = await req.get_json(r)
    print("....", user)
    return resp.success(user)


async def update_user(r):
    uid = req.get_path(r, "uid")
    user = await req.get_json(r)
    user["id"] = uid
    return resp.success(user)


async def delete_user(r):
    uid = req.get_path(r, "uid")
    return resp.success({"id": uid})


app = Api(
    routes=[
        resp.post("/user", add_user),
        resp.patch("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
    ],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

第二步:发送请求

{
  "method":"patch",
  "url": "http://localhost:8000/user/1",
  "data": {
    "name": "张三"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Docker部署MySQL8

拉取镜像:

docker pull mysql:8.0.25
  • 1

创建容器:

docker run --name mysql -p 3306:3306  --restart=always -e MYSQL_ROOT_PASSWORD=zhangdapeng520 -d mysql:8.0.25
  • 1

配置信息如下:

  • 主机:本机ip
  • 端口:3306
  • 账号:root
  • 密码:zhangdapeng520

创建数据库和表

from mysql.db_object import Database

db = Database(password="zhangdapeng520")

# 创建数据库
db.add_database_force("user")
db = Database(password="zhangdapeng520", database="user")

# 创建表
user_table_sql = "create table user(id int primary key auto_increment, name varchar(36))"
db.execute(user_table_sql)

# 查看所有的表
print(db.get_all_table())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

实现新增用户接口

from api import req, resp, Api, middleware
from mysql.db_object import Database

# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]


async def add_user(r):
    """新增用户"""
    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    db.add(table, columns, [user.get("name")])
    # 响应
    return resp.success(user)


async def update_user(r):
    """修改用户"""
    uid = req.get_path(r, "uid")

    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    user["id"] = uid

    # 响应
    return resp.success(user)


async def delete_user(r):
    """删除用户"""
    uid = req.get_path(r, "uid")

    # 响应
    return resp.success({"uid": uid})


app = Api(
    routes=[
        resp.post("/user", add_user),
        resp.patch("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
    ],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

实现查询所有用户接口

第一步:定义接口方法

async def get_user(r):
    """获取用户"""
    users = db.get_all(table)
    return resp.success(users)
  • 1
  • 2
  • 3
  • 4

第二步:注册接口路由

app = Api(
    routes=[
        resp.get("/user", get_user),
        resp.post("/user", add_user),
        resp.patch("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
    ],
    middleware=[middleware.cors()]
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

第三步:启动服务,并使用RestClient进行测试

{
  "method":"get",
  "url": "http://localhost:8000/user"
}
  • 1
  • 2
  • 3
  • 4

完整示例代码:

from api import req, resp, Api, middleware
from mysql.db_object import Database

# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]


async def add_user(r):
    """新增用户"""
    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    db.add(table, columns, [user.get("name")])
    # 响应
    return resp.success(user)


async def get_user(r):
    """获取用户"""
    users = db.get_all(table)
    return resp.success(users)


async def update_user(r):
    """修改用户"""
    uid = req.get_path(r, "uid")

    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    user["id"] = uid

    # 响应
    return resp.success(user)


async def delete_user(r):
    """删除用户"""
    uid = req.get_path(r, "uid")

    # 响应
    return resp.success({"uid": uid})


app = Api(
    routes=[
        resp.get("/user", get_user),
        resp.post("/user", add_user),
        resp.patch("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
    ],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

实现根据ID查询用户接口

第一步:编写根据ID获取用户的接口

async def get_user_by_id(r):
    """根据id查询用户"""
    uid = req.get_path(r, "uid")
    user = db.get_by_id(table, uid)
    return resp.success(user)
  • 1
  • 2
  • 3
  • 4
  • 5

第二步:注册接口

app = Api(
    routes=[
        resp.get("/user", get_user),
        resp.post("/user", add_user),
        resp.patch("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
        resp.get("/user/{uid}", get_user_by_id),
    ],
    middleware=[middleware.cors()]
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

第三步:使用RestClient测试接口

{
  "method":"get",
  "url": "http://localhost:8000/user/3"
}
  • 1
  • 2
  • 3
  • 4

完整示例代码:

from api import req, resp, Api, middleware
from mysql.db_object import Database

# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]


async def add_user(r):
    """新增用户"""
    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    db.add(table, columns, [user.get("name")])
    # 响应
    return resp.success(user)


async def get_user(r):
    """获取用户"""
    users = db.get_all(table)
    return resp.success(users)


async def update_user(r):
    """修改用户"""
    uid = req.get_path(r, "uid")

    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    user["id"] = uid

    # 响应
    return resp.success(user)


async def delete_user(r):
    """删除用户"""
    uid = req.get_path(r, "uid")

    # 响应
    return resp.success({"uid": uid})


async def get_user_by_id(r):
    """根据id查询用户"""
    uid = req.get_path(r, "uid")
    user = db.get_by_id(table, uid)
    return resp.success(user)


app = Api(
    routes=[
        resp.get("/user", get_user),
        resp.post("/user", add_user),
        resp.patch("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
        resp.get("/user/{uid}", get_user_by_id),
    ],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

实现根据ID修改用户接口

第一步:编写修改用户接口

async def update_user(r):
    """修改用户"""
    uid = req.get_path(r, "uid")
    user = await req.get_json(r)
    user["id"] = uid
    db.update(table, uid, columns, [user.get("name")])
    return resp.success(user)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

第二步:注册接口

app = Api(
    routes=[
        resp.get("/user", get_user),
        resp.post("/user", add_user),
        resp.put("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
        resp.get("/user/{uid}", get_user_by_id),
    ],
    middleware=[middleware.cors()]
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

第三步:使用RestClient测试接口

{
  "method":"put",
  "url": "http://localhost:8000/user/3",
  "data": {
    "name": "王五333"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

第四步:校验数据是否修改成功

{
  "method":"get",
  "url": "http://localhost:8000/user/3"
}
  • 1
  • 2
  • 3
  • 4

完整示例代码:

from api import req, resp, Api, middleware
from mysql.db_object import Database

# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]


async def add_user(r):
    """新增用户"""
    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    db.add(table, columns, [user.get("name")])
    # 响应
    return resp.success(user)


async def get_user(r):
    """获取用户"""
    users = db.get_all(table)
    return resp.success(users)


async def update_user(r):
    """修改用户"""
    uid = req.get_path(r, "uid")
    user = await req.get_json(r)
    user["id"] = uid
    db.update(table, uid, columns, [user.get("name")])
    return resp.success(user)


async def delete_user(r):
    """删除用户"""
    uid = req.get_path(r, "uid")

    # 响应
    return resp.success({"uid": uid})


async def get_user_by_id(r):
    """根据id查询用户"""
    uid = req.get_path(r, "uid")
    user = db.get_by_id(table, uid)
    return resp.success(user)


app = Api(
    routes=[
        resp.get("/user", get_user),
        resp.post("/user", add_user),
        resp.put("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
        resp.get("/user/{uid}", get_user_by_id),
    ],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

实现根据ID删除用户接口

第一步:编写接口

async def delete_user(r):
    """删除用户"""
    uid = req.get_path(r, "uid")
    db.delete(table, uid)
    return resp.success()
  • 1
  • 2
  • 3
  • 4
  • 5

第二步:注册接口

app = Api(
    routes=[
        resp.get("/user", get_user),
        resp.post("/user", add_user),
        resp.put("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
        resp.get("/user/{uid}", get_user_by_id),
    ],
    middleware=[middleware.cors()]
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

第三步:测试接口

{
  "method":"delete",
  "url": "http://localhost:8000/user/3"
}
  • 1
  • 2
  • 3
  • 4

第四步:使用查询所有接口,测试删除是否成功

{
  "method":"get",
  "url": "http://localhost:8000/user"
}
  • 1
  • 2
  • 3
  • 4

完整示例代码:

from api import req, resp, Api, middleware
from mysql.db_object import Database

# 创建db对象
db = Database(password="zhangdapeng520", database="user")
table = "user"
columns = ["name"]


async def add_user(r):
    """新增用户"""
    # 获取用户传递过来的用户信息
    user = await req.get_json(r)
    db.add(table, columns, [user.get("name")])
    # 响应
    return resp.success(user)


async def get_user(r):
    """获取用户"""
    users = db.get_all(table)
    return resp.success(users)


async def update_user(r):
    """修改用户"""
    uid = req.get_path(r, "uid")
    user = await req.get_json(r)
    user["id"] = uid
    db.update(table, uid, columns, [user.get("name")])
    return resp.success(user)


async def delete_user(r):
    """删除用户"""
    uid = req.get_path(r, "uid")
    db.delete(table, uid)
    return resp.success()


async def get_user_by_id(r):
    """根据id查询用户"""
    uid = req.get_path(r, "uid")
    user = db.get_by_id(table, uid)
    return resp.success(user)


app = Api(
    routes=[
        resp.get("/user", get_user),
        resp.post("/user", add_user),
        resp.put("/user/{uid}", update_user),
        resp.delete("/user/{uid}", delete_user),
        resp.get("/user/{uid}", get_user_by_id),
    ],
    middleware=[middleware.cors()]
)

if __name__ == '__main__':
    import uvicorn

    uvicorn.run("main:app")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

总结

1、学会了编写不同请求方法的接口

2、学会了操作数据库,对用户做增删改查

3、学会了开发REST风格的接口,比如用户的增删改查接口

后续建议:

  • 做一些实战类型的项目:《Vue3用户管理系统模板开发》,《Python+Vue3前后端分离用户管理系统实战》
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/791551
推荐阅读
相关标签
  

闽ICP备14008679号