当前位置:   article > 正文

Python-gRPC实践(3)--使用Python实现gRPC服务_python grpc服务端

python grpc服务端

前言

通过前面的文章了解到了gRPC是什么,以及清楚使用它的优缺点,现在终于可以开始实现一个gRPC服务了。

这里演示的是一个用户与书互动的项目,用户可以通过该项目进行注册,登录,注销等操作,同时也可以上传,查看和评论对应的书籍,通常情况下我们会由一个简单的Web应用来提供这些服务,现在,我们假设这个服务非常庞大,需要把他们按照功能拆分成不同的微服务了,这些服务与Web应用通过gRPC进行通信。

注:由于篇幅原因,不会夹杂大量的源代码,需要跳转到Github中查看,同时对于业务逻辑也不会详细的介绍,所以可能需要一些接口开发经验才容易阅读懂。

1.初始化准备

在创建项目之前,我们需要确定我们的需求是什么,就像开发API接口一样,先了解需求,然后多方根据需求定义好接口,最后才为每个接口编写对应的代码,在这个项目中,我假定了拆分了两个服务,一个是与用户有关, 一个是与书籍有关,书籍部分又细分为书籍管理,书籍社交两部分。为此,先编写了Protobuf文件,之前在中说过,我们创建gRPC对应的Protobuf文件应该放在一个公有的仓库中,这样就方便后续的Protobuf文件升级以及不同语言都能共享同一份Protobuf文件。

所以创建一个gRPC服务的第一步就是先创建一个包含Protobuf文件的仓库,我把它命名为grpc-example-common,具体源码可以通过获取。

这个仓库中pyproject.toml文件的tool.poetry.dependencies部分如下:

[tool.poetry.dependencies]
python = "^3.8"
grpcio = "^1.43.0"
grpcio-tools = "^1.43.0"

  • 1
  • 2
  • 3
  • 4
  • 5

通过这部分文件可以知道这个项目是基于Python3.8版本的,然后用到了2个依赖分别是grpcio以及grpcio-tools,其中grpcioPythongRPC实现,它是通过c语言翻译的,所以很多底层都是c实现的,如果在使用gRPC框架的过程中找不到对应的使用方法说明,那可以直接到gRPC的c项目中找到对应的函数并查看它的函数说明进而了解该函数的作用;而另一个库grpcio-tools的作用是把proto文件转译为Python代码,不过单靠grpcio-tools转译的代码很难使用,比如是这段代码:

from grpc_example_common.protos.user.user_pb2 import LoginUserResult 


login_user_result: LoginUserResult = LoginUserResult()

  • 1
  • 2
  • 3
  • 4
  • 5

这段代码引入了由grpcio-tools通过用户Protobuf文件生成的LoginUserResult对象,开发者在后续想要使用这个对象的时候,IDE是没办法提示你这个对象有什么属性的,只能凭自己的记忆进行填写,或者回到对应的Protobuf文件查看该对象的定义:

