2078 字
10 分钟
Grpc学习记录
2024-04-25

grpc学习记录#

安装库#

grpcio

grpcio-tools

protobuf

grpc特点#

传输的是不可读的二进制数据

使用HTTP2

更快

数据安全

需要配置客户端

准备工作#

写一个protobuf文件

hello_bilibili.proto

# 1使用哪种protobuf版本
syntax = "proto3";
# 2给包起一个名字
package test;
# 3服务器中服务的名字
service Bilibili {
# 4里面写各种函数
rpc HelloDewei(HelloDeweiReq) returns (HelloDeweiReply) {}
}
# 5定义请求参数
message HelloDeweiReq {
string name = 1;
int32 age = 2;
}
# 6定义响应参数
message HelloDeweiReply {
string result = 1;
}

注意:在执行生成命令生成py文件时,注释要删掉

命令如下:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello_bilibili.proto

编写service文件#

首先要导入grpc和上面由proto文件生成的两个py文件

service.py文件如下:

# -*- coding: utf-8 -*-
# 0导包
import grpc
import hello_bilibili_pb2 as pb2
import hello_bilibili_pb2_grpc as pb2_grpc
# 8导入线程池所需要的包
from concurrent import futures
# 13导入阻塞需要的time模块
import time
# 1书写服务类,参数为生成的hello_bilibili_pb2_grpc文件里的BilibiliServicer
class Bilibili(pb2_grpc.BilibiliServicer):
# 2实现服务类中的方法,request为请求参数,context为上下文
def HelloDewei(self, request, context):
# 3下面是请求参数中的字段
name = request.name
age = request.age
# 4拼接返回值
result = f'my name is {name}, my age is {age}'
# 5返回这个返回值
return pb2.HelloDeweiReply(result=result)
# 6编写服务启动函数
def run():
# 7创建一个grpc服务器
# grpc是支持线程的,所以这里需要创建一个线程池,这需要导包
grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
# 9将服务类注册到服务器,参数是Bilibili服务类和刚创建的grpc_server服务器
pb2_grpc.add_BilibiliServicer_to_server(Bilibili(), grpc_server)
# 10绑定ip和端口号
grpc_server.add_insecure_port('0.0.0.0:5000')
# 11启动服务器,注意启动服务器后要特殊处理,不然会启动一下就退出了
grpc_server.start()
# 12阻塞主线程
try:
while True:
print('grpc_server is running')
time.sleep(60 * 60 * 24)
except KeyboardInterrupt:
# 14如果按下ctrl+c,则正常停止服务器
print('KeyboardInterrupt, grpc_server is stop')
grpc_server.stop(0)
if __name__ == '__main__':
run()

编写client文件#

client.py文件如下:

# -*- coding:utf-8 -*-
# 0导包
import grpc
import hello_bilibili_pb2 as pb2
import hello_bilibili_pb2_grpc as pb2_grpc
# 1写一个主函数
def run():
# 2创建一个grpc的频道channel,绑定服务端的ip和端口号
channel = grpc.insecure_channel('127.0.0.1:50001')
# 3创建一个grpc的client绑定刚创建的频道channel
client = pb2_grpc.BilibiliStub(channel=channel)
# 4调用client中的方法,并传入参数
response = client.HelloDewei(pb2.HelloDeweiReq(
name='dewei',
age=18
))
print(response.result)
if __name__ == '__main__':
run()

常见的protobuf数据类型#

类型说明
string字符串类型,要求是utf-8或7-bit与ascii编码的字符串
bytes比特类型
bool布尔类型
int3232位整形
int6464位整形
float浮点类型
repeated数组(列表)repeated string data = 1;
map字典类型(python叫法)map<string, string> data = 1;

protobuf中的特殊字符#

类型说明
package包名
syntaxProtobuf版本
service定义服务
rpc定义服务中的方法
stream定义的方法传输为流传输
message定义消息体 message User{}
extend扩展消息体 extend User{}
import导入一些插件
//注释

_pb2.py文件介绍#

每一个message对应的信息存储,比如我们的request与response在这里被定义extension

_pb2_grpc.py文件介绍#

用来存储每一个服务的server与客户端以及注册server的工具

客户端名为 + Stub

服务器名为 +Servicer

注册服务为_服务端名_to_server

扩写.proto文件#

新的hello_bilibili.proto文件如下:

# 1使用哪种protobuf版本
syntax = "proto3";
# 2给包起一个名字
package test;
# 3服务器中服务的名字
service Bilibili {
# 4里面写各种函数
rpc HelloDewei(HelloDeweiReq) returns (HelloDeweiReply) {}
# 7定义一个新的函数
# 11将请求参数和相应参数改为流式传输
rpc HelloTest(stream HelloTestRequest) returns (stream HelloTestResponse) {}
}
# 5定义请求参数
message HelloDeweiReq {
string name = 1;
int32 age = 2;
}
# 6定义响应参数
message HelloDeweiReply {
string result = 1;
}
# 8定义新的请求参数
message HelloTestRequest {
string name = 1;
int64 age = 2;
repeated string data = 3;
# 9定义map键值对的值的数据结构
message HelloTestRequestNumberValue {
string name = 1;
int32 age = 2;
bool is_active = 3;
}
map<string, HelloTestRequestNumberValue> number = 4; // string, int32, bool
}
# 10定义新的响应参数
message HelloTestResponse {}

然后使用生成命令重新生成那两个文件:

