利用Python语言的grpc实现消息传送详解
作者:we34dfg 发布时间:2023-09-12 14:00:07
标签:Python,grpc,消息
1. grpc开源包的安装
# conda
$ conda create -n grpc_env python=3.9
# install grpc
$ pip install grpc -i https://pypi.doubanio.com/simple
$ pip install grpc-tools -i https://pypi.doubanio.com/simple
# 有时proto生成py文件不对就是得换换grpc两个包的版本
2. grpc的使用之传送消息
整体结构,client.py server.py 和proto目录下的example.proto
1)在example.proto定义传送体
// 声明
syntax = "proto3";
package proto;
// service创建
service HelloService{
rpc Hello(Request) returns (Response) {} // 单单传送消息
}
// 请求参数消息体 1、2是指参数顺序
message Request {
string data = 1;
}
// 返回参数消息体
message Response {
int32 ret = 1; //返回码
string data = 2;
}
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
2) 在虚拟环境里使用命令生成py文件
$ conda activate grpc_env
$ f:
$ cd F:\examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
在proto目录下会生成两个py文件,如下图所示:
3) 编辑client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
"""接口的具体功能实现"""
def Hello(self, request, context):
"""hello"""
data = request.data
print(data)
ret_data = "Response:" + data
return example_pb2.Response(ret=0, data=ret_data)
def server(ip: str, port: int) -> None:
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # ⼤⼩为10的线程池
ai_servicer = ServiceBack()
example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
server.add_insecure_port(f"{ip}:{port}")
server.start()
try:
print(f"server is started! ip:{ip} port:{str(port)}")
while True:
time.sleep(60 * 60)
except Exception as es:
print(es)
server.stop(0)
if __name__ == '__main__':
server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
def client(ip: str, port: int) -> None:
target = str(ip) + ":" + str(port)
channel = grpc.insecure_channel(target) # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
data = "hello 123"
request = example_pb2.Request(data=data)
res = cli.Hello(request)
print(f"ret:{res.ret}, data:{res.data}")
if __name__ == '__main__':
client("127.0.0.1", 8000)
3. grpc的使用之数据传输大小配置
默认情况下,gRPC 将传入消息限制为 4 MB。 传出消息没有限制。
1)example.proto定义不变
2)编辑client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
"""接口的具体功能实现"""
def Hello(self, request, context):
"""hello"""
data = request.data
print(data)
ret_data = "Response:" + data
return example_pb2.Response(ret=0, data=ret_data)
def server(ip: str, port: int) -> None:
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options) # ⼤⼩为10的线程池
ai_servicer = ServiceBack()
example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
server.add_insecure_port(f"{ip}:{port}")
server.start()
try:
print(f"server is started! ip:{ip} port:{str(port)}")
while True:
time.sleep(60 * 60)
except Exception as es:
print(es)
server.stop(0)
if __name__ == '__main__':
server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
def client(ip: str, port: int) -> None:
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
target = str(ip) + ":" + str(port)
channel = grpc.insecure_channel(target, options=options) # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
data = "hello 123" * 1024 * 1024
request = example_pb2.Request(data=data)
res = cli.Hello(request)
print(f"ret:{res.ret}, data:{res.data}")
if __name__ == '__main__':
client("127.0.0.1", 8000)
4. grpc的使用之超时配置
1)example.proto定义不变
2)编辑client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
"""接口的具体功能实现"""
def Hello(self, request, context):
"""hello"""
data = request.data
print(data)
time.sleep(2)
ret_data = "Response:" + data
return example_pb2.Response(ret=0, data=ret_data)
def server(ip: str, port: int) -> None:
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options) # ⼤⼩为10的线程池
ai_servicer = ServiceBack()
example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
server.add_insecure_port(f"{ip}:{port}")
server.start()
try:
print(f"server is started! ip:{ip} port:{str(port)}")
while True:
time.sleep(60 * 60)
except Exception as es:
print(es)
server.stop(0)
if __name__ == '__main__':
server("127.0.0.1", 8000)
# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
def client(ip: str, port: int) -> None:
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
target = str(ip) + ":" + str(port)
channel = grpc.insecure_channel(target, options=options) # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
try:
data = "hello 123"
request = example_pb2.Request(data=data)
res = cli.Hello(request, timeout=1) # timeout 单位:秒
print(f"ret:{res.ret}, data:{res.data}")
except grpc.RpcError as rpc_error:
print("grpc.RpcError", rpc_error.details())
except Exception as es:
print(es)
finally:
sys.exit(-1)
if __name__ == '__main__':
client("127.0.0.1", 8000)
运行结果:
grpc.RpcError Deadline Exceeded
5. grpc之大文件之流stream传输
1)在example.proto重新定义传送体
// 声明
syntax = "proto3";
package proto;
// service创建
service HelloService{
rpc Hello(Request) returns (Response) {} // 单单传送消息
rpc ClientTOServer(stream UpFileRequest) returns (Response) {} // 流式上传文件
rpc ServerTOClient(Request) returns (stream UpFileRequest) {} // 流式下载文件
}
// 请求参数消息体 1、2是指参数顺序
message Request {
string data = 1;
}
// 返回参数消息体
message Response {
int32 ret = 1; //返回码
string data = 2;
}
message UpFileRequest {
string filename = 1;
int64 sendsize = 2;
int64 totalsize = 3;
bytes data = 4;
}
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
2)在虚拟环境里使用命令生成py文件,参考2. 2)
3)编辑client.py 和 server.py
# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
"""接口的具体功能实现"""
def Hello(self, request, context):
"""hello"""
data = request.data
print(data)
time.sleep(2)
ret_data = "Response:" + data
return example_pb2.Response(ret=0, data=ret_data)
def ClientTOServer(self, request_iterator, context):
"""上传文件"""
data = bytearray()
for UpFileRequest in request_iterator:
file_name = UpFileRequest.filename
file_size = UpFileRequest.totalsize
file_data = UpFileRequest.data
print(f"文件名称:{file_name}, 文件总长度:{file_size}")
data.extend(file_data) # 拼接两个bytes
print(f"已接收长度:{len(data)}")
if len(data) == file_size:
with open("242_copy.mp3", "wb") as fw:
fw.write(data)
print(f"{file_name=} 下载完成")
(ret, res) = (0, file_name)
else:
print(f"{file_name=} 下载失败")
(ret, res) = (-1, file_name)
return example_pb2.Response(ret=ret, data=res)
def ServerTOClient(self, request, context):
"""下载文件"""
fp = request.data
print(f"下载文件:{fp=}")
# 获取文件名和文件大小
file_name = os.path.basename(fp)
file_size = os.path.getsize(fp) # 获取文件大小
# 发送文件内容
part_size = 1024 * 1024 # 每次读取1MB数据
count = 1
with open(fp, "rb") as fr:
while True:
try:
if count == 1:
count += 1
yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
else:
context = fr.read(part_size)
if context:
yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
sendsize=len(context),
data=context)
else:
print(f"发送完毕")
return 0
except Exception as es:
print(es)
def server(ip: str, port: int) -> None:
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options) # ⼤⼩为10的线程池
ai_servicer = ServiceBack()
example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
server.add_insecure_port(f"{ip}:{port}")
server.start()
try:
print(f"server is started! ip:{ip} port:{str(port)}")
while True:
time.sleep(60 * 60)
except Exception as es:
print(es)
server.stop(0)
if __name__ == '__main__':
server("127.0.0.1", 8000)
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
def send_stream_data(fp: str):
"""迭代器发送大文件"""
# 获取文件名和文件大小
file_name = os.path.basename(fp)
file_size = os.path.getsize(fp) # 获取文件大小
# 发送文件内容
part_size = 1024 * 1024 # 每次读取1MB数据
count = 1
with open(fp, "rb") as fr:
while True:
try:
if count == 1:
count += 1
yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
else:
context = fr.read(part_size)
if context:
yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context),
data=context)
else:
print(f"发送完毕")
return 0
except Exception as es:
print(es)
def client(ip: str, port: int) -> None:
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
target = str(ip) + ":" + str(port)
channel = grpc.insecure_channel(target, options=options) # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
try:
data = "hello 123"
request = example_pb2.Request(data=data)
res = cli.Hello(request, timeout=1) # timeout 单位:秒
print(f"ret:{res.ret}, data:{res.data}")
except grpc.RpcError as rpc_error:
print("grpc.RpcError", rpc_error.details())
except Exception as es:
print(es)
finally:
sys.exit(-1)
def client_to_server(ip: str, port: int, fp: str):
"""
流式上传数据。
"""
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
target = str(ip) + ":" + str(port)
channel = grpc.insecure_channel(target, options=options) # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
try:
request = send_stream_data(fp=fp)
res = cli.ClientTOServer(request, timeout=600) # timeout 单位:秒
print(f"ret:{res.ret}, data:{res.data}")
except grpc.RpcError as rpc_error:
print("grpc.RpcError", rpc_error.details())
except Exception as es:
print(es)
finally:
sys.exit(-1)
def server_to_client(ip: str, port: int, fp: str):
"""
流式上传数据。
"""
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
target = str(ip) + ":" + str(port)
channel = grpc.insecure_channel(target, options=options) # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
try:
data = bytearray()
request = example_pb2.Request(data=fp)
filename = ""
for res in cli.ServerTOClient(request, timeout=300):
filename = res.filename
total_size = res.totalsize
data += res.data
if total_size == len(data):
with open("242_1.mp3", "wb") as fw:
fw.write(data)
print(f"{filename=} : {total_size=} 下载完成!")
else:
print(f"{filename=} 下载失败!")
except grpc.RpcError as rpc_error:
print("grpc.RpcError", rpc_error.details())
except Exception as es:
print(es)
finally:
sys.exit(-1)
if __name__ == '__main__':
# client("127.0.0.1", 8000)
# client_to_server("127.0.0.1", 8000, "242.mp3")
server_to_client("127.0.0.1", 8000, "242.mp3")
6. grpc之大文件之流async异步传输
# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
import asyncio
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
"""接口的具体功能实现"""
def Hello(self, request, context):
"""hello"""
data = request.data
print(data)
time.sleep(2)
ret_data = "Response:" + data
return example_pb2.Response(ret=0, data=ret_data)
def ClientTOServer(self, request_iterator, context):
"""上传文件"""
data = bytearray()
for UpFileRequest in request_iterator:
file_name = UpFileRequest.filename
file_size = UpFileRequest.totalsize
file_data = UpFileRequest.data
print(f"文件名称:{file_name}, 文件总长度:{file_size}")
data.extend(file_data) # 拼接两个bytes
print(f"已接收长度:{len(data)}")
if len(data) == file_size:
with open("242_copy.mp3", "wb") as fw:
fw.write(data)
print(f"{file_name=} 下载完成")
(ret, res) = (0, file_name)
else:
print(f"{file_name=} 下载失败")
(ret, res) = (-1, file_name)
return example_pb2.Response(ret=ret, data=res)
def ServerTOClient(self, request, context):
"""下载文件"""
fp = request.data
print(f"下载文件:{fp=}")
# 获取文件名和文件大小
file_name = os.path.basename(fp)
file_size = os.path.getsize(fp) # 获取文件大小
# 发送文件内容
part_size = 1024 * 1024 # 每次读取1MB数据
count = 1
with open(fp, "rb") as fr:
while True:
try:
if count == 1:
count += 1
yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
else:
context = fr.read(part_size)
if context:
yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
sendsize=len(context),
data=context)
else:
print(f"发送完毕")
return 0
except Exception as es:
print(es)
async def server(ip: str, port: int) -> None:
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options) # ⼤⼩为10的线程池
ai_servicer = ServiceBack()
example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
server.add_insecure_port(f"{ip}:{port}")
await server.start()
try:
print(f"server is started! ip:{ip} port:{str(port)}")
await server.wait_for_termination()
except Exception as es:
print(es)
await server.stop(None)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([server("127.0.0.1", 8000)]))
loop.close()
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
import asyncio
def send_stream_data(fp: str):
"""迭代器发送大文件"""
# 获取文件名和文件大小
file_name = os.path.basename(fp)
file_size = os.path.getsize(fp) # 获取文件大小
# 发送文件内容
part_size = 1024 * 1024 # 每次读取1MB数据
count = 1
with open(fp, "rb") as fr:
while True:
try:
if count == 1:
count += 1
yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
else:
context = fr.read(part_size)
if context:
yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context),
data=context)
else:
print(f"发送完毕")
return 0
except Exception as es:
print(es)
async def client(ip: str, port: int) -> None:
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
target = str(ip) + ":" + str(port)
async with grpc.aio.insecure_channel(target, options=options) as channel: # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
try:
data = "hello 123"
request = example_pb2.Request(data=data)
res = await cli.Hello(request, timeout=3) # timeout 单位:秒
print(f"ret:{res.ret}, data:{res.data}")
except grpc.RpcError as rpc_error:
print("grpc.RpcError", rpc_error.details())
except Exception as es:
print(es)
finally:
sys.exit(-1)
async def client_to_server(ip: str, port: int, fp: str):
"""
流式上传数据。
"""
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
target = str(ip) + ":" + str(port)
async with grpc.aio.insecure_channel(target, options=options) as channel: # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
try:
request = send_stream_data(fp=fp)
res = await cli.ClientTOServer(request, timeout=600) # timeout 单位:秒
print(f"ret:{res.ret}, data:{res.data}")
except grpc.RpcError as rpc_error:
print("grpc.RpcError", rpc_error.details())
except Exception as es:
print(es)
finally:
sys.exit(-1)
def server_to_client(ip: str, port: int, fp: str):
"""
流式上传数据。
"""
# 数据传输大小配置
max_message_length = 1024 * 1024 * 1024 # 1G
options = [('grpc.max_send_message_length', max_message_length),
('grpc.max_receive_message_length', max_message_length),
('grpc.enable_retries', 1),
]
target = str(ip) + ":" + str(port)
channel = grpc.insecure_channel(target, options=options) # 连接rpc服务器
cli = example_pb2_grpc.HelloServiceStub(channel) # 创建Stub
try:
data = bytearray()
request = example_pb2.Request(data=fp)
filename = ""
for res in cli.ServerTOClient(request, timeout=300):
filename = res.filename
total_size = res.totalsize
data += res.data
if total_size == len(data):
with open("242_1.mp3", "wb") as fw:
fw.write(data)
print(f"{filename=} : {total_size=} 下载完成!")
else:
print(f"{filename=} 下载失败!")
except grpc.RpcError as rpc_error:
print("grpc.RpcError", rpc_error.details())
except Exception as es:
print(es)
finally:
sys.exit(-1)
if __name__ == '__main__':
# asyncio.run(client("127.0.0.1", 8000))
asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))
# server_to_client("127.0.0.1", 8000, "242.mp3")
结论: 在本地测了一下不加async和加async的文件上传传送, async还慢点,嘿嘿嘿。
来源:https://blog.csdn.net/we34dfg/article/details/129663394
0
投稿
猜你喜欢
- 函数名称:ReplaceHTML参数:@Textstr作用:去掉 @Textstr 内的HTML代码备注:需要给数据库访问者master.s
- 为了方便的实现记录数据、修改数据没有精力去做一个完整的系统去管理数据。因此,在python的控制台直接实现一个简易的数据管理系统,包括数据的
- __author__ = 'clownfish'#coding:utf-8import urllib2,urllib,coo
- 总结了部分所学、所听、所看、所问的一些CSS写作经验,书写高效的CSS - 漫谈CSS的渲染效率,它们与渲染效率及所占用
- 前言最近因为工作的需要,在写一些python脚本,总是使用print来打印信息感觉很low,所以抽空研究了一下python的logging库
- 本文实例为大家分享了python实现复制大量文件的具体代码,供大家参考,具体内容如下本来是去项目公司拷数据,结果去了发现有500G,靠系统的
- 这篇文章主要介绍了安装PyInstaller失败问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的
- 原文链接:https://blog.csdn.net/Fairy_Nan/article/details/105914203HDF也是一种自
- 在使用PyTorch做实验时经常会用到生成随机数Tensor的方法,比如:torch.rand()torch.randn()torch.no
- 从Android 3.0开始除了我们重点讲解的Fragment外,Action Bar也是一个重要的内容,Action Bar主要是用于代替
- 本文实例讲述了python分支、循环简单用法。分享给大家供大家参考,具体如下:讲程序设计,不得不讲到顺序、分支、循环。顺序就是从上到下运行代
- 目录安装PyPDF2模块创建文件,准备PDF文档万事俱备,准备开拆文档的拆分思路python拆分计算公式:具体怎么拆?完整拆分程序:列表拆分
- Django是一个大而全的框架。需要明确的是,传参进行分页获取分页后的数据,一般都是通过向服务器发送get请求的方式实现的,在向后端服务发送
- 这几天正在追剧,原名《大秦帝国之天下》的《大秦赋》,看着看着又想把前几部刷一遍了,但第一部《裂变》自己没有高清资源,搜了一波发现yout
- 本文实例讲述了python实现的多任务版udp聊天器。分享给大家供大家参考,具体如下:说明编写一个有2个线程的程序线程1用来接收数据然后显示
- 动态联接库(DLL)是加快应用程序关键部分的执行速度的重要方法,但有一点恐怕大部分人都不知道,那就是在ASP文件也能通过调用DLL来加快服务
- DJANGO_SETTINGS_MODULE使用Django时要通知Django当前使用的是哪个配置文件。可以改变环境变量 DJANGO_S
- 本文实例为大家分享了Python使用Pygame绘制时钟的具体代码,供大家参考,具体内容如下前提条件:需要安装pygame功能:1.初始化界
- 如果你从未为MySQL设置根用户密码,服务器在以根用户身份进行连接时不需要密码。但是,建议你为每个账户设置密码。如果你以前设置了根用户密码,
- 废话真的一句也不想多说,直接看代码吧!# -*- coding: utf-8 -*- import numpy from sklearn i