message LoginUserResult {
  string uid = 1;
  string user_name = 2;
  string token = 3;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

发现它有uiduser_nametoken三个属性,然后才会在代码填写LoginUserResult对象的属性进行调用:

from grpc_example_common.protos.user.user_pb2 import LoginUserResult 


login_user_result: LoginUserResult = LoginUserResult(
    uid="123",
    user_name="so1n",
    token="aaa"
)
print(login_user_result.uid)
# 123

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这时即使填错了,比如uid写为uid1IDE也不会提示有错误,我们需要等到运行时报错才知道是填错了。

这样一个场景是会让开发者非常难受的,明明都定义了一个Protobuf文件,文件中已经写了这个消息有什么属性了,结果生成对应的类却无法让IDE了解它有什么属性(跳进去源码也无法知道),这时就需要通过来解决这一个问题。mypy-protobuf会生成的一份独立的.pyi文件,这样一来IDE就可以帮忙提示这个对象有什么属性了,如图: IDE提醒 此外,通过.pyi文件可以使mypy等工具校验我们的代码类型是否正确,这样在运行前就能知道代码是否有问题。

mypy-protobuf的使用方法十分的简单,它以grpcio-tools的一个插件来运行,具体的使用方法如下:

# 定义生产文件的存放目录,通常都会在指定的目录下生成一个proto的文件夹
target_p = "xxx"
# 定义proto文件的目录
sourct_p = "xxx"
python -m grpc_tools.protoc \
    # 指定xxx_pb2文件和xxx_pb2_grpc文件生成位置,通常我们都让他们在同一个文件夹生产
    --python_out=./$target_p \
    --grpc_python_out=./$target_p \
    # 指定proto文件的位置
    -I. \
    $source_p/user/*.proto
# 上面是标准的grpcio-tools执行的标准语句
    # 指定`mypy-protobuf`生成xxx_pb2和xxx_pb2_grpc对应的pyi文件的位置,必须与xxx_pb2和xxx_pb2_grpc位置保持一致
    --mypy_grpc_out=./$target_p \
    --mypy_out=./$target_p \

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

只要运行了这段命令,grpc_tools就能在对应的路径下生成Protobuf对应的代码和对应的pyi文件,不过当前的grpcio-tools默认生成的代码所在的目录名是protos,它认为这个目录是在项目对应的根目录下生成的,如果我们指定在某个子目录下生产对应的代码,那么在运行程序时会直接报错,因为生成的代码文件中有一个大概长成这样的语句:

# xxx为proto文件的名
from protos.xxx import

  • 1
  • 2
  • 3

这意味着它永远都是从项目的根目录开始引入的protos包,但我们根目录却没有这个包,所以就会报错,这时就需要手动把生成的语句替换为:

# xxx为proto文件的名
from .xxx import

  • 1
  • 2
  • 3

这样就可以完美运行了,但是每个文件手动改一下会非常的麻烦,因为每次生成代码后都要手动更改代码,同时由于项目存在多个Protobuf文件,每个文件都需要执行一次命令才能生成对应的代码。对于一个开发者来说,最讨厌的就是一直执行重复的工作,这种工作是非常烦心的, 所以需要编写了一个脚本来自动的把所有Protobuf文件转为Python代码(也就是项目中的gen_rpc.sh文件),该脚本如下:

# 设置脚本运行的Python环境 
export VENV_PREFIX=""
if [ -d 'venv' ] ; then
    export VENV_PREFIX="venv/bin/"
fi
if [ -d '.venv' ] ; then
    export VENV_PREFIX=".venv/bin/"
fi

echo 'use venv path:' ${VENV_PREFIX}

# 设置生成的存放Python代码的proto文件夹的目录 
target_p='grpc_example_common'
# 设置Proyobuf文件所在位置
source_p='protos'
# 设置生成protobuf代码文件的文件名 
service_list=("book" "user")

# 清理之前生成的代码
rm -r "${target_p:?}/${source_p:?}"*
# 创建对应的文件夹
mkdir -p "${target_p:?}/${source_p:?}"

# 批处理
for service in "${service_list[@]}"
do
  # 生成proto文件对应的Python代码逻辑,每个proto文件执行一次
  mkdir -p "${target_p:?}/${source_p:?}/${service:?}"
  echo  "from proto file:" $source_p/"$service"/*.proto "gen proto py file to" $target_p/$source_p
  ${VENV_PREFIX}python -m grpc_tools.protoc \
    --mypy_grpc_out=./$target_p \
    --mypy_out=./$target_p \
    --python_out=./$target_p \
    --grpc_python_out=./$target_p \
    -I. \
    $source_p/"$service"/*.proto

  # 创建一个__init__文件,这样一来这个文件夹就是一个包了,下面转换为from . import语句才能生效
  touch $target_p/$source_p/"$service"/__init__.py
  # fix grpc tools bug
  sed -i "s/from protos.$service import/from . import/" $target_p/$source_p/$service/*.py
done

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

这样一来,我们通过Protobuf文件生成Python代码的操作就非常省心了,不管Protobuf文件有何改动,只要通过调用命令后就能在grpc_example_common.protos目录下看到已经生成的最新的Python代码,目前grpc_example_common的项目结构如下:

├── grpc_example_common    # Python与gRPC相关的调用
│   ├── helper
│   ├── __init__.py
│   ├── interceptor
│   └── protos             # 生成的对应Python代码
├── protos                 # Protobuf文件
│   ├── book
│   └── user
├──.flake8                 # 格式化工具的配置
├──.pre-commit-config.yaml # 格式化工具的配置
├── gen_rpc.sh             # 通过proto文件生成Python gRPC调用代码的脚本
├── mypy.ini               # 格式化工具的配置
├── pyproject.toml         # Python项目配置文件
├── README.md
├── requirements-dev.txt   # 测试环境的依赖文件
├── requirements.txt       # 正式环境的依赖文件
└── setup.py 

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

通过项目结构可以看出还有其它的东西,这是我为了方便,我还在这个项目中添加一些PythongRPC相关的调用封装,把它当做一个Python的自定义包。

需要注意的是,每修改一次Protobuf文件应该视为一次版本发布,当生成完Protobuf文件的对应代码后,我们需要提交代码并打上对应的tag,这样其它项目才能引用到对应的版本代码。

grpc_example_common目录下还有其它常用的封装,将会在后续章节介绍。

2.编写gRPC服务项目

目前这个演示的项目有两个子gRPC项目,他们的结构很像,所以这一节以来阐述如何创建一个gRPC服务。

该项目的代码结构如下:

├── tests                 # 存放测试用例
│   ├── __init__.py
│   └── test_user.py
├── user_grpc_service     # 项目代码真正所在的位置
│   ├── dal               #   service代码,一般用于查询Mysql,Redis的逻辑
│   ├── handler           #   业务逻辑代码,继承对应Protobuf文件生成的类
│   ├── helper            #   其它代码封装。
│   └── __init__.py
├── app.py                # 项目代码入口
├── mypy.ini              # mypy配置文件
├── pyproject.toml        # 项目配置文件
└── user.sql              # 项目初始化SQL

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

首先,该项目会通过如下配置引入一些依赖:

[tool.poetry.dependencies]
python = "^3.8"
DBUtils = "^3.0.0"
PyMySQL = "^1.0.2"
cryptography = "^36.0.1"
grpc_example_common = { git = "git@github.com:so1n/grpc-example-common.git", tag="v0.1.2"}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

其中grpc_example_common项目就是包括我们上面通过Protobuf生成的文件生产的代码,以及一些自定义的封装,通过引入依赖后,可以很方便的引用Protobuf文件生成的代码。

安装依赖后,就可以在项目中编写对应的gRPC服务了, 在这个项目里有一个比较简单的分层,所有的gRPC服务接口处理的函数都在放在user_grpc_service.handler目录中,而与数据库交互的则放在user_grpc_service.dal中。

编写服务的第一步,就是在user_grpc_service.handler编写对应的代码,先创建一个名为user.py的文件,该文件的代码值负责对User服务的调用,由于对于User服务只有一个子服务,里面只需要创建一个名为UserServicer的类,这个类似继承于Protobuf生成的user_pb2_grpc.UserServicer类,如下:

# 通过common包引入对应Protobuf文件生成的代码
from grpc_example_common.protos.user import user_pb2_grpc as user_service

class UserServicer(user_service.UserServicer):
    pass

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

这时候IDE会在UserServicer上显示波浪线,如果鼠标移到波浪线位置上,IDE会提示类 User 必须实现所有 abstract 方法,于是点击实现abstract方法后,就会自动生成类似于下面的代码:

from google.protobuf.empty_pb2 import Empty  # type: ignore
from grpc_example_common.protos.user import user_pb2 as user_message
from grpc_example_common.protos.user import user_pb2_grpc as user_service

class UserServicer(user_service.UserServicer):

    def logout_user(self, request: user_message.LogoutUserRequest,
                    context: grpc.ServicerContext) -> Empty:
        pass

    def login_user(self, request: user_message.LoginUserRequest,
                   context: grpc.ServicerContext) -> user_message.LoginUserResult:
        pass

    def create_user(self, request: user_message.CreateUserRequest,
                    context: grpc.ServicerContext) -> Empty:
        pass

    def delete_user(self, request: user_message.DeleteUserRequest,
                    context: grpc.ServicerContext) -> Empty:
        pass

    def check_user_login(self, request: user_message.LogoutUserRequest,
                         context: grpc.ServicerContext) -> user_message.CheckLoginResult:
        pass

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

这段代码就是对应的Python代码表达,当客户端调用UserServicer.logout_user方法时,服务端就会自动转到该方法执行对应的逻辑,并返回结果给客户端,所以对于开发者来说只要专心完成好这几个接口的实现即可。开发者编写此处的业务逻辑代码与平时编写的API代码基本没什么差别,这里不多做阐述

不过需要注意的是从request中得到的数据对象并不是Python中常见的对象,而是gRPC封装的且类似于Python常见的对象,如果直接用于pymysql的类似代码:

with conn.cursor() as cursor:
    cursor.execute(sql, param)

  • 1
  • 2
  • 3

那么execute可能会转码失败,导致拼接不了正确的SQL,这时候可以把request中得到的对象转为Python中场见的对象,比如gRPC的时间类型TimestampPython时间类型datetime转换如下:

import datetime
from dataclasses import MISSING
from typing import Any, Optional

from google.protobuf.timestamp_pb2 import Timestamp  # type: ignore

def timestamp_to_datetime(t: Timestamp, default: Any = MISSING) -> datetime.datetime:
    """replace proto.timestamp to python datetime.datetime"""
    if t.seconds == 0 and t.nanos == 0 and default != MISSING:
        return default
    return t.ToDatetime()

def datetime_to_timestamp(d: Optional[datetime.datetime]) -> Timestamp:
    """replace python datetime.datetime to proto.timestamp"""
    t: Timestamp = Timestamp()
    if d:
        t.FromDatetime(d)
    return t

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

通过封装好的timestamp_to_datetimedatetime_to_timestamp可以方便的在业务逻辑中对gRPCPython对象进行转换,更多类型转换见,不过这种转换的实现是非常简单的,性能也不是很好

业务代码编写完后,需要绑定到对应的Server上面才能正常的提供服务,于是我们需要像创建Flask Server一样,先创建一个服务,然后把路由注册进去,对于gRPC的实现代码如下:

import logging
import os
from concurrent import futures
from typing import List, Optional

import grpc
from grpc_example_common.interceptor.server_interceptor.base import BaseInterceptor
from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptor

from user_grpc_service.handler.user import UserService, user_service

logging.basicConfig(
    format="[%(asctime)s %(levelname)s] %(message)s",
    datefmt="%y-%m-%d %H:%M:%S",
    level=logging.DEBUG,
)
logger: logging.Logger = logging.getLogger()

def main(host: str = "127.0.0.1", port: str = "9000", ssl_port: Optional[str] = None) -> None:
    # 拦截器列表
    interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()]
    # 创建一个gRPC服务
    server: grpc.server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=interceptor_list,
    )
    # 绑定我们的业务实现到服务上
    user_service.add_UserServicer_to_server(UserService(), server)

    if ssl_port:
        # 如果是启用了ssl,则读取文件,然后建立一个安全的连接
        port = ssl_port
        # read in key and certificate
        with open(os.path.join(os.path.split(__file__)[0], "server.key")) as f:
            private_key = f.read().encode()
        with open(os.path.join(os.path.split(__file__)[0], "server.crt")) as f:
            certificate_chain = f.read().encode()
        # create server credentials
        server_creds = grpc.ssl_server_credentials(
            (
                (
                    private_key,
                    certificate_chain,
                ),
            )
        )
        server.add_secure_port(f"{host}:{port}", server_creds)
    else:
        # 否则建立一个普通的连接
        server.add_insecure_port(f"{host}:{port}")
    # 启动服务
    server.start()
    try:
        # 打印我们挂载了多少个子服务(也就是上面注册的服务)
        for generic_handler in server._state.generic_handlers:
            logger.info(
                f"add service name:{generic_handler.service_name()} cnt:{len(generic_handler._method_handlers)}"
            )
        logger.info(f"server run in {host}:{port}")
        # 一直运行,直到被关闭
        server.wait_for_termination()
    except KeyboardInterrupt:
        # 收到退出的信号,关闭服务
        server.stop(0)

if __name__ == "__main__":
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

可以看到这段代码非常简单, 但是他肩负了很多请求和连接的健康维护,会在后续的章节中详细介绍。

3.测试编写的gRPC服务

代码编写完成后就应该发起请求,看看这个服务是否能正常运行,但是gRPC服务不像HTTP服务一样可以在浏览器等地方输入一个URL就能发起一个请求,所以为了能验证我们的服务能否正常的运行,我们应该编写一个测试用例。

在官方文档[gRPC Testing]中介绍了gRPC的测试用例编写方法,但是这个只覆盖到了业务代码,无法覆盖到拦截器,参数调优等逻辑,而我目前使用到了一个名为CustomerTopInterceptor的拦截器,它在发现业务代码有异常的时候会把异常通过meta_data传给客户端,然后客户端进行解析并抛出对应的异常(这种实现可能不是最优雅的,但是符合需求),如果采用了官方的gRPC Testing,那在测试用例中山无法捕获到对应的异常的,所以只能采取其它的测试方法来编写一个覆盖范围更广的测试用例–[pytest-grpc]
首先是安装好[pytest-grpc]然后按照标准的测试用例编写习惯,在项目根目录创建一个名为tests的目录,(当然,我也在pyproject.toml指定了pytest的执行目录为tests),然后在里面编写每个子服务的测试代码,一般来说一个子服务对应一个Python文件,接着在这个文件的最前面编写服务[pytest-grpc]要求的代码:

from typing import Callable, List

import grpc
from grpc_example_common.interceptor.client_interceptor.customer_top import (
    CustomerTopInterceptor as ClientCustomerTopInterceptor,
)
from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptor
from grpc_example_common.protos.user import user_pb2, user_pb2_grpc

from user_grpc_service.handler.user import UserService
from user_grpc_service.helper.conn_proxy import SteadyDBConnection, g_db_pool

# 应用的是整个文件的测试用例, 所以都需要写上@pytest.fixture(scope="module")

@pytest.fixture(scope="module")
def grpc_add_to_server() -> Callable:
    # 指定该子服务对应的添加服务接口
    return user_pb2_grpc.add_UserServicer_to_server

@pytest.fixture(scope="module")
def grpc_servicer() -> UserService:
    # 指定我们编写该子服务的类
    return UserService()

@pytest.fixture(scope="module")
def grpc_interceptors() -> List[grpc.ServerInterceptor]:
    # 指定服务端对应的拦截器
    return [CustomerTopInterceptor()]

@pytest.fixture(scope="module")
def grpc_stub(grpc_channel: grpc.Channel) -> user_pb2_grpc.UserStub:
    # 指定子服务对应的客户端
    # 这里会先生成一个channel,该channel用于跟服务端通信,
    # 同时它也有一个与服务端拦截器CustomerTopInterceptor对应的拦截器ClientCustomerTopInterceptor
    # 最后把channel应用到对应的子服务Stub上面
    channel: grpc.Channel = grpc.intercept_channel(grpc_channel, ClientCustomerTopInterceptor())
    return user_pb2_grpc.UserStub(channel)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

创建完成后我们就可以继续在该文件编写对应的测试用例代码了,这样在运行的时候都会自动加载上面代码,然后我们可以在每个测试用例都使用上面代码创建的客户端grpc_stub来发起请求进行测试。以创建用户和删除用户两个接口为例子,创建用户接口调用后会在数据库生成一条对应的数据,而删除用户接口会从数据库删除一条对应的数据,如果数据不存在于数据库,则会抛出RuntimeError异常(具体代码逻辑可见[user_grpc_service/handler/user.py,他们的测试用例代码如下:

from contextlib import contextmanager
from typing import Callable, Generator, List

@contextmanager
def mock_user(
    uid: str = "666666", 
    user_name: str = "so1n", 
    password: str = "123456"
) -> Generator[None, None, None]:
    """通过contextmanager可以在对应的代码逻辑创建一个用户,并在结束时自动清除该用户信息"""
    conn: SteadyDBConnection = g_db_pool.connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO user (uid, user_name, password) VALUES (%s, %s, %s)",
                (uid, user_name, password),
            )
            conn.commit()
        yield
    finally:
        with conn.cursor() as cursor:
            cursor.execute("DELETE FROM user WHERE uid=%s", (uid,))
            conn.commit()

class TestUser:
    def test_create_user(self, grpc_stub: user_pb2_grpc.UserStub) -> None:
        """创建用户调用的测试用例"""
        try:
            request: user_pb2.CreateUserRequest = user_pb2.CreateUserRequest(
                uid="666666", user_name="so1n", password="123456"
            )
            # 通过客户端带有的create_user方法发起请求,他会请求到我们的服务端代码
            # 之后服务端的业务代码会在数据库创建一条对应的用户数据
            grpc_stub.create_user(request, metadata=[])
        finally:
            conn: SteadyDBConnection = g_db_pool.connection()
            conn.begin()
            with conn.cursor() as cursor:
                # 删除刚才创建的用户数据,返回删除的条目数量
                ret: int = cursor.execute("DELETE FROM user WHERE uid=%s", ("666666",))
            conn.commit()
            # 判断是否成功删除一条用户数据,如果是,则代表刚才创建成功。 
            assert ret == 1

    def test_delete_user(self, grpc_stub: user_pb2_grpc.UserStub) -> None:
        """删除用户调用的测试用例"""
        uid: str = "666666"
        # 创建delete_user对应的请求对象
        request: user_pb2.DeleteUserRequest = user_pb2.DeleteUserRequest(uid=uid)
        # user not found
        with pytest.raises(RuntimeError):
            # 当前数据库没有对应的用户数据,会抛出RuntimeError异常,如果pytest能够捕获到这个异常,则证明拦截器生效了。
            grpc_stub.delete_user(request, metadata=[])
        with mock_user(uid=uid):
            # 在数据库存在对应的用户数据下,能正常删除数据,并不抛错
            grpc_stub.delete_user(request, metadata=[])

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

运行测试用例可以发现测试通过了,接下来就可以编写我们的API服务,在API服务中调用我们的gRPC服务。

4.编写API服务

gRPC服务搭建完毕后,终于可以来编写API服务了,有了API服务后,才能把功能提供给了用户,API服务的项目结构如下:

├── app_service                   # API接口的服务,包括路由和调用封装
│   ├── social_book_route.py
│   ├── manager_book_route.py
│   ├── utils.py
│   ├── route.py
│   ├── user_route.py
│   └── __init__.py
├── grpc_service                  # 调用gRPC的服务
│   ├── __init__.py
│   ├── user_service.py
│   └── book_service.py
├── tests                         # 测试用例
│   ├── test_route
│   ├── __init__.py
│   └── conftest.py
├── app.py                        # app代码
├── gunicorn.conf.py              # gunicorn的配置文件
├── pyproject.toml
├── README.md
└── mypy.ini

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

API服务与gRPC服务一样,通过pyproject.toml的配置:

[tool.poetry.dependencies]
python = "^3.8"
grpc_example_common = { git = "git@github.com:so1n/grpc-example-common.git", tag="v0.1.4" }
Flask = "^2.0.3"

  • 1
  • 2
  • 3
  • 4
  • 5

引用了grpc_example_common的库, 然后在grpc_service中用到了该库,还是以用户服务为例子,用户服务的代码位于项目的grpc_service.user_service.py中,这个代码首先是创建一个Mixin的类:

class UserGrpcServiceMixin(object):
    def __init__(self, channel: grpc.Channel):
        self.user_stub: user_service.UserStub = user_service.UserStub(channel)
        # 通过grpc_wrapper.grpc_client_func_wrapper为所有的请求带上matedate参数
        grpc_wrapper.auto_load_wrapper_by_stub(self.user_stub, grpc_wrapper.grpc_client_func_wrapper)

    def create_user(self, *, uid: str, user_name: str, password: str) -> None:
        self.user_stub.create_user(user_message.CreateUserRequest(uid=uid, user_name=user_name, password=password))

    def delete_user(self, *, uid: str) -> None:
        self.user_stub.delete_user(user_message.DeleteUserRequest(uid=uid))

    def login_user(self, *, uid: str, password: str) -> user_message.LoginUserResult:
        return self.user_stub.login_user(user_message.LoginUserRequest(uid=uid, password=password))

    def logout_user(self, *, uid: str, token: str) -> None:
        self.user_stub.logout_user(user_message.LogoutUserRequest(uid=uid, token=token))

    def get_uid_by_token(self, *, token: str) -> str:
        result: user_message.GetUidByTokenResult = self.user_stub.get_uid_by_token(
            user_message.GetUidByTokenRequest(token=token)
        )
        return result.uid

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

这个类就是对user_service.UserStub的简单封装,可以看到方法名和参数与Protobuf保持一致,它只是接受一个负责通信的channel,然后传入到user_stub中,方便后续的方法对user_stub调用,同时这个类还负责一些数据的转换,如上面提到的ProtobufTimestamps对象转为Pythondatetime对象。

接着创建一个负责子服务管理的UserGrpcSerevice类,这个类负责建立通信和维护通信段稳定:

class UserGrpcService(UserGrpcServiceMixin):
    # 如果有多个,则在这里继承多个mixin
    def __init__(self, host: str, port: int) -> None:
        # 初始化与服务端的通信
        self.channel: grpc.Channel = grpc.intercept_channel(
            grpc.insecure_channel(f"{host}:{port}"), CustomerTopInterceptor()
        )
        # 传入到对应的服务里
        UserGrpcServiceMixin.__init__(self, self.channel)

    def channel_ready_future(self, timeout: int = 10) -> None:
        # 用于检查是否与服务端建立起连接
        target: str = (
            f"{self.__class__.__name__}"
            f" {self.channel._channel._connectivity_state.channel.target().decode()}"  # type: ignore
        )  # type: ignore
        try:
            grpc.channel_ready_future(self.channel).result(timeout=timeout)
        except grpc.FutureTimeoutError:
            logger.exception(f"channel:{target} connect timeout")
            raise RuntimeError(f"channel:{target} connect timeout")
        else:
            logger.info(f"channel:{target} connect success")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

创建完毕后对于gRPC客户端调用服务端的逻辑已经封装完成了,接下来就是在路由函数中进行使用了,一般情况下都是使用单例的模式创建一个UserGrpcService的实例,但是我不太喜欢这样做,于是创建了一个中间件,然后通过flask.g传递创建app时初始化的gRPC服务示例,对应的中间件处理代码如下:

from typing import Any, Union

from flask import Blueprint, Flask, Response
from flask import g as flask_g
from flask import jsonify, request

from grpc_service.book_service import BookGrpcService
from grpc_service.user_service import UserGrpcService

APP_TYPE = Union[Blueprint, Flask]

class CustomerGType(object):
    """基于flasg.g的封装,这样就可以无忧的使用IDE的提示和重构功能了"""
    book_grpc_service: BookGrpcService
    user_grpc_service: UserGrpcService

    def __getattr__(self, key: str) -> Any:
        return getattr(flask_g, key)

    def __setattr__(self, key: str, value: Any) -> None:
        setattr(flask_g, key, value)


g: CustomerGType = CustomerGType()

class ContextMiddleware(object):
    """基于flask的before_request和after_request钩子创建的一个中间件类"""
    def __init__(
        self, *, app: APP_TYPE, book_grpc_service: BookGrpcService, user_grpc_service: UserGrpcService
    ) -> None:
        self._app = app
        self._app.before_request(self._before_requests)
        self._app.after_request(self._after_requests)

        self._book_grpc_service: BookGrpcService = book_grpc_service
        self._user_grpc_service: UserGrpcService = user_grpc_service

    def _before_requests(self) -> None:
        # 收到请求点时候把gRPC服务传到指定的变量中
        g.book_grpc_service = self._book_grpc_service
        g.user_grpc_service = self._user_grpc_service
        return

    def _after_requests(self, response: Response) -> Response:
        return response

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

中间件创建完成后,就可以在create_app工厂函数中创建对应的gRPC服务,然后赋值到对应的中间件中:

from flask.app import Flask

from app_service.route import manager_book_bp, social_book_bp, user_bp
from app_service.utils import ContextMiddleware, api_exception
from grpc_service.book_service import BookGrpcService
from grpc_service.user_service import UserGrpcService

def create_app() -> Flask:
    app: Flask = Flask(__name__)
    # 注册对应的路由
    app.register_blueprint(manager_book_bp)
    app.register_blueprint(social_book_bp)
    app.register_blueprint(user_bp)

    # 初始化对应的gRPC服务,并等待建立连接
    book_grpc_service: BookGrpcService = BookGrpcService("0.0.0.0", 9000)
    book_grpc_service.channel_ready_future(timeout=3)
    user_grpc_service: UserGrpcService = UserGrpcService("0.0.0.0", 9001)
    user_grpc_service.channel_ready_future(timeout=3)
    # 把gRPC服务注入到中间件中
    ContextMiddleware(app=app, book_grpc_service=book_grpc_service, user_grpc_service=user_grpc_service)

    app.errorhandler(Exception)(api_exception)
    return app

if __name__ == "__main__":
    create_app().run("localhost", port=8000)


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

最后,就可以在路由函数中使用对应的gRPC服务了,还是以创建用户和删除用户为例子:

from flask import Response, request
from grpc_example_common.protos.user import user_pb2 as user_message

from app_service.utils import g, get_uid_by_token, make_response

def create_user() -> Response:
    request_dict: dict = request.json()
    g.user_grpc_service.create_user(
        uid=request_dict["uid"], user_name=request_dict["user_name"], password=request_dict["password"]
    )
    return make_response()

def delete_user() -> Response:
    request_dict: dict = request.json()
    g.user_grpc_service.delete_user(uid=request_dict["uid"])
    return make_response()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

这两个接口都是在收到请求后,再调用gRPC服务对应的方法来传递请求,其它服务调用的代码与创建用户和删除用户的例子相同

需要注意的是,通常我们不会在生产环境直接运行Flask,而是采用gunicorn+gevnet来运行我们的API服务,从而增强服务的稳定性和性能,但是gevent是修改Python代码来达到全局代码都不阻塞的,而gRPC本身的调用是包含C代码,gevent无法修改到gRPC调用到的C代码,所以gRPC提供一个名为grpc.experimental.gevent.init_gevent的方法来解决这个问题,如下代码:

import socket

try:
    import gevent
except ImportError:
    pass
else:
    if socket.socket is gevent.socket.socket:
        import grpc.experimental.gevent

        grpc.experimental.gevent.init_gevent()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

这段代码在初始化时可以通过判断是否启用gevnet来启用grpc.experimental.gevent.init_gevent,通常建议放在gunicorn的配置文件里

至此,一个API服务就搭建完毕,可以直接运行后在浏览器进行测试。

5.测试编写的API服务

截止到目前,对于包含gRPC的API服务接口测试没有一个比较好的方法,因为单例测试是不考虑别的服务的,意味着需要对gRPC调用段响应进行Mock,然而用于调用段Stub类只有属性而没有方法,这样会导致mock不成功,所以需要先创建一个gRPC Stub的函数签名,以UserStub为例子,将会创建一个类似于gRPC UserStub的类:

from typing import Any

class UserStub(object):
    def __init__(self, channel: Any):
        pass

    def get_uid_by_token(self, *args: Any, **kwargs: Any) -> None:
        pass

    def logout_user(self, *args: Any, **kwargs: Any) -> None:
        pass

    def login_user(self, *args: Any, **kwargs: Any) -> None:
        pass

    def create_user(self, *args: Any, **kwargs: Any) -> None:
        pass

    def delete_user(self, *args: Any, **kwargs: Any) -> None:
        pass

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

其它的见:tests/grpc_abc_stub.py

然后在tests/conftest.py编写一个全局的初始化,该初始化会把gRPC的检查连接方法屏蔽以及把对应的Stub类进行替换:

import pytest
from grpc import _utilities
from grpc_example_common.protos.book import manager_pb2_grpc, social_pb2_grpc
from grpc_example_common.protos.user import user_pb2_grpc

from tests.grpc_abc_stub import BookManagerStub, BookSocialStub, UserStub

def result(self: Any, timeout: Any = None) -> Any:
    pass

# Blocking the start check of grpc service
_utilities._ChannelReadyFuture.result = result

user_pb2_grpc.UserStub = UserStub
user_pb2_grpc.UserStub = UserStub
social_pb2_grpc.BookSocialStub = BookSocialStub
manager_pb2_grpc.BookManagerStub = BookManagerStub

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

这样就为测试用例初始化完成了, 但是我为每个Stub包装了一个功能,使他们永远会传递metadata变量:

class UserGrpcServiceMixin(object):
    def __init__(self, channel: grpc.Channel):
        self.user_stub: user_service.UserStub = user_service.UserStub(channel)
        grpc_wrapper.auto_load_wrapper_by_stub(self.user_stub, grpc_wrapper.grpc_client_func_wrapper)

  • 1
  • 2
  • 3
  • 4
  • 5

该方法会初始化一定要放在mock之后,否则mock无效,这意味着初始化Flask.TestClient的逻辑必须在测试代码里,于是先创建一个类似于pytest.fixture的初始化Flask.TestClient函数:

@contextmanager
def customer_app() -> Generator[FlaskClient, None, None]:
    flask_app: Flask = create_app()
    # Flask provides a way to test your application by exposing the Werkzeug test Client
    # and handling the context locals for you.
    client: FlaskClient = flask_app.test_client()
    # Establish an application context before running the tests.
    ctx: AppContext = flask_app.app_context()
    ctx.push()
    yield client  # this is where the testing happens!
    ctx.pop()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

为了偷懒,我把他放到了conftest文件。

如果没有包装过stub的方法,则可以不用采用该步骤

现在,所有初始化都编写完毕了,可以编写测试用例,以用户调用路由为例子(说明见注释):

from google.protobuf.empty_pb2 import Empty  # type: ignore
from grpc_example_common.protos.user import user_pb2 as user_message
from pytest_mock import MockFixture
from werkzeug.test import TestResponse

from tests.conftest import customer_app

class TestUser:
    def test_create_user(self, mocker: MockFixture) -> None:
        # 该接口会调用到UserStub.create_user,我们把他mock掉,返回的是Empty
        mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.create_user").return_value = Empty()
        with customer_app() as client:
            resp: TestResponse = client.post(
                "/api/user/create", json={"uid": "123", "user_name": "so1n", "password": "aha"}
            )
            assert resp.json["code"] == 0

    def test_delete_user(self, mocker: MockFixture) -> None:
        # 该接口会调用到UserStub.delete_user,我们把他mock掉,返回的是Empty
        mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.delete_user").return_value = Empty()
        with customer_app() as client:
            resp: TestResponse = client.post("/api/user/delete", json={"uid": "123"})
            assert resp.json["code"] == 0
        # User.Stub.delete_user还有一个会抛错的方法,我们通过mock满足这个条件
        mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.delete_user").side_effect = RuntimeError(
            "test error"
        )
        with customer_app() as client:
            resp = client.post("/api/user/delete", json={"uid": "123"})
            assert resp.json["data"] == "test error"

    def test_login_user(self, mocker: MockFixture) -> None:
        # 通过mock指定具体的返回数据,返回的数据类型一定要跟Protobuf生成的代码一致
        mocker.patch(
            "grpc_example_common.protos.user.user_pb2_grpc.UserStub.login_user"
        ).return_value = user_message.LoginUserResult(uid="123", token="66666")
        with customer_app() as client:
            resp: TestResponse = client.post("/api/user/login", json={"uid": "123", "password": "pw"})
            assert resp.json["data"] == {"uid": "123", "token": "66666"}

    def test_logout(self, mocker: MockFixture) -> None:
        mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.logout_user").return_value = Empty()
        mocker.patch(
            "grpc_example_common.protos.user.user_pb2_grpc.UserStub.get_uid_by_token"
        ).return_value = user_message.GetUidByTokenResult(uid="123")
        with customer_app() as client:
            resp: TestResponse = client.post("/api/user/logout", json={"uid": "123"}, headers={"token": "666666"})
            assert resp.json["code"] == 0

        mocker.patch(
            "grpc_example_common.protos.user.user_pb2_grpc.UserStub.get_uid_by_token"
        ).return_value = user_message.GetUidByTokenResult(uid="1234")
        with customer_app() as client:
            resp = client.post("/api/user/logout", json={"uid": "123"}, headers={"token": "666666"})
            assert resp.json["data"] == "Uid ERROR"

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

至此,已经实现了一个可以简单使用的gRPC服务,可以在电脑上起不同的进程并观察他们的调用情况,但是这只是一个开始,随着服务的扩大,服务间的维护和调优会变得十分麻烦,要想服务能够健壮的运行,我们需要继续深入。

---------------------------END---------------------------

题外话

当下这个大数据时代不掌握一门编程语言怎么跟的上时代呢?当下最火的编程语言Python前景一片光明!如果你也想跟上时代提升自己那么请看一下.

在这里插入图片描述

感兴趣的小伙伴,赠送全套Python学习资料,包含面试题、简历资料等具体看下方。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/492211
推荐阅读
相关标签