python反应式grpc支持

reactive-grpc的Python项目详细描述


build status

反应性grpc

一个简单的GRPC桥到反应流。

示例:给定以下协议缓冲区定义:

syntax = "proto3";

package rxgrpc.test;

service TestService {
  rpc GetOneToOne(TestRequest) returns (TestResponse) {}
  rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
  rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
  rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
}

message TestRequest {
  string message = 1;
}

message TestResponse {
  string message = 1;
}

一个简单的服务类:

fromtest.proto.test_pb2_grpcimportTestServiceServicerfromtest.protoimporttest_pb2class_Servicer(TestServiceServicer):defGetOneToOne(self,request:test_pb2.TestRequest,context):returntest_pb2.TestResponse(message='response: {}'.format(request.message))defGetOneToStream(self,request,context):foriinrange(3):yieldtest_pb2.TestResponse(message='response {}: {}'.format(i,request.message))defGetStreamToOne(self,request_iterator,context):returntest_pb2.TestResponse(message='response: {}'.format(', '.join(map(lambdad:d.message,request_iterator))))defGetStreamToStream(self,request_iterator,context):yield frommap(lambdad:test_pb2.TestResponse(message='response: {}'.format(d.message)),request_iterator)

一个简单的GRPC反应式服务器,其中请求消息被转换可以 创建如下:

fromtest.protoimporttest_pb2_grpc,test_pb2fromrxgrpcimportserver,mappersfromrximportoperatorsfromtest.proto.test_pb2_grpcimportTestServiceServicerclass_Servicer(TestServiceServicer):# ...passworkers=3rx_server=server.create_server(test_pb2,workers)test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(),rx_server)rx_server.add_insecure_port('[::]:50051')def_transform_message(m:test_pb2.TestRequest)->test_pb2.TestRequest:returntest_pb2.TestRequest(message='TRANSFORMED {}'.format(m.message))rx_server.set_grpc_observable(rx_server.grpc_pipe(operators.map(mappers.grpc_invocation_map(_transform_message)),method_name='/rxgrpc.test.TestService/GetOneToOne'),method_name='/rxgrpc.test.TestService/GetOneToOne')rx_server.start()

这里是一个流输入过滤器的示例:

fromrxgrpcimportoperatorsfromtest.protoimporttest_pb2def_filter_message(m:test_pb2.TestRequest)->test_pb2.TestRequest:returnbool(int(m.message[-1])%2)server=...server.set_grpc_observable(server.grpc_pipe(operators.filter(_filter_message),method_name='/rxgrpc.test.TestService/GetStreamToOne'),method_name='/rxgrpc.test.TestService/GetStreamToOne')

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
JavaSWT树项目高度   java如何通过单击按钮向JFrame添加文本字段?   java keytool如何保护密钥?   java在Apache Camel中构建一个报告来测量事务时间   安卓为什么这个Java类的参数不能按预期工作?   java参数化JUnit中的测试方法,而不仅仅是整个类   java N级地图,需要把地图放在地图里面   屏幕右侧的java抽屉?   JAVA网MalformedURLException:解析xml时没有协议异常   java为什么调用时出现JSON异常。getJSONObject()?   socketJava服务器关闭代码   java如何为计算器生成命令行参数   java如何将活动意图传递给另一个类?   java Apache HttpClient临时错误:NoHttpResponseException   java JVM选项XX:UseFastEmptyMethods/XX:UseFastAccessorMethods   类Java构造函数问题   验证PDF和Excel文件类型的java模式   java循环菜单问题   java如何返回tar。Spring中通过http的gz文件   java使用NetBeans设置图像库