Python:异步打印多个子进程的stdout
我正在测试一种方法,想要在Python 2.7中打印出多个子进程的标准输出。现在我设置了一个主进程,它会启动三个子进程,并输出它们的结果。每个子进程都是一个循环,它会随机睡眠一段时间,醒来后会说“睡了X秒”。
我遇到的问题是,打印输出似乎是同步的。比如,子进程A睡了1秒,子进程B睡了3秒,而子进程C睡了10秒。当主进程在检查子进程C是否有输出时,它会停下来整整10秒,尽管其他两个子进程可能已经睡醒并打印了输出。这是为了模拟一个子进程在较长时间内确实没有输出的情况。
我需要一个在Windows上可用的解决方案。
我的代码如下:
main_process.py
import sys
import subprocess
logfile = open('logfile.txt', 'w')
processes = [
subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1),
subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1),
subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1),
]
while True:
line = processes[0].stdout.readline()
if line != '':
sys.stdout.write(line)
logfile.write(line)
line = processes[1].stdout.readline()
if line != '':
sys.stdout.write(line)
logfile.write(line)
line = processes[2].stdout.readline()
if line != '':
sys.stdout.write(line)
logfile.write(line)
#If everyone is dead, break
if processes[0].poll() is not None and \
processes[1].poll() is not None and \
processes[2].poll() is not None:
break
processes[0].wait()
processes[1].wait()
print 'Done'
subproc_1.py/subproc_2.py/subproc_3.py
import time, sys, random
sleep_time = random.random() * 3
for x in range(0, 20):
print "[PROC1] Slept for {0} seconds".format(sleep_time)
sys.stdout.flush()
time.sleep(sleep_time)
sleep_time = random.random() * 3 #this is different for each subprocess.
更新:解决方案
结合下面的答案和这个问题,这个方法应该有效。
import sys
import subprocess
from threading import Thread
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty # for Python 3.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
if __name__ == '__main__':
logfile = open('logfile.txt', 'w')
processes = [
subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1),
subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1),
subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1),
]
q = Queue()
threads = []
for p in processes:
threads.append(Thread(target=enqueue_output, args=(p.stdout, q)))
for t in threads:
t.daemon = True
t.start()
while True:
try:
line = q.get_nowait()
except Empty:
pass
else:
sys.stdout.write(line)
logfile.write(line)
logfile.flush()
#break when all processes are done.
if all(p.poll() is not None for p in processes):
break
print 'All processes done'
我不确定在while循环结束时是否需要任何清理代码。如果有人对此有意见,请补充。
每个子进程的脚本看起来都类似于这个(我为了更好的示例进行了编辑):
import datetime, time, sys, random
for x in range(0, 20):
sleep_time = random.random() * 3
time.sleep(sleep_time)
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%H%M%S.%f')
print "[{0}][PROC1] Slept for {1} seconds".format(timestamp, sleep_time)
sys.stdout.flush()
print "[{0}][PROC1] Done".format(timestamp)
sys.stdout.flush()
1 个回答
你的问题出在 readline()
是一个阻塞函数;也就是说,如果你在一个文件对象上调用它,而没有可供读取的行,它就会一直等到有输出行才会返回结果。所以你现在的代码会依次从子进程1、2和3读取,直到每个进程的输出准备好才会继续。
(编辑:提问者澄清他们在使用Windows,这使得下面的内容不适用。)
如果你想从任何一个准备好的输出流中读取,你需要以非阻塞的方式检查这些流的状态,使用 select
模块,然后只对那些准备好的流进行读取。select
提供了多种方式来实现这一点,但为了举例,我们将使用 select.select()
。在启动你的子进程后,你的代码大致会像这样:
streams = [p.stdout for p in processes]
def output(s):
for f in [sys.stdout, logfile]:
f.write(s)
f.flush()
while True:
rstreams, _, _ = select.select(streams, [], [])
for stream in rstreams:
line = stream.readline()
output(line)
if all(p.poll() is not None for p in processes):
break
for stream in streams:
output(stream.read())
当你用三个文件对象(或文件描述符)调用 select()
时,它会返回三个参数的子集,这些子集分别是准备好读取的流、准备好写入的流,或者有错误的流。因此在每次循环中,我们会检查哪些输出流准备好读取,然后只遍历这些流。接着我们再重复这个过程。(注意,这里很重要的一点是你要对输出进行行缓冲;上面的代码假设如果一个流准备好读取,就至少有一整行可以读取。如果你指定了不同的缓冲方式,上面的代码可能会阻塞。)
你原始代码的另一个问题是:当你在 poll()
报告所有子进程都已退出后退出循环时,可能并没有读取到它们的所有输出。所以你需要最后再检查一遍流,以读取任何剩余的输出。
注意:我给出的示例代码并没有特别努力地按照输出可用的顺序捕获子进程的输出(这不可能做到完美,但可以比上面的代码更接近)。它也缺少其他一些细节(例如,在主循环中,它会继续选择每个子进程的标准输出,即使有些已经结束,这虽然没什么害处,但效率不高)。这只是为了说明一种基本的非阻塞输入输出技术。