如何在多处理器系统上生成并行子进程?

48 投票
4 回答
69455 浏览
提问于 2025-04-15 11:42

我有一个Python脚本,想用它来控制另一个Python脚本。我有一台有64个处理器的服务器,所以我想同时启动最多64个这个第二个Python脚本的子进程。这个子脚本的名字是:

$ python create_graphs.py --name=NAME

其中NAME可以是像XYZ、ABC、NYU这样的名字。

在我的主控制脚本中,我从一个列表中获取名字变量:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

所以我的问题是,启动这些子进程的最佳方法是什么?我想把同时运行的子进程数量限制在64个,所以我需要跟踪每个子进程的状态(也就是它是否完成),这样我才能高效地保持整个进程的运行。

我考虑过使用subprocess这个包,但因为它一次只能启动一个子进程,所以我放弃了。最后我找到了multiprocessor这个包,但我承认看那些关于线程和子进程的文档让我感到有些困惑。

现在,我的脚本使用subprocess.call一次只启动一个子进程,代码是这样的:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

我真的希望它能一次启动64个子进程。在其他的StackOverflow问题中,我看到有人使用Queue,但听起来那样会影响性能?

4 个回答

1

我觉得你不需要使用队列,除非你打算从应用程序中提取数据(如果你真的想要数据,我觉得把它放到数据库里可能更简单)。

不过你可以试试这个方法:

把你的 create_graphs.py 脚本里的所有内容放到一个叫做 "create_graphs" 的函数里。

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

我知道这样会比处理器少一个线程,这可能是个好事,因为这样可以留一个处理器来管理线程、磁盘输入输出和电脑上发生的其他事情。如果你决定想用最后一个核心,只需加一即可。

补充说明:我觉得我可能误解了 my_list 的用途。其实你根本不需要 my_list 来跟踪线程(因为所有线程都可以通过 threads 列表中的项目来引用)。不过,这确实是给进程输入的一个好方法——或者更好的是:使用生成器函数;)

my_listthreads 的作用

my_list 保存了你在函数中需要处理的数据。
threads 只是当前正在运行的线程的列表。

while 循环做了两件事:启动新的线程来处理数据,并检查是否有线程已经完成运行。

所以只要你有 (a) 更多的数据需要处理,或者 (b) 还有线程没有完成运行……你希望程序继续运行。一旦两个列表都为空,它们会被评估为 False,while 循环就会结束。

3

这是我根据Nadia和Jim的评论想出来的解决方案。我不确定这是不是最好的方法,但它确实有效。因为我需要使用一些第三方应用程序,包括Matlab,所以原来的子脚本必须是一个shell脚本。因此,我不得不把它从Python中拿出来,用bash来编写。

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

这个解决方案听起来合理吗?我试着用Jim的while循环格式,但我的脚本什么都没返回。我不太明白为什么会这样。当我用Jim的'while'循环替换掉'for'循环运行脚本时,输出是这样的:

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

而当我用'for'循环运行时,得到的结果就更有意义了:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

所以这个方法是有效的,我很满意。不过,我还是不明白为什么我不能用Jim的'while'循环,而必须用我现在用的'for'循环。谢谢大家的帮助,我对stackoverflow的知识面感到很惊讶。

70

你要找的是在多进程处理(multiprocessing)中使用的 进程池 类。

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

这里有一个计算的例子,帮助你更好地理解。下面的代码会把10000个任务分配到N个进程中,其中N是你的CPU数量。注意,我在这里传入了None作为进程的数量。这会让进程池类自动使用CPU的数量来决定进程的数量(参考)。

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results

撰写回答