如何让Python持续填充多个子进程线程?

1 投票
3 回答
1693 浏览
提问于 2025-04-16 10:38

我在Linux上运行一个叫foo的应用程序。在一个Bash脚本或终端命令行中,我的应用程序是多线程运行的,使用的命令是:

$ foo -config x.ini -threads 4 < inputfile

系统监视器和top命令显示,foo的CPU负载平均大约是380%(因为我用的是四核机器)。我用Python 2.6x重新实现了这个功能,代码是:

proc = subprocess.Popen("foo -config x.ini -threads 4", \
        shell=True, stdin=subprocess.PIPE, \
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
mylist = ['this','is','my','test','app','.']
for line in mylist:
    txterr = ''
    proc.stdin.write(line.strip()+'\n')
    while not proc.poll() and not txterr.count('Finished'):
        txterr += subproc.stderr.readline()
    print proc.stdout.readline().strip(),

但是foo运行得比较慢,top命令显示CPU负载只有100%。即使把shell设置为False,foo运行得也很慢:

proc = subprocess.Popen("foo -config x.ini -threads 4".split(), \
        shell=False, stdin=subprocess.PIPE, \
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

有没有办法让Python的子进程持续利用所有的线程呢?

3 个回答

1

首先,你是不是因为CPU使用率达到100%而猜测它是单线程的,而不是400%?

其实,最好用top这个程序来检查一下它启动了多少个线程,按下H键可以显示线程信息。或者,你也可以使用ps -eLf命令,确保NLWP这一列显示了多个线程。

在Linux系统中,CPU的亲和性可能会有点问题;默认情况下,调度程序不会把一个进程从它最后使用的处理器上移动开。这意味着,如果你的程序的四个线程都是在同一个处理器上启动的,它们将永远共享这个处理器。你需要使用像taskset(1)这样的工具来强制进程在不同的处理器上运行,特别是当它们需要长时间运行时。例如,taskset -p <pid1> -c 0 ; taskset -p <pid2> -c 1 ; taskset -p <pid3> -c 2 ; taskset -p <pid4> -c 3

你可以用taskset -p <pid>来查看当前的亲和性设置。

(有一天我在想,为什么我的Folding At Home进程使用的CPU时间远低于我的预期,结果发现调度程序把三个FaH任务放在了同一个超线程的兄弟上,而第四个任务则在另一个超线程的兄弟上,都是在同一个核心上。其他三个处理器都闲着呢。(第一个核心的温度很高,而其他三个核心的温度低了四五度。哈哈。))

4

当你用Popen来调用一个命令时,不管是从Python里调用,还是从命令行里调用,其实都没什么区别。是“foo”这个命令自己启动了它的进程,而不是Python在启动它。

所以答案是“是的,当从Python调用时,子进程可以是多线程的。”

0

如果你的Python脚本没有及时给foo进程提供数据,那么你可以把读取标准输出和错误输出的工作交给线程来处理:

from Queue import Empty, Queue
from subprocess import PIPE, Popen
from threading import Thread

def start_thread(target, *args):
    t = Thread(target=target, args=args)
    t.daemon = True
    t.start()
    return t

def signal_completion(queue, stderr):
    for line in iter(stderr.readline, ''):
        if 'Finished' in line:
           queue.put(1) # signal completion
    stderr.close()

def print_stdout(q, stdout):
    """Print stdout upon receiving a signal."""
    text = []
    for line in iter(stdout.readline, ''):
        if not q.empty():
           try: q.get_nowait()               
           except Empty:
               text.append(line) # queue is empty
           else: # received completion signal              
               print ''.join(text),
               text = []
               q.task_done()
        else: # buffer stdout until the task is finished
            text.append(line)
    stdout.close()
    if text: print ''.join(text), # print the rest unconditionally

queue = Queue()
proc = Popen("foo -config x.ini -threads 4".split(), bufsize=1,
             stdin=PIPE, stdout=PIPE, stderr=PIPE)
threads =  [start_thread(print_stdout, queue, proc.stdout)]
threads += [start_thread(signal_completion, queue, proc.stderr)]

mylist = ['this','is','my','test','app','.']
for line in mylist:
    proc.stdin.write(line.strip()+'\n')
proc.stdin.close()
proc.wait()
for t in threads: t.join() # wait for stdout

撰写回答