Terminal window
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello_bilibili.proto

传输方式#

  • unary:单程
  • stream:流
    • 1.双向:客户端请求服务器端(流),服务器发送给客户端(流)
    • 2.单向:服务器接收到客户端(流),服务器发送给客户端(非流)
    • 3.单向:服务器接收到客户端(非流),服务器发送给客户端(流)

注意:单向流其实双方也建立长连接了,只是单向流主要是由一方主动发起,向对方传数据,双向流是双方都频繁传数据

服务器流式向客户端传输数据#

扩写.proto文件#

文件如下:

// 1使用哪种protobuf版本
syntax = "proto3";
// 2给包起一个名字
package test;
// 3服务器中服务的名字
service Bilibili {
// 4里面写各种函数
rpc HelloDewei(HelloDeweiReq) returns (HelloDeweiReply) {}
// 7定义一个新的函数
// 11将请求参数和相应参数改为流式传输
rpc HelloTest(stream HelloTestRequest) returns (stream HelloTestResponse) {}
// 12服务器以流的方式返回响应
rpc TestClientRecvStream(TestClientRecvStreamRequest) returns (stream TestClientRecvStreamResponse) {}
}
// 5定义请求参数
message HelloDeweiReq {
string name = 1;
int32 age = 2;
}
// 6定义响应参数
message HelloDeweiReply {
string result = 1;
}
// 8定义新的请求参数
message HelloTestRequest {
string name = 1;
int64 age = 2;
repeated string data = 3;
// 9定义map键值对的值的数据结构
message HelloTestRequestNumberValue {
string name = 1;
int32 age = 2;
bool is_active = 3;
}
map<string, HelloTestRequestNumberValue> number = 4; // string, int32, bool
}
// 10定义新的响应参数
message HelloTestResponse {}
// 13定义服务器流式向客户端返回响应的请求参数
message TestClientRecvStreamRequest {
string data = 1;
}
// 14定义服务器流式向客户端返回响应的响应参数
message TestClientRecvStreamResponse {
string result = 1;
}

服务器文件#

server.py文件如下:

# -*- coding: utf-8 -*-
# 0导包
import grpc
import hello_bilibili_pb2 as pb2
import hello_bilibili_pb2_grpc as pb2_grpc
# 8导入线程池所需要的包
from concurrent import futures
# 13导入阻塞需要的time模块
import time
# 1书写服务类,参数为生成的hello_bilibili_pb2_grpc文件里的BilibiliServicer
class Bilibili(pb2_grpc.BilibiliServicer):
# 2实现服务类中的方法,request为请求参数,context为上下文
def HelloDewei(self, request, context):
# 3下面是请求参数中的字段
name = request.name
age = request.age
# 4拼接返回值
result = f'my name is {name}, my age is {age}'
# 5返回这个返回值
return pb2.HelloDeweiReply(result=result)
# 15定义服务器流式向客户端传输的函数
def TestClientRecvStream(self, request, context):
index = 0
# 16首先看客户端是否是活跃状态
while context.is_active():
data = request.data
# 18服务器主动关闭向客户端输出
if data == 'close':
print('data is close, request will cancel')
# 19关闭向客户端输出
context.cancel()
index += 1
time.sleep(1)
result = 'send %d %s' % (index, data)
print(result)
# 17流式向客户端返回
yield pb2.TestClientRecvStreamResponse(
result=result
)
# 6编写服务启动函数
def run():
# 7创建一个grpc服务器
# grpc是支持线程的,所以这里需要创建一个线程池,这需要导包
grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
# 9将服务类注册到服务器,参数是Bilibili服务类和刚创建的grpc_server服务器
pb2_grpc.add_BilibiliServicer_to_server(Bilibili(), grpc_server)
# 10绑定ip和端口号
grpc_server.add_insecure_port('0.0.0.0:50001')
# 11启动服务器,注意启动服务器后要特殊处理,不然会启动一下就退出了
grpc_server.start()
# 12阻塞主线程
try:
while True:
print('grpc_server is running')
time.sleep(60 * 60 * 24)
except KeyboardInterrupt:
# 14如果按下ctrl+c,则正常停止服务器
print('KeyboardInterrupt, grpc_server is stop')
grpc_server.stop(0)
if __name__ == '__main__':
run()

客户端文件#

client.py文件如下:

# -*- coding:utf-8 -*-
# 0导包
import grpc
import hello_bilibili_pb2 as pb2
import hello_bilibili_pb2_grpc as pb2_grpc
# 1写一个主函数
def run():
# 2创建一个grpc的频道channel,绑定服务端的ip和端口号
channel = grpc.insecure_channel('127.0.0.1:50001')
# 3创建一个grpc的client绑定刚创建的频道channel
client = pb2_grpc.BilibiliStub(channel=channel)
# 4调用client中的方法,并传入参数
# response = client.HelloDewei(pb2.HelloDeweiReq(
# name='dewei',
# age=18
# ))
# print(response.result)
# 5向客户端发送请求
response = client.TestClientRecvStream(pb2.TestClientRecvStreamRequest(
data='dewei'
))
# 6服务器流式向客户端返回
for item in response:
print(item.result)
if __name__ == '__main__':
run()
Grpc学习记录
https://fuwari.cbba.top/posts/grpc学习记录/
作者
Chen_Feng
发布于
2024-04-25
许可协议
CC BY-NC-SA 4.0