Windows上的异步子进程
首先,我要解决的整体问题比我在这里展示的要复杂一些,所以请不要告诉我“使用阻塞线程”,因为这并不能解决我实际遇到的情况,除非我进行大量的重写和重构。
我有几个应用程序,这些程序不是我能修改的,它们从标准输入(stdin)读取数据,然后经过处理后输出到标准输出(stdout)。我的任务是把这些程序串联起来。问题是,有时候它们会卡住,因此我需要跟踪它们的进度,而这些进度信息是通过标准错误输出(STDERR)输出的。
pA = subprocess.Popen(CommandA, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# ... some more processes make up the chain, but that is irrelevant to the problem
pB = subprocess.Popen(CommandB, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=pA.stdout )
现在,直接通过 pA.stdout.readline() 和 pB.stdout.readline(),或者使用普通的 read() 函数来读取数据是会阻塞的。因为不同的应用程序输出的速度和格式都不一样,所以阻塞不是一个选项。(而且正如我上面所说,除非万不得已,否则不考虑线程。)pA.communicate()
是安全的,但因为我需要实时的信息,所以这也不是一个选项。
因此,谷歌让我找到了这个 异步子进程代码片段,它在 ActiveState 上。
一开始一切都很好,直到我实现它。比较 cmd.exe 输出的 pA.exe | pB.exe
,虽然它们都输出到同一个窗口,导致有点混乱,但我看到更新非常及时。然而,当我使用上面的代码片段和那里的 read_some()
函数实现同样的功能时,通知单个管道更新的时间超过了10秒。但当它更新时,进度信息却能显示到40%这样的程度。
因此,我进行了更多的研究,发现了许多关于 PeekNamedPipe、匿名句柄,以及即使管道中有信息可用却返回0字节可用的主题。由于这个问题超出了我的专业知识范围,我来到 Stack Overflow 寻求帮助。:)
我的平台是64位的Windows 7,使用Python 2.6,这些应用程序是32位的,如果这有影响的话,兼容Unix不是问题。如果这是唯一的解决方案,我甚至可以处理一个完全使用 ctypes 或 pywin32 的解决方案,只要我能异步读取每个标准错误管道,并且性能立即,没有死锁。:)
3 个回答
那使用Twisted的FD怎么样? http://twistedmatrix.com/documents/8.1.0/api/twisted.internet.fdesc.html
它不是异步的,但它是非阻塞的。对于异步的内容,你能把它转换成使用Twisted吗?
我假设如果只使用标准输入(stdin)和标准输出(stdout),那么这个处理流程不会出现死锁。你现在要解决的问题是,如何在写入标准错误(stderr)时避免死锁,并且还要处理标准错误可能会被堵塞的情况。
如果你让多个进程同时写入标准错误,你需要注意它们的输出可能会混在一起。我猜你已经想办法解决了这个问题,只是提醒一下以确保万无一失。
要注意python的-u选项;在测试时,这个选项可以帮助你查看操作系统的缓存是否在干扰你的程序。
如果你想在Windows上模拟select()来处理文件句柄,你唯一的选择是使用PeekNamedPipe()等函数。我有一段代码可以同时读取多个进程的行输出,你甚至可以直接使用它——试着把proc.stderr的句柄列表传给它,然后就可以了。
class NoLineError(Exception): pass
class NoMoreLineError(Exception): pass
class LineReader(object):
"""Helper class for multi_readlines."""
def __init__(self, f):
self.fd = f.fileno()
self.osf = msvcrt.get_osfhandle(self.fd)
self.buf = ''
def getline(self):
"""Returns a line of text, or raises NoLineError, or NoMoreLineError."""
try:
_, avail, _ = win32pipe.PeekNamedPipe(self.osf, 0)
bClosed = False
except pywintypes.error:
avail = 0
bClosed = True
if avail:
self.buf += os.read(self.fd, avail)
idx = self.buf.find('\n')
if idx >= 0:
ret, self.buf = self.buf[:idx+1], self.buf[idx+1:]
return ret
elif bClosed:
if self.buf:
ret, self.buf = self.buf, None
return ret
else:
raise NoMoreLineError
else:
raise NoLineError
def multi_readlines(fs, timeout=0):
"""Read lines from |fs|, a list of file objects.
The lines come out in arbitrary order, depending on which files
have output available first."""
if type(fs) not in (list, tuple):
raise Exception("argument must be a list.")
objs = [LineReader(f) for f in fs]
for i,obj in enumerate(objs): obj._index = i
while objs:
yielded = 0
for i,obj in enumerate(objs):
try:
yield (obj._index, obj.getline())
yielded += 1
except NoLineError:
#time.sleep(timeout)
pass
except NoMoreLineError:
del objs[i]
break # Because we mutated the array
if not yielded:
time.sleep(timeout)
pass
我自己从来没有遇到过“Peek返回0字节但数据可用”的问题。如果其他人遇到这种情况,我敢打赌是他们的libc在将数据发送到操作系统之前对stdout/stderr进行了缓存;对此你从外部是无能为力的。你必须让应用程序以某种方式使用无缓存输出(比如对python使用-u选项;在win32/libc中修改stderr文件句柄,等等)。
你看到没有输出,然后突然有大量更新,这让我觉得问题出在源头的缓存上。如果win32的libc在写入管道而不是控制台时缓存方式不同,这也是可能的。再次强调,从这些程序外部你能做的就是尽量清空它们的输出。
使用线程到底有多糟糕呢?我遇到过类似的问题,最后决定用线程来收集子进程的标准输出和标准错误,然后把这些数据放到一个线程安全的队列里,这样主线程就可以安全地读取这些数据,而不用担心后台的线程在忙碌。
你担心用线程和阻塞的解决方案会有什么麻烦吗?你是不是在担心要让你其他的代码也变得线程安全?其实这不成问题,因为IO线程不需要和你其他的代码或数据互动。如果你的内存要求非常严格,或者你的处理流程特别长,可能会觉得创建这么多线程不太好。我对你的具体情况了解不多,所以不能确定这是否会成为问题,但我觉得既然你已经在创建额外的进程,增加几个线程来和它们互动应该不会太麻烦。在我的情况下,我发现这些IO线程并没有特别麻烦。
我的线程函数大概是这样的:
def simple_io_thread(pipe, queue, tag, stop_event):
"""
Read line-by-line from pipe, writing (tag, line) to the
queue. Also checks for a stop_event to give up before
the end of the stream.
"""
while True:
line = pipe.readline()
while True:
try:
# Post to the queue with a large timeout in case the
# queue is full.
queue.put((tag, line), block=True, timeout=60)
break
except Queue.Full:
if stop_event.isSet():
break
continue
if stop_event.isSet() or line=="":
break
pipe.close()
当我启动子进程时,我会这样做:
outputqueue = Queue.Queue(50)
stop_event = threading.Event()
process = subprocess.Popen(
command,
cwd=workingdir,
env=env,
shell=useshell,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stderr_thread = threading.Thread(
target=simple_io_thread,
args=(process.stderr, outputqueue, "STDERR", stop_event)
)
stdout_thread = threading.Thread(
target=simple_io_thread,
args=(process.stdout, outputqueue, "STDOUT", stop_event)
)
stderr_thread.daemon = True
stdout_thread.daemon = True
stderr_thread.start()
stdout_thread.start()
然后当我想读取数据时,我可以直接在outputqueue上阻塞——从中读取的每一项都包含一个字符串,用来标识它来自哪个管道,以及那条管道的一行文本。几乎没有代码是在单独的线程中运行的,它只通过一个线程安全的队列与主线程沟通(还有一个事件,以防我需要提前放弃)。也许这种方法会对你有帮助,让你在使用线程和阻塞的同时,不用重写很多代码?
(我的解决方案更复杂一些,因为我有时希望提前终止子进程,并且想确保所有线程都能完成。如果这不是问题,你可以去掉所有的stop_event相关的东西,这样就简单多了。)