2078 字
10 分钟
Grpc学习记录
grpc学习记录
安装库
grpcio
grpcio-tools
protobuf
grpc特点
传输的是不可读的二进制数据
使用HTTP2
更快
数据安全
需要配置客户端
准备工作
写一个protobuf文件
hello_bilibili.proto
# 1使用哪种protobuf版本syntax = "proto3";# 2给包起一个名字package test;# 3服务器中服务的名字service Bilibili { # 4里面写各种函数 rpc HelloDewei(HelloDeweiReq) returns (HelloDeweiReply) {}}
# 5定义请求参数message HelloDeweiReq { string name = 1; int32 age = 2;}# 6定义响应参数message HelloDeweiReply { string result = 1;}
注意:在执行生成命令生成py文件时,注释要删掉
命令如下:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello_bilibili.proto
编写service文件
首先要导入grpc和上面由proto文件生成的两个py文件
service.py文件如下:
# -*- coding: utf-8 -*-# 0导包import grpcimport hello_bilibili_pb2 as pb2import hello_bilibili_pb2_grpc as pb2_grpc# 8导入线程池所需要的包from concurrent import futures# 13导入阻塞需要的time模块import time
# 1书写服务类,参数为生成的hello_bilibili_pb2_grpc文件里的BilibiliServicerclass Bilibili(pb2_grpc.BilibiliServicer): # 2实现服务类中的方法,request为请求参数,context为上下文 def HelloDewei(self, request, context): # 3下面是请求参数中的字段 name = request.name age = request.age # 4拼接返回值 result = f'my name is {name}, my age is {age}' # 5返回这个返回值 return pb2.HelloDeweiReply(result=result)
# 6编写服务启动函数def run(): # 7创建一个grpc服务器 # grpc是支持线程的,所以这里需要创建一个线程池,这需要导包 grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=4)) # 9将服务类注册到服务器,参数是Bilibili服务类和刚创建的grpc_server服务器 pb2_grpc.add_BilibiliServicer_to_server(Bilibili(), grpc_server) # 10绑定ip和端口号 grpc_server.add_insecure_port('0.0.0.0:5000') # 11启动服务器,注意启动服务器后要特殊处理,不然会启动一下就退出了 grpc_server.start() # 12阻塞主线程 try: while True: print('grpc_server is running') time.sleep(60 * 60 * 24) except KeyboardInterrupt: # 14如果按下ctrl+c,则正常停止服务器 print('KeyboardInterrupt, grpc_server is stop') grpc_server.stop(0)
if __name__ == '__main__': run()
编写client文件
client.py文件如下:
# -*- coding:utf-8 -*-# 0导包import grpcimport hello_bilibili_pb2 as pb2import hello_bilibili_pb2_grpc as pb2_grpc
# 1写一个主函数def run(): # 2创建一个grpc的频道channel,绑定服务端的ip和端口号 channel = grpc.insecure_channel('127.0.0.1:50001') # 3创建一个grpc的client绑定刚创建的频道channel client = pb2_grpc.BilibiliStub(channel=channel) # 4调用client中的方法,并传入参数 response = client.HelloDewei(pb2.HelloDeweiReq( name='dewei', age=18 )) print(response.result)
if __name__ == '__main__': run()
常见的protobuf数据类型
类型 | 说明 |
---|---|
string | 字符串类型,要求是utf-8或7-bit与ascii编码的字符串 |
bytes | 比特类型 |
bool | 布尔类型 |
int32 | 32位整形 |
int64 | 64位整形 |
float | 浮点类型 |
repeated | 数组(列表)repeated string data = 1; |
map | 字典类型(python叫法)map<string, string> data = 1; |
protobuf中的特殊字符
类型 | 说明 |
---|---|
package | 包名 |
syntax | Protobuf版本 |
service | 定义服务 |
rpc | 定义服务中的方法 |
stream | 定义的方法传输为流传输 |
message | 定义消息体 message User{} |
extend | 扩展消息体 extend User{} |
import | 导入一些插件 |
// | 注释 |
_pb2.py文件介绍
每一个message对应的信息存储,比如我们的request与response在这里被定义extension
_pb2_grpc.py文件介绍
用来存储每一个服务的server与客户端以及注册server的工具
客户端名为
服务器名为
注册服务为
扩写.proto文件
新的hello_bilibili.proto文件如下:
# 1使用哪种protobuf版本syntax = "proto3";# 2给包起一个名字package test;# 3服务器中服务的名字service Bilibili { # 4里面写各种函数 rpc HelloDewei(HelloDeweiReq) returns (HelloDeweiReply) {} # 7定义一个新的函数 # 11将请求参数和相应参数改为流式传输 rpc HelloTest(stream HelloTestRequest) returns (stream HelloTestResponse) {}}
# 5定义请求参数message HelloDeweiReq { string name = 1; int32 age = 2;}# 6定义响应参数message HelloDeweiReply { string result = 1;}# 8定义新的请求参数message HelloTestRequest { string name = 1; int64 age = 2; repeated string data = 3; # 9定义map键值对的值的数据结构 message HelloTestRequestNumberValue { string name = 1; int32 age = 2; bool is_active = 3; } map<string, HelloTestRequestNumberValue> number = 4; // string, int32, bool}
# 10定义新的响应参数message HelloTestResponse {}
然后使用生成命令重新生成那两个文件:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello_bilibili.proto
传输方式
- unary:单程
- stream:流
- 1.双向:客户端请求服务器端(流),服务器发送给客户端(流)
- 2.单向:服务器接收到客户端(流),服务器发送给客户端(非流)
- 3.单向:服务器接收到客户端(非流),服务器发送给客户端(流)
注意:单向流其实双方也建立长连接了,只是单向流主要是由一方主动发起,向对方传数据,双向流是双方都频繁传数据
服务器流式向客户端传输数据
扩写.proto文件
文件如下:
// 1使用哪种protobuf版本syntax = "proto3";// 2给包起一个名字package test;// 3服务器中服务的名字service Bilibili { // 4里面写各种函数 rpc HelloDewei(HelloDeweiReq) returns (HelloDeweiReply) {} // 7定义一个新的函数 // 11将请求参数和相应参数改为流式传输 rpc HelloTest(stream HelloTestRequest) returns (stream HelloTestResponse) {} // 12服务器以流的方式返回响应 rpc TestClientRecvStream(TestClientRecvStreamRequest) returns (stream TestClientRecvStreamResponse) {}}
// 5定义请求参数message HelloDeweiReq { string name = 1; int32 age = 2;}// 6定义响应参数message HelloDeweiReply { string result = 1;}// 8定义新的请求参数message HelloTestRequest { string name = 1; int64 age = 2; repeated string data = 3; // 9定义map键值对的值的数据结构 message HelloTestRequestNumberValue { string name = 1; int32 age = 2; bool is_active = 3; } map<string, HelloTestRequestNumberValue> number = 4; // string, int32, bool}
// 10定义新的响应参数message HelloTestResponse {}
// 13定义服务器流式向客户端返回响应的请求参数message TestClientRecvStreamRequest { string data = 1;}// 14定义服务器流式向客户端返回响应的响应参数message TestClientRecvStreamResponse { string result = 1;}
服务器文件
server.py文件如下:
# -*- coding: utf-8 -*-# 0导包import grpcimport hello_bilibili_pb2 as pb2import hello_bilibili_pb2_grpc as pb2_grpc# 8导入线程池所需要的包from concurrent import futures# 13导入阻塞需要的time模块import time
# 1书写服务类,参数为生成的hello_bilibili_pb2_grpc文件里的BilibiliServicerclass Bilibili(pb2_grpc.BilibiliServicer): # 2实现服务类中的方法,request为请求参数,context为上下文 def HelloDewei(self, request, context): # 3下面是请求参数中的字段 name = request.name age = request.age # 4拼接返回值 result = f'my name is {name}, my age is {age}' # 5返回这个返回值 return pb2.HelloDeweiReply(result=result) # 15定义服务器流式向客户端传输的函数 def TestClientRecvStream(self, request, context): index = 0 # 16首先看客户端是否是活跃状态 while context.is_active(): data = request.data # 18服务器主动关闭向客户端输出 if data == 'close': print('data is close, request will cancel') # 19关闭向客户端输出 context.cancel() index += 1 time.sleep(1) result = 'send %d %s' % (index, data) print(result) # 17流式向客户端返回 yield pb2.TestClientRecvStreamResponse( result=result )
# 6编写服务启动函数def run(): # 7创建一个grpc服务器 # grpc是支持线程的,所以这里需要创建一个线程池,这需要导包 grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=4)) # 9将服务类注册到服务器,参数是Bilibili服务类和刚创建的grpc_server服务器 pb2_grpc.add_BilibiliServicer_to_server(Bilibili(), grpc_server) # 10绑定ip和端口号 grpc_server.add_insecure_port('0.0.0.0:50001') # 11启动服务器,注意启动服务器后要特殊处理,不然会启动一下就退出了 grpc_server.start() # 12阻塞主线程 try: while True: print('grpc_server is running') time.sleep(60 * 60 * 24) except KeyboardInterrupt: # 14如果按下ctrl+c,则正常停止服务器 print('KeyboardInterrupt, grpc_server is stop') grpc_server.stop(0)
if __name__ == '__main__': run()
客户端文件
client.py文件如下:
# -*- coding:utf-8 -*-# 0导包import grpcimport hello_bilibili_pb2 as pb2import hello_bilibili_pb2_grpc as pb2_grpc
# 1写一个主函数def run(): # 2创建一个grpc的频道channel,绑定服务端的ip和端口号 channel = grpc.insecure_channel('127.0.0.1:50001') # 3创建一个grpc的client绑定刚创建的频道channel client = pb2_grpc.BilibiliStub(channel=channel) # 4调用client中的方法,并传入参数 # response = client.HelloDewei(pb2.HelloDeweiReq( # name='dewei', # age=18 # )) # print(response.result) # 5向客户端发送请求 response = client.TestClientRecvStream(pb2.TestClientRecvStreamRequest( data='dewei' )) # 6服务器流式向客户端返回 for item in response: print(item.result)
if __name__ == '__main__': run()