让子进程允许RPC服务器重启而子进程存活

4 投票
1 回答
711 浏览
提问于 2025-04-20 08:24

场景

我有一个rpc服务器,需要启动一些重要的进程(multiprocessing.Process),这些进程会持续好几天。出于安全考虑,我不希望这些进程的存活依赖于rpc服务器。因此,我希望服务器可以在进程运行时崩溃并重新启动。

孤儿进程

这个问题可以通过以下方法解决(注意:不要在不想丢失之前工作的地方粘贴,这会关闭你的python会话):

import os
import multiprocessing
import time

def _job(data):
    for _ in range(3):
        print multiprocessing.current_process(), "is working"
        time.sleep(2)
    print multiprocessing.current_process(), "is done"

#My real worker gets a Connection-object as part of a
#multiprocessing.Pipe among other arguments
worker = multiprocessing.Process(target=_job, args=(None,))
worker.daemon = True
worker.start()
os._exit(0)

问题:如果工作进程还在,关闭rpc服务器的socket

退出主进程似乎对关闭socket的问题没有帮助。因此,为了说明服务器重启的问题,模拟的方式是在第一个服务器关闭后,启动一个参数完全相同的第二个服务器。

以下代码运行得很好:

import SimpleXMLRPCServer
HOST = "127.0.0.1"
PORT = 45212
s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT))
s.server_close()
s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT))
s.server_close()

然而,如果创建了一个工作进程,就会出现一个socket.error错误,提示socket已经在使用中:

s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT))
worker = multiprocessing.Process(target=_job, args=(None,))
worker.start()
s.server_close()
s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT)) #raises socket.error
worker.join()
s.server_close()

手动关闭服务器的socket是有效的:

import socket
s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT))
worker = multiprocessing.Process(target=_job, args=(None,))
worker.start()
s.socket.shutdown(socket.SHUT_RDWR)
s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT))
worker.join()
s.server_close()

但这种行为让我很担心。我并没有以任何方式将socket传递给工作进程,但看起来它似乎还是获取到了。

之前有类似的问题被提问,但那些问题通常是将socket传递给工作进程,而我这里并不想这样。如果我把socket传递过去,我可以在工作进程中关闭它,从而绕过shutdown的处理:

def _job2(notMySocket):
    notMySocket.close()
    for _ in range(3):
        print multiprocessing.current_process(), "is working"
        time.sleep(2)
    print multiprocessing.current_process(), "is done"

s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT))
worker = multiprocessing.Process(target=_job2, args=(s.socket,))
worker.start()
time.sleep(0.1) #Just to be sure worker gets to close socket in time
s.server_close()
s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT)) 
worker.join()
s.server_close()

但服务器的socket根本没有理由去访问工作进程。我对这个解决方案一点也不满意,即使它是目前为止最好的选择。

问题

有没有办法限制在使用multiprocessing.Process时,只有我想传递给目标的内容被复制,而不是所有打开的socket和其他东西?

在我的情况下,要让这段代码正常工作:

s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT))
childPipe, parentPipe = multiprocessing.Pipe()
worker = multiprocessing.Process(target=_job, args=(childPipe,))
worker.start()
s.server_close()
s = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT)) #raises socket.error
worker.join()
s.server_close()

1 个回答

1

如果你在用Python 2.x,我觉得在Posix平台上是没办法避免继承的问题的。os.fork总是会用来创建新进程,这就意味着父进程的所有状态都会被复制到子进程里。你能做的就是在子进程里立即关闭套接字,这也是你现在正在做的。要避免这种继承,唯一的方法就是在启动服务器之前就先启动进程。你可以尝试提前启动Process,然后用multiprocessing.Queue来传递工作项(而不是用args这个参数),或者用multiprocessing.Event来表示它应该开始工作。是否能做到这一点,取决于你需要传给子进程的内容。

不过,如果你在用Python 3.4及以上版本(或者可以升级到3.4及以上),你可以使用spawnforkserver上下文来避免套接字被继承。

spawn

父进程会启动一个全新的Python解释器进程。子进程只会继承运行进程对象的run()方法所需的资源。特别是,父进程中不必要的文件描述符和句柄不会被继承。用这种方法启动进程相对来说比较慢,跟使用fork或forkserver比起来。

这种方法在Unix和Windows上都可以用。在Windows上是默认的。

forkserver

当程序启动并选择forkserver启动方法时,会启动一个服务器进程。从那时起,每当需要新进程时,父进程就会连接到服务器,请求它来创建一个新进程。fork服务器进程是单线程的,所以使用os.fork()是安全的。不会继承不必要的资源。

示例:

def _job2():
    for _ in range(3):
        print multiprocessing.current_process(), "is working"
        time.sleep(2)
    print multiprocessing.current_process(), "is done"

ctx = multiprocessing.get_context('forkserver')
worker = ctx.Process(target=_job2)
worker.start()

撰写回答