最近我在改造手头负责的一个 HTTP Restful API 服务,集成对 gRPC 协议(在鹅厂叫 tRPC,是 gRPC 的本土衍生版本)的支持。由于是从 0 学习这个协议,因此也踩了不少坑,这里用一篇文章来记录下,希望可以帮助到同样从 0 学习 gRPC 的朋友。
列表参数赋值
老的 Rest 接口中,大量用到了列表参数,看了下 pb 协议,给这个参数定义如下:
message Foo {
repeated string Bar = 1;
}
在 Python RPC 客户端调用测试时,按照经验,传参如下:
req = pb.Foo()
req.Bar = ["a", "b"]
结果赋值就报错了:
AttributeError: Assignment not allowed to repeated field "Bar" in protocol message object.
经过一翻搜索,最终发现 Python 客户端应该如下赋值列表参数:
req = pb.Foo()
bar_list = ["a", "b"]
req.Bar.extend(bar_list)
这样就可以正常赋值了,但是总觉得这个设计太绕了,直接赋值不行么?总体来说,gRPC 还是偏编译型语言的设计的,在 Python 这种解释型语言里面则有点格格不入。
定义二维数组
无独有偶,老接口里面除了上面的单一列表参数之外,还有二维数组作为参数的接口,比如在生成表格图片的时候,pandas 数据里面就有大量二维数组数据,比如:
[["a", "b"], ["c", "d"]]
可以看到这个值并没有键名,因此我第一时间想用 pb 的嵌套参数来赋值,发现玩不转,比如:
message subArg {
repeated string arg1 = 1;
repeated string arg2 = 2;
}
message Arg {
subArg arg = 1;
}
解析时,会将 arg1、arg2 作为键名传递过去:
{
"arg1": ["a", "b"],
"arg2": ["c", "d"]
}
准确来说 Python 中是没有数组类型的,只有列表(list)和元组(tuple), 不过数组可以用 numpy 库来定义,所以这个场景其实也可以用 numpy 来搞定,不过经过我多番搜索,最终找到一个更简单应对这种复杂参数的解决办法:google.protobuf.Value
import "google/protobuf/struct.proto";
message Foo {
repeated google.protobuf.Value array = 1;
}
这样定义就解决了,google.protobuf.Value 是个好东西,他包含并兼容以下属性:
"boolValue": false,
"listValue": {},
"nullValue": 0,
"numberValue": 0,
"stringValue": "",
"structValue": {}
也就是说,对于复杂参数,只要属性是 bool、列表、数值型、字符串、结构体他都能支持。
定义空字典
在老的接口中,为了更加灵活,我设计了一些拓展参数,类似于 kwargs,你不传他就是空字典,你需要定义额外的参数就通过传入 {"foo": "bar"} 这类值来实现额外参数支持。
结果在 gRPC 场景中就寄了,pb 要定义字典,就必须用如下嵌套的方式:
message SubFoo {
string arg1 = 1;
string arg2 = 2;
}
message Foo {
SubFoo arg = 1;
}
这样才能实现 arg = {"arg1": "xxx", "arg2": "yyy"} 的赋值,但是要定义成空字典这样的拓展参数有没有办法呢?
经过一番研究折腾,发现还是有答案的!其实还是用 google.protobuf.Value 来解决!因此,只要遇到非固定值的参数,基本都可以用 google.protobuf.Value 来满足!
协议互转
在我进入这个项目的单元测试编写阶段时,我发现在 Python 客户端里面来定义老接口的 pb 参数实在是太太太太复杂了,因为老的 RestAPI 就是一个 JSON,有时候多层嵌套,要多复杂有多复杂,这个在 PB 里面定义简直要了老命,明明就是一个 JSON 可以解决的问题,PB 里面得绕几次才能完成赋值,极大的阻碍了我写单元测试的速度!
于是,我想了一个办法,我直接复制老 API 接口的单元测试里面的 JSON 参数,然后通过同时遍历 pb 协议对象里面的参数字典以及 JSON,来实现自动赋值,最开始我写了一个自动转换的函数来做这个逻辑,结果发现有太多要考虑的地方,健壮性不太行。
最终,经过研究折腾,发现官方本身就提供了一个JSON 和 PB 互转的方法,非常好用!
Contains routines for printing protocol messages in JSON format.
Simple usage example:
# Create a proto object and serialize it to a json format string. message = my_proto_pb2.MyMessage(foo=’bar’) json_string = json_format.MessageToJson(message)
# Parse a json format string to proto object. message = json_format.Parse(json_string, my_proto_pb2.MyMessage())
exception google.protobuf.json_format.Error
Top-level module error for json_format.
args
with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
这里简单贴一个例子,方便上手:
先定义一个 pb 协议
import "google/protobuf/struct.proto";
message MyRequest {
string a = 1;
repeated string b = 2;
repeated google.protobuf.Value c = 3;
}
此时客户端赋值通过 Parse 可以快速将 JSON 参数转换为 pb 参数:
import json
from google.protobuf.json_format import Parse, MessageToJson
# 其他代码略..
proxy = rpc.MyClientProxyImpl()
# 定义请求参数
payload = {
"a": "a",
"b": ["a","b"],
"c": [["a", "b"], ["c", "d"]]
}
payload = json.dumps(payload)
# 直接用 Parse 结合 JSON 和 pb 协议对象就能完成赋值,ignore_unknown_fields 表示忽略未知字段,即遇到 pb 未定义的参数直接跳过而不是报错
req = Parse(payload, pb.MyRequest(), ignore_unknown_fields=True)
# 其他代码略...
ret = proxy.MyFunc(ctx, req, options)
# 用 MessageToJson 可以将 RPC 的返回结果转成 JSON,不过这里需要先序列化再反序列化一次才能正常展示:
print(f"请求参数:{payload}, 响应内容:{json.dumps(json.loads(MessageToJson(ret)))}")
所以,通过自带的 Parse, MessageToJson 方法,我们可以快速将 JSON 转成 pb 协议,也可以将 RPC 结果快速转成 JSON,这个在 Python 这种解释型语言中非常实用!
随机端口
新服务开发并测试完毕,准备部署到我们的 K8S 集群,发现了一个问题:之前直接用 K8S 的 nodeport 就能快速访问基于 overlay 网络部署的老的 RestAPI 服务,但是用 RPC 之后,其实会将应用的 IP 和端口注册到服务发现中心,如果继续使用 overlay 模式的话,注册到服务发现中心的 IP 就是 K8S 的 overlay 私有 IP,那其他不在同一个 K8S 里面的服务是无法访问到这个私有 IP+端口的。
这里解决办法有两个:
- 将 overlay 模式改成 floatingIP,即容器会分配一个 underlay IP,直接将网络打平,这样整个内网都能访问到这个服务了;
- 服务改成 Host 网络模式启动,直接绑定 K8S 计算节点网络就能互访。
两个方案都有缺点,第一个是会造成 IP 地址浪费,第二个则会存在端口冲突问题。
最终,我选择了第二个方案,改成 Host 网络模式+随机端口的方式启动,就能完美解决端口冲突问题了。
目前我们的 RPC 框架对随机端口注册到服务发现中心并不完善,因此我这边写了一个 shell 脚本来解决了这个问题:原理是在启动 RPC 服务端之前,脚本先随机一个本地可以用的端口,然后修改 RPC 启动端口并启动。脚本内容如下:
#!/bin/bash
# ******************************************************
# Author : Jager
# Last modified : 2020-11-18 10:00
# Filename : get_random_port.sh
# Description : 获取系统可用的随机端口
# :
# ******************************************************
# 在指定范围内随机数字
# get_range_number STARTNUM ENDNUM
get_range_number()
{
min=$1
max=$(($2-$min+1))
num=$(cat /dev/urandom | head -n 10 | cksum | awk -F ' ' '{print $1}')
echo $(($num%$max+$min))
}
# 检查端口是否被系统占用(包括随机端口)
# check_port <PORT>
# 没有被占用: return 0
# 被占用或者参数为空: return 1
check_port()
{
if [[ -z $1 ]];then
return 1
fi
if ! awk -F '[: ]+' '{print strtonum("0x"$4)}' /proc/net/tcp | grep -wq $1;then
return 0
else
return 1
fi
}
# 获取可用随机端口
# get_random_port STARTPORT ENDPORT RETRY_TIMES
get_random_port()
{
round=0
start_port=${1:-10001}
end_port=${2:-19999}
retry_times=${3:-10000}
while ! check_port ${rand_port};do
export rand_port=$(get_range_number ${start_port} ${end_port})
let round+=1
if [[ ${round} -ge ${retry_times} ]];then
echo "${retry_times} retries, no ports available, export port 0"
export rand_port=0
fi
done
}
# 设置起始端口范围和最大重试次数
START_PORT=10001
END_PORT=19999
RETRY_TIMES=10000
# 生成可用的随机端口
get_random_port ${START_PORT} ${END_PORT} ${RETRY_TIMES}
echo 启动的随机端口为:${rand_port}
# 修改配置文件
export config_file=conf/${CONFIG_FILE:-trpc_python.yaml}
if [[ -f $config_file ]];then
sed -i "s/port: .*/port: $rand_port/g" $config_file
else
echo "指定的配置文件:$config_file 不存在,请检查!"
exit 1
fi
# 启动服务
exec python3 trpc_main.py -c ${config_file}
终于在解决上述问题之后,我手头的这个服务目前也并行支持了 RPC 协议,让服务形态更加丰富。折腾期间大大小小其实踩了不少坑,本文主要记录了一些比较典型、通用的问题,后续如果还有相关经验会继续补充进来。
大大,这访问速度太快了,上的cdn吗?
博主,交换友情链接吗?
博士赞一个
赞一个
博主你好,你的网站做得真好,可以跟你换个友链吗?
大佬,元旦快乐吖
话说大佬最近没管博客吗?评论都不审核
元旦快乐吖大佬
站点访问的速度确实很快。
这访问速度 简直是毫秒开啊
访问速度好快~
赞一个