gRPC服务器内的PythonProcessPoolExecutor进程在没有抛出任何错误的情况下死亡

2024-04-25 12:38:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个gRPC服务器,它使用ProcessPoolExecutormax_workers=1来运行脚本Here就是为什么我必须在run_brain_applicationmax_workers=1中使用ProcessPoolExecutor。下面是服务器脚本的简化版本

我放置了两个print语句来打印process的开头和结尾

问题是,对于某些请求,流程有时会启动,但从未完成。我的意思是它打印START,但我从来没有看到STOP。同一个请求在下一次起作用,因此问题不在于请求。此外,如果run_application中存在异常,则会在没有任何问题的情况下抛出该异常

我已经检查了有关Stackoverflow的所有相关问题,但其中大多数(12)都是关于他们的进程没有抛出异常,这不是我的情况

任何想法都非常感谢

谢谢

class BrainServer(brain_pb2_grpc.BrainServiceServicer):

    def run_brain_application(self, request, context):
            request_dict = MessageToDict(
                request, 
                preserving_proto_field_name=True,
                keep_null=True
            )

            print("START")
            with futures.ProcessPoolExecutor(max_workers=1) as executor:
                result = executor.submit(run_application, request_dict).result()
            res = result.get('data')
            print("STOP")
            report = dict_to_protobuf(result_pb2.Result, {res_type: res}, strict=False)

            return report

        except Exception as e:
            _log_fatal(
                msg=str(e)
            )
            raise e


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=16))
    brain_pb2_grpc.add_BrainServiceServicer_to_server(BrainServer(), server)

    server.add_insecure_port(f"[::]:{GRPC_PORT}")
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

Tags: run服务器grpcserverapplicationrequestresresult
2条回答

这是一个很难确定的问题

我首先建议给你的遗嘱执行人增加超时时间

我还建议只使用ThreadPoolExecutor,看看问题是否得到解决。pythonlogger和其他一些组件在多进程环境中使用时表现不佳See。我的猜测是,您使用的python资源不应该在进程之间共享,但它们确实是(我的直接怀疑是logger

当父进程是多线程的时,使用os.fork创建子进程是有问题的。
concurrent.futures.ProcessPoolExecutor内部使用的multiprocesing模块提供了多种创建子进程的方法。这些方法称为start方法,它们被命名为forkspawnforkserver

建议多线程父级使用forkserver方法创建子级,因为在forkserver方法中,创建的服务器是单线程的,多线程父级请求服务器创建子级。由于服务器是单线程的os.fork,因此没有问题

concurrent.futures.ProcessPoolExecutor接受mp_context参数,它使用该参数设置池中进程的启动方法。
所以你应该通过mp_context

import multiprocessing as mp

ctx = mp.get_context('forkserver')
with futures.ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor:
    result = executor.submit(run_application, request_dict).result()
res = result.get('data')

Contexts and start methods

相关问题 更多 >

    热门问题