使用grpc.aio的Python日志拦截器
我找不到一个关于Python的日志拦截器的例子,这个拦截器可以测量处理请求所花的时间。
我想知道在服务器端处理一个请求需要多长时间。我尝试自己写一个日志拦截器,但没有成功。
我觉得 continuation 并没有等到请求完成。
这是我的.proto文件
// Protocol Buffers (proto3) schema for the sample greeting service.
syntax = "proto3";
package auth;
// SampleService exposes a single RPC, Hello, which takes a HelloRequest
// and returns a HelloResponse (neither side is declared as a stream).
service SampleService {
rpc Hello(HelloRequest) returns (HelloResponse);
}
// Request message for SampleService.Hello.
message HelloRequest{
// Field 1: the name sent by the caller.
string Name = 1;
}
// Response message for SampleService.Hello.
message HelloResponse{
// Field 1: the greeting text returned to the caller.
string Greeting = 1;
}
grpc.py
import asyncio
import inspect
import time
from typing import Callable, Awaitable

import grpc

from pkg.py.gen import service_pb2
from pkg.py.gen import service_pb2_grpc

from ..logger import get_logger
from ..services import Service
class SampleService(service_pb2_grpc.SampleServiceServicer):
    """Servicer implementing the ``Hello`` RPC of ``SampleService``."""

    def __init__(self) -> None:
        super().__init__()

    async def Hello(
        self,
        request: service_pb2.HelloRequest,
        _,
    ) -> service_pb2.HelloResponse:
        """Answer a ``Hello`` call.

        Sleeps for one second, then wraps the value returned by
        ``Service.hello(request.Name)`` in a ``HelloResponse``. The second
        positional argument (the call context) is not used.
        """
        await asyncio.sleep(1)
        greeting = Service.hello(request.Name)
        return service_pb2.HelloResponse(Greeting=greeting)
