如何让Python持续填充多个子进程线程?
我在Linux上运行一个叫foo的应用程序。在一个Bash脚本或终端命令行中,我的应用程序是多线程运行的,使用的命令是:
$ foo -config x.ini -threads 4 < inputfile
系统监视器和top命令显示,foo的CPU负载平均大约是380%(因为我用的是四核机器)。我用Python 2.6x重新实现了这个功能,代码是:
proc = subprocess.Popen("foo -config x.ini -threads 4", \
shell=True, stdin=subprocess.PIPE, \
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
mylist = ['this','is','my','test','app','.']
for line in mylist:
txterr = ''
proc.stdin.write(line.strip()+'\n')
while not proc.poll() and not txterr.count('Finished'):
txterr += subproc.stderr.readline()
print proc.stdout.readline().strip(),
但是foo运行得比较慢,top命令显示CPU负载只有100%。即使把shell设置为False,foo运行得也很慢:
proc = subprocess.Popen("foo -config x.ini -threads 4".split(), \
shell=False, stdin=subprocess.PIPE, \
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
有没有办法让Python的子进程持续利用所有的线程呢?
3 个回答
首先,你是不是因为CPU使用率达到100%而猜测它是单线程的,而不是400%?
其实,最好用top
这个程序来检查一下它启动了多少个线程,按下H
键可以显示线程信息。或者,你也可以使用ps -eLf
命令,确保NLWP
这一列显示了多个线程。
在Linux系统中,CPU的亲和性可能会有点问题;默认情况下,调度程序不会把一个进程从它最后使用的处理器上移动开。这意味着,如果你的程序的四个线程都是在同一个处理器上启动的,它们将永远共享这个处理器。你需要使用像taskset(1)
这样的工具来强制进程在不同的处理器上运行,特别是当它们需要长时间运行时。例如,taskset -p <pid1> -c 0 ; taskset -p <pid2> -c 1 ; taskset -p <pid3> -c 2 ; taskset -p <pid4> -c 3
。
你可以用taskset -p <pid>
来查看当前的亲和性设置。
(有一天我在想,为什么我的Folding At Home进程使用的CPU时间远低于我的预期,结果发现调度程序把三个FaH任务放在了同一个超线程的兄弟上,而第四个任务则在另一个超线程的兄弟上,都是在同一个核心上。其他三个处理器都闲着呢。(第一个核心的温度很高,而其他三个核心的温度低了四五度。哈哈。))
当你用Popen来调用一个命令时,不管是从Python里调用,还是从命令行里调用,其实都没什么区别。是“foo”这个命令自己启动了它的进程,而不是Python在启动它。
所以答案是“是的,当从Python调用时,子进程可以是多线程的。”
如果你的Python脚本没有及时给foo
进程提供数据,那么你可以把读取标准输出和错误输出的工作交给线程来处理:
from Queue import Empty, Queue
from subprocess import PIPE, Popen
from threading import Thread
def start_thread(target, *args):
t = Thread(target=target, args=args)
t.daemon = True
t.start()
return t
def signal_completion(queue, stderr):
for line in iter(stderr.readline, ''):
if 'Finished' in line:
queue.put(1) # signal completion
stderr.close()
def print_stdout(q, stdout):
"""Print stdout upon receiving a signal."""
text = []
for line in iter(stdout.readline, ''):
if not q.empty():
try: q.get_nowait()
except Empty:
text.append(line) # queue is empty
else: # received completion signal
print ''.join(text),
text = []
q.task_done()
else: # buffer stdout until the task is finished
text.append(line)
stdout.close()
if text: print ''.join(text), # print the rest unconditionally
queue = Queue()
proc = Popen("foo -config x.ini -threads 4".split(), bufsize=1,
stdin=PIPE, stdout=PIPE, stderr=PIPE)
threads = [start_thread(print_stdout, queue, proc.stdout)]
threads += [start_thread(signal_completion, queue, proc.stderr)]
mylist = ['this','is','my','test','app','.']
for line in mylist:
proc.stdin.write(line.strip()+'\n')
proc.stdin.close()
proc.wait()
for t in threads: t.join() # wait for stdout