可以在进程间共享子进程管道吗(Celery 工作进程)?

0 投票
2 回答
1453 浏览
提问于 2025-04-18 07:53

我遇到了一个问题:

我在运行celery的时候,启动了很多工作进程。在celery启动时,我创建了一些子进程:

proc = subprocess.Popen("program", stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)

我需要这些子进程能够启动并且在后续被celery的工作进程多次使用。所以我把子进程保存到一个叫做multiprocessing.Manager().dict()的地方,类似于一个池子...

pool = multiprocessing.Manager().dict()
pool[proc_id] = proc

所有的子进程都可以被celery的工作进程访问,但它们并不能正常工作。我发现当通过池子共享子进程时,管道就会坏掉。第一个问题是:有没有办法在不同的进程(celery工作进程)之间共享子进程的管道?

我还尝试把管道保存到一个普通的字典里。然后当工作进程从池子里获取子进程时,这些管道就会连接到子进程上:

proc.stdin = dict_of_pipes[proc_id]

这个方法有时候能用,但有时候字典里找不到管道。我猜是因为在进程之间共享普通字典是不太行的?

你可以把“程序”想象成/bin/bash。锁定问题已经解决,字典一次只会被一个进程访问...

第二个问题是:有没有可能为子进程打开一个新的管道?(从任何celery工作进程都可以吗?)或者有没有其他的解决办法?

2 个回答

0

这是可能的,你可以通过一个已经存在的管道发送新的管道。关于这个问题,有一个讨论:Python 2.6 通过队列/管道等发送连接对象

那个回答对我有用。

# Somewhere in the main process code
#
#
in, out = Pipe()
reduced = reduction.reduce_connection(out)
in_old_pipe.send(reduced)
.
# Somewhere else in the subprocess code
.
.
reduced = out_old_pipe.recv()
newi = reduced[0](*reduced[1])

这样,你就可以用一个主管道来连接新创建的子进程。

3

经过一些实验,我发现无法打开一个已经存在的子进程的管道(这是我的第二个问题),而且我也不能在进程之间共享已经存在的管道(这是我主要的问题)。

所以我这样解决的:每个子进程都用Python的multiprocessing.Process包裹起来,这个包裹实现了XML RPC服务器。这些“包裹”在celery启动时会被启动,或者在celery工作进程需要的时候随时启动。

当包裹进程启动后,它会通过multiprocessing.Pipe发送它运行的端口,这些端口会保存在一个共享的池子里(multiprocessing.Manager().dict())。这样,celery工作进程就可以通过XML RPC包裹来调用正在运行的子进程,而不用担心管道的问题。

虽然XML RPC不是必须的,但它让代码变得更简单,也更容易使用。

撰写回答