在stdout/stderr文件描述符上调用os.fsync会终止子进程
我在用Python的 subprocess
库创建一个子进程后,想通过 stderr
把一些序列化的数据从子进程传递到父进程。接着,我希望父进程能通过 stdin
返回一个函数对这些数据处理后的结果。
简单来说,我在子进程里有一个函数,做的事情大概是这样的:
sys.stderr.write("some stuff to write")
# some time later
some_var = sys.stdin.read()
但是,这样做会让父进程一直等待 stderr
的输入,导致它被锁住,所以我尝试调用:
sys.stderr.flush()
os.fsync(sys.stderr.fileno())
可是,这样并没有成功。在 os.fsync
之后的代码都没有执行。而且,当我在父进程里调用 proc.poll()
时,发现子进程的返回码是1。
我该怎么做才能避免这个问题呢?是不是该考虑其他的方法?
2 个回答
这里有一种方法可以防止输入输出死锁,而不需要大幅度改变你的做法。在子进程中:
import os, fcntl
map(lambda f: fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK),
(sys.stdin, sys.stdout, sys.stderr))
在父进程中,对与子进程之间的管道文件描述符也做同样的处理。这样,当你使用 select.select()
来进行通信时,就不会再出现这些锁死的情况了。不过,你必须在写入之前就使用 select(),因为在你尝试读写时可能会收到 EAGAIN
错误,根据你的应用逻辑,可能会出现无限等待的情况。
我真心建议你看看 Twisted 框架,它内置了子进程的功能:http://twistedmatrix.com/documents/current/core/howto/process.html
我建议你考虑另一种方法。你可以使用一个独立的进程(multiprocessing.Process),并通过两个队列来和它沟通(multiprocessing.Queue),一个队列用来接收输入,另一个用来发送输出。
import multiprocessing
def processWorker(input, result):
work = input.get()
print work
result.put(work*work)
input = multiprocessing.Queue()
result = multiprocessing.Queue()
p = multiprocessing.Process(target = processWorker, args = (input, result))
p.start()
input.put(2)
res = result.get(block = True)
print res
然后你可以继续循环传递数据。使用multiprocessing.Queue会更可靠,因为你不需要依赖于标准输出或错误输出的解析,这样也避免了一些相关的限制。此外,你还可以更轻松地管理多个子进程。
另外,你还可以设置一个超时时间,来限制获取数据的最长等待时间,比如:
import queue
try:
res = result.get(block = True, timeout = 10)
except Queue.Empty:
print error