class LoggingServerInterceptor(grpc.aio.ServerInterceptor):
    """Server interceptor that logs how long each unary-unary RPC takes.

    ``continuation`` only resolves *which* handler serves the call; it does
    not execute the RPC. To measure the processing time, the resolved
    handler's behaviour is therefore replaced by a wrapper that times the
    actual invocation.
    """

    async def intercept_service(
        self,
        continuation: Callable[
            [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
        ],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Resolve the handler for the call and wrap it with timing.

        Args:
            continuation: Awaitable callable returning the next handler in
                the interceptor chain for ``handler_call_details``.
            handler_call_details: Describes the incoming call; its
                ``method`` attribute is used in the log line.

        Returns:
            A unary-unary handler whose behaviour logs the elapsed time, or
            the handler returned by ``continuation`` unchanged when it is
            ``None``, streaming, or not a coroutine function.
        """
        handler = await continuation(handler_call_details)

        # None means no handler was found for the method; streaming
        # handlers have a different call shape and are passed through.
        if (
            handler is None
            or handler.request_streaming
            or handler.response_streaming
        ):
            return handler

        behavior = handler.unary_unary
        # Only coroutine behaviours are wrapped: awaiting a synchronous
        # behaviour here would run it on the event loop and block it.
        if not inspect.iscoroutinefunction(behavior):
            return handler

        method_name = handler_call_details.method

        async def timed_behavior(request, context):
            started = time.monotonic()
            try:
                return await behavior(request, context)
            finally:
                # Runs on success and on failure, so errors are timed too.
                elapsed = time.monotonic() - started
                get_logger().info(
                    f"Request {method_name} - duration {elapsed}s"
                )

        return grpc.unary_unary_rpc_method_handler(
            timed_behavior,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
app.py
from abc import ABC, abstractmethod
from concurrent import futures
import grpc.aio
from pkg.py.gen import service_pb2_grpc
from ..config import Config
from ..logger import get_logger
from ..transport import SampleService, LoggingServerInterceptor
class AbstractApp(ABC):
    """Abstract interface for an application with async ``run``/``stop``.

    Both methods are abstract, so the class cannot be instantiated until a
    subclass overrides them.
    """

    @abstractmethod
    async def run(self) -> None:
        """Run the application; the base implementation does nothing."""

    @abstractmethod
    async def stop(self) -> None:
        """Stop the application; the base implementation does nothing."""
class App(AbstractApp):
    """Application that hosts ``SampleService`` on a ``grpc.aio`` server."""

    # Seconds that in-flight RPCs are given to finish once stop() is called.
    SHUTDOWN_GRACE_SECONDS = 5.0

    def __init__(self):
        """Create the server, register the servicer and bind the port.

        The listen address is built from ``Config().HOST`` and
        ``Config().PORT``. The server is not started here; see ``run``.
        """
        self.server = grpc.aio.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=[LoggingServerInterceptor()],
        )
        service_pb2_grpc.add_SampleServiceServicer_to_server(
            SampleService(), self.server
        )
        # Build the config once instead of once per attribute read.
        config = Config()
        address = f"{config.HOST}:{config.PORT}"
        # Bind before logging so the message is not emitted for an address
        # that failed to bind.
        self.server.add_insecure_port(address)
        logger = get_logger()
        logger.info(f"Listening on {address}")

    async def run(self) -> None:
        """Start the server and wait until it terminates."""
        await self.server.start()
        await self.server.wait_for_termination()

    async def stop(self) -> None:
        """Log the shutdown and stop the server.

        In-flight RPCs get ``SHUTDOWN_GRACE_SECONDS`` to complete before
        the server aborts them.
        """
        logger = get_logger()
        logger.info("Shutting down in progress")
        # Previously only the log line ran and the server was never stopped.
        await self.server.stop(self.SHUTDOWN_GRACE_SECONDS)
main.py
import asyncio
import sys
from pathlib import Path
# Append the directory two levels above this file's own directory to
# sys.path (presumably so the `internal` package imported below resolves).
root = str(Path(__file__).resolve().parents[2])  # noqa: E402
sys.path.append(root)  # noqa: E402
from internal.config import Config
from internal.logger import get_logger, setup_logger
from internal.app.app import App
def main() -> None:
    """Run the application on a fresh event loop until it exits.

    The server task is kept and driven directly, so an exception raised by
    ``app.run()`` (for example during startup) propagates out of ``main``
    instead of being lost while the loop idles forever. ``KeyboardInterrupt``
    triggers the same shutdown path as a normal exit.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    app = App()
    logger = get_logger()
    # Keep a reference: the event loop only holds tasks weakly, and the
    # handle is needed to observe failures and to clean up on shutdown.
    serve_task = loop.create_task(app.run())
    try:
        loop.run_until_complete(serve_task)
    except KeyboardInterrupt:
        pass
    finally:
        logger.info("Gracefully shutting down")
        try:
            loop.run_until_complete(app.stop())
            if not serve_task.done():
                # Do not leave a pending task behind when the loop closes.
                serve_task.cancel()
                try:
                    loop.run_until_complete(serve_task)
                except asyncio.CancelledError:
                    pass
        finally:
            loop.close()
        logger.info("Shut down completed")


if __name__ == '__main__':
    main()
1 个回答
0
找到了这个库:grpc-interceptor。下面用它的 AsyncServerInterceptor 实现了一个记录请求耗时的拦截器:
import time
from typing import Callable, Any
import grpc
from grpc_interceptor.server import AsyncServerInterceptor
class RequestLoggingInterceptor(AsyncServerInterceptor):
    """Interceptor that logs the duration of every intercepted RPC.

    NOTE(review): ``get_logger`` is not imported in this snippet; it must be
    importable in the module that hosts this class.
    """

    async def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
    ) -> Any:
        """Invoke ``method`` and log how long the call took.

        Args:
            method: The next interceptor or the RPC implementation.
            request: The request message (or request iterator).
            context: The servicer context of the call.
            method_name: Name of the RPC, used in the log line.

        Returns:
            The awaited response for unary responses. For server-streaming
            responses, an async iterator that yields the original messages
            and logs the duration once the stream ends.
        """
        start_time = time.monotonic()
        logged_by_stream = False
        try:
            outcome = method(request, context)
            if hasattr(outcome, "__aiter__"):
                # Server-streaming: the result is an async iterator and
                # cannot be awaited; timing continues inside the wrapper.
                logged_by_stream = True
                return self._timed_stream(outcome, method_name, start_time)
            return await outcome
        finally:
            if not logged_by_stream:
                self._log_duration(method_name, start_time)

    async def _timed_stream(
        self, responses: Any, method_name: str, start_time: float
    ) -> Any:
        """Yield every message of ``responses`` and log when it ends."""
        try:
            async for response in responses:
                yield response
        finally:
            self._log_duration(method_name, start_time)

    @staticmethod
    def _log_duration(method_name: str, start_time: float) -> None:
        """Log the time elapsed since ``start_time`` for ``method_name``."""
        request_time = time.monotonic() - start_time
        get_logger().info(
            f"Request {method_name} - duration {request_time}s"
        )