赞
踩
通过前面的文章了解到了gRPC
是什么,以及清楚使用它的优缺点,现在终于可以开始实现一个gRPC
服务了。
这里演示的是一个用户与书互动的项目,用户可以通过该项目进行注册,登录,注销等操作,同时也可以上传,查看和评论对应的书籍,通常情况下我们会由一个简单的Web应用来提供这些服务,现在,我们假设这个服务非常庞大,需要把他们按照功能拆分成不同的微服务了,这些服务与Web应用通过gRPC
进行通信。
注:由于篇幅原因,不会夹杂大量的源代码,需要跳转到
Github
中查看,同时对于业务逻辑也不会详细的介绍,所以可能需要一些接口开发经验才容易阅读懂。
在创建项目之前,我们需要确定我们的需求是什么,就像开发API接口一样,先了解需求,然后多方根据需求定义好接口,最后才为每个接口编写对应的代码,在这个项目中,我假定了拆分了两个服务,一个是与用户有关, 一个是与书籍有关,书籍部分又细分为书籍管理,书籍社交两部分。为此,先编写了Protobuf文件,之前在中说过,我们创建gRPC
对应的Protobuf文件应该放在一个公有的仓库中,这样就方便后续的Protobuf文件升级以及不同语言都能共享同一份Protobuf文件。
所以创建一个gRPC
服务的第一步就是先创建一个包含Protobuf文件的仓库,我把它命名为grpc-example-common
,具体源码可以通过获取。
这个仓库中pyproject.toml
文件的tool.poetry.dependencies
部分如下:
[tool.poetry.dependencies]
python = "^3.8"
grpcio = "^1.43.0"
grpcio-tools = "^1.43.0"
通过这部分文件可以知道这个项目是基于Python3.8
版本的,然后用到了2个依赖分别是grpcio
以及grpcio-tools
,其中grpcio
是Python
的gRPC
实现,它是通过c语言翻译的,所以很多底层都是c实现的,如果在使用gRPC框架的过程中找不到对应的使用方法说明,那可以直接到gRPC的c项目中找到对应的函数并查看它的函数说明进而了解该函数的作用;而另一个库grpcio-tools
的作用是把proto
文件转译为Python
代码,不过单靠grpcio-tools
转译的代码很难使用,比如是这段代码:
from grpc_example_common.protos.user.user_pb2 import LoginUserResult
login_user_result: LoginUserResult = LoginUserResult()
这段代码引入了由grpcio-tools
通过用户Protobuf文件生成的LoginUserResult
对象,开发者在后续想要使用这个对象的时候,IDE是没办法提示你这个对象有什么属性的,只能凭自己的记忆进行填写,或者回到对应的Protobuf文件查看该对象的定义:
message LoginUserResult {
string uid = 1;
string user_name = 2;
string token = 3;
}
发现它有uid
,user_name
,token
三个属性,然后才会在代码填写LoginUserResult
对象的属性进行调用:
from grpc_example_common.protos.user.user_pb2 import LoginUserResult
login_user_result: LoginUserResult = LoginUserResult(
uid="123",
user_name="so1n",
token="aaa"
)
print(login_user_result.uid)
# 123
这时即使填错了,比如uid
写为uid1
IDE也不会提示有错误,我们需要等到运行时报错才知道是填错了。
这样一个场景是会让开发者非常难受的,明明都定义了一个Protobuf文件,文件中已经写了这个消息有什么属性了,结果生成对应的类却无法让IDE了解它有什么属性(跳进去源码也无法知道),这时就需要通过来解决这一个问题。mypy-protobuf
会生成的一份独立的.pyi
文件,这样一来IDE就可以帮忙提示这个对象有什么属性了,如图: 此外,通过.pyi
文件可以使mypy
等工具校验我们的代码类型是否正确,这样在运行前就能知道代码是否有问题。
mypy-protobuf
的使用方法十分的简单,它以grpcio-tools
的一个插件来运行,具体的使用方法如下:
# 定义生产文件的存放目录,通常都会在指定的目录下生成一个proto的文件夹 target_p = "xxx" # 定义proto文件的目录 sourct_p = "xxx" python -m grpc_tools.protoc \ # 指定xxx_pb2文件和xxx_pb2_grpc文件生成位置,通常我们都让他们在同一个文件夹生产 --python_out=./$target_p \ --grpc_python_out=./$target_p \ # 指定proto文件的位置 -I. \ $source_p/user/*.proto # 上面是标准的grpcio-tools执行的标准语句 # 指定`mypy-protobuf`生成xxx_pb2和xxx_pb2_grpc对应的pyi文件的位置,必须与xxx_pb2和xxx_pb2_grpc位置保持一致 --mypy_grpc_out=./$target_p \ --mypy_out=./$target_p \
只要运行了这段命令,grpc_tools
就能在对应的路径下生成Protobuf对应的代码和对应的pyi
文件,不过当前的grpcio-tools
默认生成的代码所在的目录名是protos
,它认为这个目录是在项目对应的根目录下生成的,如果我们指定在某个子目录下生产对应的代码,那么在运行程序时会直接报错,因为生成的代码文件中有一个大概长成这样的语句:
# xxx为proto文件的名
from protos.xxx import
这意味着它永远都是从项目的根目录开始引入的protos
包,但我们根目录却没有这个包,所以就会报错,这时就需要手动把生成的语句替换为:
# xxx为proto文件的名
from .xxx import
这样就可以完美运行了,但是每个文件手动改一下会非常的麻烦,因为每次生成代码后都要手动更改代码,同时由于项目存在多个Protobuf文件,每个文件都需要执行一次命令才能生成对应的代码。对于一个开发者来说,最讨厌的就是一直执行重复的工作,这种工作是非常烦心的, 所以需要编写了一个脚本来自动的把所有Protobuf文件转为Python
代码(也就是项目中的gen_rpc.sh
文件),该脚本如下:
# 设置脚本运行的Python环境 export VENV_PREFIX="" if [ -d 'venv' ] ; then export VENV_PREFIX="venv/bin/" fi if [ -d '.venv' ] ; then export VENV_PREFIX=".venv/bin/" fi echo 'use venv path:' ${VENV_PREFIX} # 设置生成的存放Python代码的proto文件夹的目录 target_p='grpc_example_common' # 设置Proyobuf文件所在位置 source_p='protos' # 设置生成protobuf代码文件的文件名 service_list=("book" "user") # 清理之前生成的代码 rm -r "${target_p:?}/${source_p:?}"* # 创建对应的文件夹 mkdir -p "${target_p:?}/${source_p:?}" # 批处理 for service in "${service_list[@]}" do # 生成proto文件对应的Python代码逻辑,每个proto文件执行一次 mkdir -p "${target_p:?}/${source_p:?}/${service:?}" echo "from proto file:" $source_p/"$service"/*.proto "gen proto py file to" $target_p/$source_p ${VENV_PREFIX}python -m grpc_tools.protoc \ --mypy_grpc_out=./$target_p \ --mypy_out=./$target_p \ --python_out=./$target_p \ --grpc_python_out=./$target_p \ -I. \ $source_p/"$service"/*.proto # 创建一个__init__文件,这样一来这个文件夹就是一个包了,下面转换为from . import语句才能生效 touch $target_p/$source_p/"$service"/__init__.py # fix grpc tools bug sed -i "s/from protos.$service import/from . import/" $target_p/$source_p/$service/*.py done
这样一来,我们通过Protobuf文件生成Python
代码的操作就非常省心了,不管Protobuf文件有何改动,只要通过调用命令后就能在grpc_example_common.protos
目录下看到已经生成的最新的Python
代码,目前grpc_example_common
的项目结构如下:
├── grpc_example_common # Python与gRPC相关的调用 │ ├── helper │ ├── __init__.py │ ├── interceptor │ └── protos # 生成的对应Python代码 ├── protos # Protobuf文件 │ ├── book │ └── user ├──.flake8 # 格式化工具的配置 ├──.pre-commit-config.yaml # 格式化工具的配置 ├── gen_rpc.sh # 通过proto文件生成Python gRPC调用代码的脚本 ├── mypy.ini # 格式化工具的配置 ├── pyproject.toml # Python项目配置文件 ├── README.md ├── requirements-dev.txt # 测试环境的依赖文件 ├── requirements.txt # 正式环境的依赖文件 └── setup.py
通过项目结构可以看出还有其它的东西,这是我为了方便,我还在这个项目中添加一些Python
与gRPC
相关的调用封装,把它当做一个Python
的自定义包。
需要注意的是,每修改一次Protobuf文件应该视为一次版本发布,当生成完Protobuf文件的对应代码后,我们需要提交代码并打上对应的tag,这样其它项目才能引用到对应的版本代码。
grpc_example_common
目录下还有其它常用的封装,将会在后续章节介绍。
目前这个演示的项目有两个子gRPC项目,他们的结构很像,所以这一节以来阐述如何创建一个gRPC服务。
该项目的代码结构如下:
├── tests # 存放测试用例
│ ├── __init__.py
│ └── test_user.py
├── user_grpc_service # 项目代码真正所在的位置
│ ├── dal # service代码,一般用于查询Mysql,Redis的逻辑
│ ├── handler # 业务逻辑代码,继承对应Protobuf文件生成的类
│ ├── helper # 其它代码封装。
│ └── __init__.py
├── app.py # 项目代码入口
├── mypy.ini # mypy配置文件
├── pyproject.toml # 项目配置文件
└── user.sql # 项目初始化SQL
首先,该项目会通过如下配置引入一些依赖:
[tool.poetry.dependencies]
python = "^3.8"
DBUtils = "^3.0.0"
PyMySQL = "^1.0.2"
cryptography = "^36.0.1"
grpc_example_common = { git = "git@github.com:so1n/grpc-example-common.git", tag="v0.1.2"}
其中grpc_example_common
项目就是包括我们上面通过Protobuf生成的文件生产的代码,以及一些自定义的封装,通过引入依赖后,可以很方便的引用Protobuf文件生成的代码。
安装依赖后,就可以在项目中编写对应的gRPC服务了, 在这个项目里有一个比较简单的分层,所有的gRPC服务接口处理的函数都在放在user_grpc_service.handler
目录中,而与数据库交互的则放在user_grpc_service.dal
中。
编写服务的第一步,就是在user_grpc_service.handler
编写对应的代码,先创建一个名为user.py
的文件,该文件的代码值负责对User服务的调用,由于对于User服务只有一个子服务,里面只需要创建一个名为UserServicer
的类,这个类似继承于Protobuf生成的user_pb2_grpc.UserServicer
类,如下:
# 通过common包引入对应Protobuf文件生成的代码
from grpc_example_common.protos.user import user_pb2_grpc as user_service
class UserServicer(user_service.UserServicer):
pass
这时候IDE会在UserServicer
上显示波浪线,如果鼠标移到波浪线位置上,IDE会提示类 User 必须实现所有 abstract 方法
,于是点击实现abstract方法
后,就会自动生成类似于下面的代码:
from google.protobuf.empty_pb2 import Empty # type: ignore from grpc_example_common.protos.user import user_pb2 as user_message from grpc_example_common.protos.user import user_pb2_grpc as user_service class UserServicer(user_service.UserServicer): def logout_user(self, request: user_message.LogoutUserRequest, context: grpc.ServicerContext) -> Empty: pass def login_user(self, request: user_message.LoginUserRequest, context: grpc.ServicerContext) -> user_message.LoginUserResult: pass def create_user(self, request: user_message.CreateUserRequest, context: grpc.ServicerContext) -> Empty: pass def delete_user(self, request: user_message.DeleteUserRequest, context: grpc.ServicerContext) -> Empty: pass def check_user_login(self, request: user_message.LogoutUserRequest, context: grpc.ServicerContext) -> user_message.CheckLoginResult: pass
这段代码就是对应的Python
代码表达,当客户端调用UserServicer.logout_user
方法时,服务端就会自动转到该方法执行对应的逻辑,并返回结果给客户端,所以对于开发者来说只要专心完成好这几个接口的实现即可。开发者编写此处的业务逻辑代码与平时编写的API代码基本没什么差别,这里不多做阐述
不过需要注意的是从request
中得到的数据对象并不是Python
中常见的对象,而是gRPC封装的且类似于Python
常见的对象,如果直接用于pymysql
的类似代码:
with conn.cursor() as cursor:
cursor.execute(sql, param)
那么execute可能会转码失败,导致拼接不了正确的SQL,这时候可以把request
中得到的对象转为Python
中场见的对象,比如gRPC
的时间类型Timestamp
与Python
时间类型datetime
转换如下:
import datetime from dataclasses import MISSING from typing import Any, Optional from google.protobuf.timestamp_pb2 import Timestamp # type: ignore def timestamp_to_datetime(t: Timestamp, default: Any = MISSING) -> datetime.datetime: """replace proto.timestamp to python datetime.datetime""" if t.seconds == 0 and t.nanos == 0 and default != MISSING: return default return t.ToDatetime() def datetime_to_timestamp(d: Optional[datetime.datetime]) -> Timestamp: """replace python datetime.datetime to proto.timestamp""" t: Timestamp = Timestamp() if d: t.FromDatetime(d) return t
通过封装好的timestamp_to_datetime
和datetime_to_timestamp
可以方便的在业务逻辑中对gRPC
和Python
对象进行转换,更多类型转换见,不过这种转换的实现是非常简单的,性能也不是很好
业务代码编写完后,需要绑定到对应的Server
上面才能正常的提供服务,于是我们需要像创建Flask Server
一样,先创建一个服务,然后把路由注册进去,对于gRPC
的实现代码如下:
import logging import os from concurrent import futures from typing import List, Optional import grpc from grpc_example_common.interceptor.server_interceptor.base import BaseInterceptor from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptor from user_grpc_service.handler.user import UserService, user_service logging.basicConfig( format="[%(asctime)s %(levelname)s] %(message)s", datefmt="%y-%m-%d %H:%M:%S", level=logging.DEBUG, ) logger: logging.Logger = logging.getLogger() def main(host: str = "127.0.0.1", port: str = "9000", ssl_port: Optional[str] = None) -> None: # 拦截器列表 interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()] # 创建一个gRPC服务 server: grpc.server = grpc.server( futures.ThreadPoolExecutor(max_workers=10), interceptors=interceptor_list, ) # 绑定我们的业务实现到服务上 user_service.add_UserServicer_to_server(UserService(), server) if ssl_port: # 如果是启用了ssl,则读取文件,然后建立一个安全的连接 port = ssl_port # read in key and certificate with open(os.path.join(os.path.split(__file__)[0], "server.key")) as f: private_key = f.read().encode() with open(os.path.join(os.path.split(__file__)[0], "server.crt")) as f: certificate_chain = f.read().encode() # create server credentials server_creds = grpc.ssl_server_credentials( ( ( private_key, certificate_chain, ), ) ) server.add_secure_port(f"{host}:{port}", server_creds) else: # 否则建立一个普通的连接 server.add_insecure_port(f"{host}:{port}") # 启动服务 server.start() try: # 打印我们挂载了多少个子服务(也就是上面注册的服务) for generic_handler in server._state.generic_handlers: logger.info( f"add service name:{generic_handler.service_name()} cnt:{len(generic_handler._method_handlers)}" ) logger.info(f"server run in {host}:{port}") # 一直运行,直到被关闭 server.wait_for_termination() except KeyboardInterrupt: # 收到退出的信号,关闭服务 server.stop(0) if __name__ == "__main__": main()
可以看到这段代码非常简单, 但是他肩负了很多请求和连接的健康维护,会在后续的章节中详细介绍。
代码编写完成后就应该发起请求,看看这个服务是否能正常运行,但是gRPC
服务不像HTTP
服务一样可以在浏览器等地方输入一个URL就能发起一个请求,所以为了能验证我们的服务能否正常的运行,我们应该编写一个测试用例。
在官方文档[gRPC Testing]中介绍了gRPC
的测试用例编写方法,但是这个只覆盖到了业务代码,无法覆盖到拦截器,参数调优等逻辑,而我目前使用到了一个名为CustomerTopInterceptor
的拦截器,它在发现业务代码有异常的时候会把异常通过meta_data
传给客户端,然后客户端进行解析并抛出对应的异常(这种实现可能不是最优雅的,但是符合需求),如果采用了官方的gRPC Testing
,那在测试用例中山无法捕获到对应的异常的,所以只能采取其它的测试方法来编写一个覆盖范围更广的测试用例–[pytest-grpc]
首先是安装好[pytest-grpc]然后按照标准的测试用例编写习惯,在项目根目录创建一个名为tests
的目录,(当然,我也在pyproject.toml
指定了pytest
的执行目录为tests
),然后在里面编写每个子服务的测试代码,一般来说一个子服务对应一个Python
文件,接着在这个文件的最前面编写服务[pytest-grpc]要求的代码:
from typing import Callable, List import grpc from grpc_example_common.interceptor.client_interceptor.customer_top import ( CustomerTopInterceptor as ClientCustomerTopInterceptor, ) from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptor from grpc_example_common.protos.user import user_pb2, user_pb2_grpc from user_grpc_service.handler.user import UserService from user_grpc_service.helper.conn_proxy import SteadyDBConnection, g_db_pool # 应用的是整个文件的测试用例, 所以都需要写上@pytest.fixture(scope="module") @pytest.fixture(scope="module") def grpc_add_to_server() -> Callable: # 指定该子服务对应的添加服务接口 return user_pb2_grpc.add_UserServicer_to_server @pytest.fixture(scope="module") def grpc_servicer() -> UserService: # 指定我们编写该子服务的类 return UserService() @pytest.fixture(scope="module") def grpc_interceptors() -> List[grpc.ServerInterceptor]: # 指定服务端对应的拦截器 return [CustomerTopInterceptor()] @pytest.fixture(scope="module") def grpc_stub(grpc_channel: grpc.Channel) -> user_pb2_grpc.UserStub: # 指定子服务对应的客户端 # 这里会先生成一个channel,该channel用于跟服务端通信, # 同时它也有一个与服务端拦截器CustomerTopInterceptor对应的拦截器ClientCustomerTopInterceptor # 最后把channel应用到对应的子服务Stub上面 channel: grpc.Channel = grpc.intercept_channel(grpc_channel, ClientCustomerTopInterceptor()) return user_pb2_grpc.UserStub(channel)
创建完成后我们就可以继续在该文件编写对应的测试用例代码了,这样在运行的时候都会自动加载上面代码,然后我们可以在每个测试用例都使用上面代码创建的客户端grpc_stub
来发起请求进行测试。以创建用户和删除用户两个接口为例子,创建用户接口调用后会在数据库生成一条对应的数据,而删除用户接口会从数据库删除一条对应的数据,如果数据不存在于数据库,则会抛出RuntimeError
异常(具体代码逻辑可见[user_grpc_service/handler/user.py,他们的测试用例代码如下:
from contextlib import contextmanager from typing import Callable, Generator, List @contextmanager def mock_user( uid: str = "666666", user_name: str = "so1n", password: str = "123456" ) -> Generator[None, None, None]: """通过contextmanager可以在对应的代码逻辑创建一个用户,并在结束时自动清除该用户信息""" conn: SteadyDBConnection = g_db_pool.connection() try: with conn.cursor() as cursor: cursor.execute( "INSERT INTO user (uid, user_name, password) VALUES (%s, %s, %s)", (uid, user_name, password), ) conn.commit() yield finally: with conn.cursor() as cursor: cursor.execute("DELETE FROM user WHERE uid=%s", (uid,)) conn.commit() class TestUser: def test_create_user(self, grpc_stub: user_pb2_grpc.UserStub) -> None: """创建用户调用的测试用例""" try: request: user_pb2.CreateUserRequest = user_pb2.CreateUserRequest( uid="666666", user_name="so1n", password="123456" ) # 通过客户端带有的create_user方法发起请求,他会请求到我们的服务端代码 # 之后服务端的业务代码会在数据库创建一条对应的用户数据 grpc_stub.create_user(request, metadata=[]) finally: conn: SteadyDBConnection = g_db_pool.connection() conn.begin() with conn.cursor() as cursor: # 删除刚才创建的用户数据,返回删除的条目数量 ret: int = cursor.execute("DELETE FROM user WHERE uid=%s", ("666666",)) conn.commit() # 判断是否成功删除一条用户数据,如果是,则代表刚才创建成功。 assert ret == 1 def test_delete_user(self, grpc_stub: user_pb2_grpc.UserStub) -> None: """删除用户调用的测试用例""" uid: str = "666666" # 创建delete_user对应的请求对象 request: user_pb2.DeleteUserRequest = user_pb2.DeleteUserRequest(uid=uid) # user not found with pytest.raises(RuntimeError): # 当前数据库没有对应的用户数据,会抛出RuntimeError异常,如果pytest能够捕获到这个异常,则证明拦截器生效了。 grpc_stub.delete_user(request, metadata=[]) with mock_user(uid=uid): # 在数据库存在对应的用户数据下,能正常删除数据,并不抛错 grpc_stub.delete_user(request, metadata=[])
运行测试用例可以发现测试通过了,接下来就可以编写我们的API服务,在API服务中调用我们的gRPC服务。
gRPC
服务搭建完毕后,终于可以来编写API服务了,有了API服务后,才能把功能提供给了用户,API服务的项目结构如下:
├── app_service # API接口的服务,包括路由和调用封装 │ ├── social_book_route.py │ ├── manager_book_route.py │ ├── utils.py │ ├── route.py │ ├── user_route.py │ └── __init__.py ├── grpc_service # 调用gRPC的服务 │ ├── __init__.py │ ├── user_service.py │ └── book_service.py ├── tests # 测试用例 │ ├── test_route │ ├── __init__.py │ └── conftest.py ├── app.py # app代码 ├── gunicorn.conf.py # gunicorn的配置文件 ├── pyproject.toml ├── README.md └── mypy.ini
API服务与gRPC
服务一样,通过pyproject.toml
的配置:
[tool.poetry.dependencies]
python = "^3.8"
grpc_example_common = { git = "git@github.com:so1n/grpc-example-common.git", tag="v0.1.4" }
Flask = "^2.0.3"
引用了grpc_example_common
的库, 然后在grpc_service
中用到了该库,还是以用户服务为例子,用户服务的代码位于项目的grpc_service.user_service.py
中,这个代码首先是创建一个Mixin
的类:
class UserGrpcServiceMixin(object): def __init__(self, channel: grpc.Channel): self.user_stub: user_service.UserStub = user_service.UserStub(channel) # 通过grpc_wrapper.grpc_client_func_wrapper为所有的请求带上matedate参数 grpc_wrapper.auto_load_wrapper_by_stub(self.user_stub, grpc_wrapper.grpc_client_func_wrapper) def create_user(self, *, uid: str, user_name: str, password: str) -> None: self.user_stub.create_user(user_message.CreateUserRequest(uid=uid, user_name=user_name, password=password)) def delete_user(self, *, uid: str) -> None: self.user_stub.delete_user(user_message.DeleteUserRequest(uid=uid)) def login_user(self, *, uid: str, password: str) -> user_message.LoginUserResult: return self.user_stub.login_user(user_message.LoginUserRequest(uid=uid, password=password)) def logout_user(self, *, uid: str, token: str) -> None: self.user_stub.logout_user(user_message.LogoutUserRequest(uid=uid, token=token)) def get_uid_by_token(self, *, token: str) -> str: result: user_message.GetUidByTokenResult = self.user_stub.get_uid_by_token( user_message.GetUidByTokenRequest(token=token) ) return result.uid
这个类就是对user_service.UserStub
的简单封装,可以看到方法名和参数与Protobuf保持一致,它只是接受一个负责通信的channel,然后传入到user_stub
中,方便后续的方法对user_stub
调用,同时这个类还负责一些数据的转换,如上面提到的Protobuf
的Timestamps
对象转为Python
的datetime
对象。
接着创建一个负责子服务管理的UserGrpcSerevice
类,这个类负责建立通信和维护通信段稳定:
class UserGrpcService(UserGrpcServiceMixin): # 如果有多个,则在这里继承多个mixin def __init__(self, host: str, port: int) -> None: # 初始化与服务端的通信 self.channel: grpc.Channel = grpc.intercept_channel( grpc.insecure_channel(f"{host}:{port}"), CustomerTopInterceptor() ) # 传入到对应的服务里 UserGrpcServiceMixin.__init__(self, self.channel) def channel_ready_future(self, timeout: int = 10) -> None: # 用于检查是否与服务端建立起连接 target: str = ( f"{self.__class__.__name__}" f" {self.channel._channel._connectivity_state.channel.target().decode()}" # type: ignore ) # type: ignore try: grpc.channel_ready_future(self.channel).result(timeout=timeout) except grpc.FutureTimeoutError: logger.exception(f"channel:{target} connect timeout") raise RuntimeError(f"channel:{target} connect timeout") else: logger.info(f"channel:{target} connect success")
创建完毕后对于gRPC客户端调用服务端的逻辑已经封装完成了,接下来就是在路由函数中进行使用了,一般情况下都是使用单例的模式创建一个UserGrpcService
的实例,但是我不太喜欢这样做,于是创建了一个中间件,然后通过flask.g
传递创建app时初始化的gRPC
服务示例,对应的中间件处理代码如下:
from typing import Any, Union from flask import Blueprint, Flask, Response from flask import g as flask_g from flask import jsonify, request from grpc_service.book_service import BookGrpcService from grpc_service.user_service import UserGrpcService APP_TYPE = Union[Blueprint, Flask] class CustomerGType(object): """基于flasg.g的封装,这样就可以无忧的使用IDE的提示和重构功能了""" book_grpc_service: BookGrpcService user_grpc_service: UserGrpcService def __getattr__(self, key: str) -> Any: return getattr(flask_g, key) def __setattr__(self, key: str, value: Any) -> None: setattr(flask_g, key, value) g: CustomerGType = CustomerGType() class ContextMiddleware(object): """基于flask的before_request和after_request钩子创建的一个中间件类""" def __init__( self, *, app: APP_TYPE, book_grpc_service: BookGrpcService, user_grpc_service: UserGrpcService ) -> None: self._app = app self._app.before_request(self._before_requests) self._app.after_request(self._after_requests) self._book_grpc_service: BookGrpcService = book_grpc_service self._user_grpc_service: UserGrpcService = user_grpc_service def _before_requests(self) -> None: # 收到请求点时候把gRPC服务传到指定的变量中 g.book_grpc_service = self._book_grpc_service g.user_grpc_service = self._user_grpc_service return def _after_requests(self, response: Response) -> Response: return response
中间件创建完成后,就可以在create_app
工厂函数中创建对应的gRPC
服务,然后赋值到对应的中间件中:
from flask.app import Flask from app_service.route import manager_book_bp, social_book_bp, user_bp from app_service.utils import ContextMiddleware, api_exception from grpc_service.book_service import BookGrpcService from grpc_service.user_service import UserGrpcService def create_app() -> Flask: app: Flask = Flask(__name__) # 注册对应的路由 app.register_blueprint(manager_book_bp) app.register_blueprint(social_book_bp) app.register_blueprint(user_bp) # 初始化对应的gRPC服务,并等待建立连接 book_grpc_service: BookGrpcService = BookGrpcService("0.0.0.0", 9000) book_grpc_service.channel_ready_future(timeout=3) user_grpc_service: UserGrpcService = UserGrpcService("0.0.0.0", 9001) user_grpc_service.channel_ready_future(timeout=3) # 把gRPC服务注入到中间件中 ContextMiddleware(app=app, book_grpc_service=book_grpc_service, user_grpc_service=user_grpc_service) app.errorhandler(Exception)(api_exception) return app if __name__ == "__main__": create_app().run("localhost", port=8000)
最后,就可以在路由函数中使用对应的gRPC
服务了,还是以创建用户和删除用户为例子:
from flask import Response, request from grpc_example_common.protos.user import user_pb2 as user_message from app_service.utils import g, get_uid_by_token, make_response def create_user() -> Response: request_dict: dict = request.json() g.user_grpc_service.create_user( uid=request_dict["uid"], user_name=request_dict["user_name"], password=request_dict["password"] ) return make_response() def delete_user() -> Response: request_dict: dict = request.json() g.user_grpc_service.delete_user(uid=request_dict["uid"]) return make_response()
这两个接口都是在收到请求后,再调用gRPC
服务对应的方法来传递请求,其它服务调用的代码与创建用户和删除用户的例子相同
需要注意的是,通常我们不会在生产环境直接运行Flask
,而是采用gunicorn
+gevnet
来运行我们的API服务,从而增强服务的稳定性和性能,但是gevent
是修改Python
代码来达到全局代码都不阻塞的,而gRPC
本身的调用是包含C代码,gevent
无法修改到gRPC
调用到的C代码,所以gRPC
提供一个名为grpc.experimental.gevent.init_gevent
的方法来解决这个问题,如下代码:
import socket
try:
import gevent
except ImportError:
pass
else:
if socket.socket is gevent.socket.socket:
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()
这段代码在初始化时可以通过判断是否启用gevnet
来启用grpc.experimental.gevent.init_gevent
,通常建议放在gunicorn的配置文件里
至此,一个API服务就搭建完毕,可以直接运行后在浏览器进行测试。
截止到目前,对于包含gRPC的API服务接口测试没有一个比较好的方法,因为单例测试是不考虑别的服务的,意味着需要对gRPC调用段响应进行Mock,然而用于调用段Stub
类只有属性而没有方法,这样会导致mock不成功,所以需要先创建一个gRPC Stub的函数签名,以UserStub
为例子,将会创建一个类似于gRPC UserStub的类:
from typing import Any class UserStub(object): def __init__(self, channel: Any): pass def get_uid_by_token(self, *args: Any, **kwargs: Any) -> None: pass def logout_user(self, *args: Any, **kwargs: Any) -> None: pass def login_user(self, *args: Any, **kwargs: Any) -> None: pass def create_user(self, *args: Any, **kwargs: Any) -> None: pass def delete_user(self, *args: Any, **kwargs: Any) -> None: pass
然后在tests/conftest.py编写一个全局的初始化,该初始化会把gRPC的检查连接方法屏蔽以及把对应的Stub类进行替换:
import pytest from grpc import _utilities from grpc_example_common.protos.book import manager_pb2_grpc, social_pb2_grpc from grpc_example_common.protos.user import user_pb2_grpc from tests.grpc_abc_stub import BookManagerStub, BookSocialStub, UserStub def result(self: Any, timeout: Any = None) -> Any: pass # Blocking the start check of grpc service _utilities._ChannelReadyFuture.result = result user_pb2_grpc.UserStub = UserStub user_pb2_grpc.UserStub = UserStub social_pb2_grpc.BookSocialStub = BookSocialStub manager_pb2_grpc.BookManagerStub = BookManagerStub
这样就为测试用例初始化完成了, 但是我为每个Stub包装了一个功能,使他们永远会传递metadata
变量:
class UserGrpcServiceMixin(object):
def __init__(self, channel: grpc.Channel):
self.user_stub: user_service.UserStub = user_service.UserStub(channel)
grpc_wrapper.auto_load_wrapper_by_stub(self.user_stub, grpc_wrapper.grpc_client_func_wrapper)
该方法会初始化一定要放在mock之后,否则mock无效,这意味着初始化Flask.TestClient
的逻辑必须在测试代码里,于是先创建一个类似于pytest.fixture
的初始化Flask.TestClient
函数:
@contextmanager
def customer_app() -> Generator[FlaskClient, None, None]:
flask_app: Flask = create_app()
# Flask provides a way to test your application by exposing the Werkzeug test Client
# and handling the context locals for you.
client: FlaskClient = flask_app.test_client()
# Establish an application context before running the tests.
ctx: AppContext = flask_app.app_context()
ctx.push()
yield client # this is where the testing happens!
ctx.pop()
为了偷懒,我把他放到了conftest
文件。
如果没有包装过stub的方法,则可以不用采用该步骤
现在,所有初始化都编写完毕了,可以编写测试用例,以用户调用路由为例子(说明见注释):
from google.protobuf.empty_pb2 import Empty # type: ignore from grpc_example_common.protos.user import user_pb2 as user_message from pytest_mock import MockFixture from werkzeug.test import TestResponse from tests.conftest import customer_app class TestUser: def test_create_user(self, mocker: MockFixture) -> None: # 该接口会调用到UserStub.create_user,我们把他mock掉,返回的是Empty mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.create_user").return_value = Empty() with customer_app() as client: resp: TestResponse = client.post( "/api/user/create", json={"uid": "123", "user_name": "so1n", "password": "aha"} ) assert resp.json["code"] == 0 def test_delete_user(self, mocker: MockFixture) -> None: # 该接口会调用到UserStub.delete_user,我们把他mock掉,返回的是Empty mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.delete_user").return_value = Empty() with customer_app() as client: resp: TestResponse = client.post("/api/user/delete", json={"uid": "123"}) assert resp.json["code"] == 0 # User.Stub.delete_user还有一个会抛错的方法,我们通过mock满足这个条件 mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.delete_user").side_effect = RuntimeError( "test error" ) with customer_app() as client: resp = client.post("/api/user/delete", json={"uid": "123"}) assert resp.json["data"] == "test error" def test_login_user(self, mocker: MockFixture) -> None: # 通过mock指定具体的返回数据,返回的数据类型一定要跟Protobuf生成的代码一致 mocker.patch( "grpc_example_common.protos.user.user_pb2_grpc.UserStub.login_user" ).return_value = user_message.LoginUserResult(uid="123", token="66666") with customer_app() as client: resp: TestResponse = client.post("/api/user/login", json={"uid": "123", "password": "pw"}) assert resp.json["data"] == {"uid": "123", "token": "66666"} def test_logout(self, mocker: MockFixture) -> None: mocker.patch("grpc_example_common.protos.user.user_pb2_grpc.UserStub.logout_user").return_value = Empty() mocker.patch( "grpc_example_common.protos.user.user_pb2_grpc.UserStub.get_uid_by_token" ).return_value = user_message.GetUidByTokenResult(uid="123") with customer_app() as client: resp: TestResponse = client.post("/api/user/logout", json={"uid": "123"}, headers={"token": "666666"}) assert resp.json["code"] == 0 mocker.patch( "grpc_example_common.protos.user.user_pb2_grpc.UserStub.get_uid_by_token" ).return_value = user_message.GetUidByTokenResult(uid="1234") with customer_app() as client: resp = client.post("/api/user/logout", json={"uid": "123"}, headers={"token": "666666"}) assert resp.json["data"] == "Uid ERROR"
至此,已经实现了一个可以简单使用的gRPC服务,可以在电脑上起不同的进程并观察他们的调用情况,但是这只是一个开始,随着服务的扩大,服务间的维护和调优会变得十分麻烦,要想服务能够健壮的运行,我们需要继续深入。
当下这个大数据时代不掌握一门编程语言怎么跟的上时代呢?当下最火的编程语言Python前景一片光明!如果你也想跟上时代提升自己那么请看一下.
感兴趣的小伙伴,赠送全套Python学习资料,包含面试题、简历资料等具体看下方。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。