Reactive gRPC support for Python
Detailed description of the reactive-grpc Python project
reactive-grpc
A simple gRPC bridge to reactive streams.
Example: given the following protocol buffer definition:
    syntax = "proto3";

    package rxgrpc.test;

    service TestService {
        rpc GetOneToOne(TestRequest) returns (TestResponse) {}
        rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
        rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
        rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
    }

    message TestRequest {
        string message = 1;
    }

    message TestResponse {
        string message = 1;
    }
A simple servicer class:
    from test.proto.test_pb2_grpc import TestServiceServicer
    from test.proto import test_pb2


    class _Servicer(TestServiceServicer):

        def GetOneToOne(self, request: test_pb2.TestRequest, context):
            return test_pb2.TestResponse(message='response: {}'.format(request.message))

        def GetOneToStream(self, request, context):
            for i in range(3):
                yield test_pb2.TestResponse(message='response {}: {}'.format(i, request.message))

        def GetStreamToOne(self, request_iterator, context):
            return test_pb2.TestResponse(
                message='response: {}'.format(', '.join(map(lambda d: d.message, request_iterator))))

        def GetStreamToStream(self, request_iterator, context):
            yield from map(
                lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
                request_iterator)
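The handlers above differ only in whether the request and the response are single messages or iterators. The following is a minimal sketch of the two streaming shapes, written with plain dataclasses so that it runs without generated stubs or a gRPC server. `Req` and `Resp` are hypothetical stand-ins for `test_pb2.TestRequest` and `test_pb2.TestResponse`; they are assumptions for illustration and not part of the library.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Req:  # hypothetical stand-in for test_pb2.TestRequest
    message: str


@dataclass
class Resp:  # hypothetical stand-in for test_pb2.TestResponse
    message: str


def one_to_stream(request: Req) -> Iterator[Resp]:
    # Server streaming: one request in, several responses yielded lazily.
    for i in range(3):
        yield Resp(message='response {}: {}'.format(i, request.message))


def stream_to_one(requests: Iterable[Req]) -> Resp:
    # Client streaming: the request iterator is consumed, then one reply is built.
    return Resp(message='response: {}'.format(', '.join(r.message for r in requests)))


streamed = [r.message for r in one_to_stream(Req('a'))]
joined = stream_to_one(iter([Req('a'), Req('b')])).message
```

Because `one_to_stream` is a generator, nothing is computed until the caller iterates over it, which matches how a streaming handler produces responses one at a time.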
A simple reactive gRPC server that transforms incoming request messages can be created as follows:
    from test.proto import test_pb2_grpc, test_pb2
    from rxgrpc import server, mappers
    from rx import operators
    from test.proto.test_pb2_grpc import TestServiceServicer


    class _Servicer(TestServiceServicer):
        # ...
        pass


    workers = 3
    rx_server = server.create_server(test_pb2, workers)
    test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)
    rx_server.add_insecure_port('[::]:50051')


    def _transform_message(m: test_pb2.TestRequest) -> test_pb2.TestRequest:
        return test_pb2.TestRequest(message='TRANSFORMED {}'.format(m.message))


    rx_server.set_grpc_observable(
        rx_server.grpc_pipe(
            operators.map(mappers.grpc_invocation_map(_transform_message)),
            method_name='/rxgrpc.test.TestService/GetOneToOne'
        ),
        method_name='/rxgrpc.test.TestService/GetOneToOne'
    )
    rx_server.start()
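The `method_name` strings in the server example follow gRPC's fully qualified method path, `/<package>.<Service>/<Method>`, using the names from the `.proto` definition. Since the same path is passed twice, a small helper may reduce the chance of a typo. The sketch below assumes nothing about rxgrpc itself; `grpc_method_path` is a hypothetical name introduced here for illustration.

```python
def grpc_method_path(package: str, service: str, method: str) -> str:
    """Build a gRPC method path such as '/rxgrpc.test.TestService/GetOneToOne'.

    Hypothetical helper for illustration; it is not part of rxgrpc.
    """
    # A .proto file without a package statement yields just the service name.
    qualified_service = '{}.{}'.format(package, service) if package else service
    return '/{}/{}'.format(qualified_service, method)


ONE_TO_ONE = grpc_method_path('rxgrpc.test', 'TestService', 'GetOneToOne')
```

The resulting constant could then be passed as `method_name` in both places where the path is needed.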
Here is an example of a filter applied to a streaming input:
    from rxgrpc import operators
    from test.proto import test_pb2


    def _filter_message(m: test_pb2.TestRequest) -> bool:
        # Keep only requests whose message ends in an odd digit.
        return bool(int(m.message[-1]) % 2)


    server = ...
    server.set_grpc_observable(
        server.grpc_pipe(
            operators.filter(_filter_message),
            method_name='/rxgrpc.test.TestService/GetStreamToOne'
        ),
        method_name='/rxgrpc.test.TestService/GetStreamToOne'
    )