可以在进程间共享子进程管道吗(Celery 工作进程)?
我遇到了一个问题:
我在运行celery的时候,启动了很多工作进程。在celery启动时,我创建了一些子进程:
proc = subprocess.Popen("program", stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
我需要这些子进程能够启动并且在后续被celery的工作进程多次使用。所以我把子进程保存到一个叫做multiprocessing.Manager().dict()的地方,类似于一个池子...
pool = multiprocessing.Manager().dict()
pool[proc_id] = proc
所有的子进程都可以被celery的工作进程访问,但它们并不能正常工作。我发现当通过池子共享子进程时,管道就会坏掉。第一个问题是:有没有办法在不同的进程(celery工作进程)之间共享子进程的管道?
我还尝试把管道保存到一个普通的字典里。然后当工作进程从池子里获取子进程时,这些管道就会连接到子进程上:
proc.stdin = dict_of_pipes[proc_id]
这个方法有时候能用,但有时候字典里找不到管道。我猜是因为在进程之间共享普通字典是不太行的?
你可以把“程序”想象成/bin/bash。锁定问题已经解决,字典一次只会被一个进程访问...
第二个问题是:有没有可能为子进程打开一个新的管道?(从任何celery工作进程都可以吗?)或者有没有其他的解决办法?
2 个回答
这是可能的,你可以通过一个已经存在的管道发送新的管道。关于这个问题,有一个讨论:Python 2.6 通过队列/管道等发送连接对象
那个回答对我有用。
# Somewhere in the main process code
#
#
in, out = Pipe()
reduced = reduction.reduce_connection(out)
in_old_pipe.send(reduced)
.
# Somewhere else in the subprocess code
.
.
reduced = out_old_pipe.recv()
newi = reduced[0](*reduced[1])
这样,你就可以用一个主管道来连接新创建的子进程。
经过一些实验,我发现无法打开一个已经存在的子进程的管道(这是我的第二个问题),而且我也不能在进程之间共享已经存在的管道(这是我主要的问题)。
所以我这样解决的:每个子进程都用Python的multiprocessing.Process包裹起来,这个包裹实现了XML RPC服务器。这些“包裹”在celery启动时会被启动,或者在celery工作进程需要的时候随时启动。
当包裹进程启动后,它会通过multiprocessing.Pipe发送它运行的端口,这些端口会保存在一个共享的池子里(multiprocessing.Manager().dict())。这样,celery工作进程就可以通过XML RPC包裹来调用正在运行的子进程,而不用担心管道的问题。
虽然XML RPC不是必须的,但它让代码变得更简单,也更容易使用。