在stdout/stderr文件描述符上调用os.fsync会终止子进程

3 投票
2 回答
1368 浏览
提问于 2025-04-16 21:33

我在用Python的 subprocess 库创建一个子进程后,想通过 stderr 把一些序列化的数据从子进程传递到父进程。接着,我希望父进程能通过 stdin 返回一个函数对这些数据处理后的结果。

简单来说,我在子进程里有一个函数,做的事情大概是这样的:

sys.stderr.write("some stuff to write")
# some time later
some_var = sys.stdin.read()

但是,这样做会让父进程一直等待 stderr 的输入,导致它被锁住,所以我尝试调用:

sys.stderr.flush()
os.fsync(sys.stderr.fileno())

可是,这样并没有成功。在 os.fsync 之后的代码都没有执行。而且,当我在父进程里调用 proc.poll() 时,发现子进程的返回码是1。

我该怎么做才能避免这个问题呢?是不是该考虑其他的方法?

2 个回答

1

这里有一种方法可以防止输入输出死锁,而不需要大幅度改变你的做法。在子进程中:

import os, fcntl
map(lambda f: fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK),
    (sys.stdin, sys.stdout, sys.stderr))

在父进程中,对与子进程之间的管道文件描述符也做同样的处理。这样,当你使用 select.select() 来进行通信时,就不会再出现这些锁死的情况了。不过,你必须在写入之前就使用 select(),因为在你尝试读写时可能会收到 EAGAIN 错误,根据你的应用逻辑,可能会出现无限等待的情况。

我真心建议你看看 Twisted 框架,它内置了子进程的功能:http://twistedmatrix.com/documents/current/core/howto/process.html

2

我建议你考虑另一种方法。你可以使用一个独立的进程(multiprocessing.Process),并通过两个队列来和它沟通(multiprocessing.Queue),一个队列用来接收输入,另一个用来发送输出。

import multiprocessing

def processWorker(input, result):
    work = input.get()
    print work
    result.put(work*work)

input  = multiprocessing.Queue()
result = multiprocessing.Queue()

p = multiprocessing.Process(target = processWorker, args = (input, result))
p.start()

input.put(2)
res = result.get(block = True)
print res

然后你可以继续循环传递数据。使用multiprocessing.Queue会更可靠,因为你不需要依赖于标准输出或错误输出的解析,这样也避免了一些相关的限制。此外,你还可以更轻松地管理多个子进程。

另外,你还可以设置一个超时时间,来限制获取数据的最长等待时间,比如:

import queue
try:
    res = result.get(block = True, timeout = 10)
except Queue.Empty:
    print error

撰写回答