Python 多进程 工作/队列

7 投票
2 回答
24753 浏览
提问于 2025-04-17 21:11

我有一个Python函数需要总共运行12次。目前我使用的是multiprocessing库中的Pool来并行运行这些任务。通常我一次运行6个,因为这个函数对CPU的要求很高,同时运行12个往往会导致程序崩溃。当我一次运行6个时,第二组6个任务会等到第一组的6个任务全部完成后才开始。理想情况下,我们希望在第一组的6个任务中有一个完成后,能立即开始下一个任务(比如第7个),这样就能保持6个任务同时运行,而还有更多的任务可以开始。目前代码是这样的(会被调用两次,第一次传入前6个元素,第二次传入后6个元素):

from multiprocessing import Pool

def start_pool(project_list):

    pool = Pool(processes=6)
    pool.map(run_assignments_parallel,project_list[0:6])

我一直在尝试实现一个工作者/队列的解决方案,但遇到了一些问题。我有一个工作者函数,代码如下:

def worker(work_queue, done_queue):
    try:
        for proj in iter(work_queue.get, 'STOP'):
            print proj
            run_assignments_parallel(proj)
            done_queue.put('finished ' + proj )
    except Exception, e:        
        done_queue.put("%s failed on %s with: %s" % (current_process().name, proj,        e.message))
    return True

调用这个工作者函数的代码如下:

workers = 6
work_queue = Queue()
done_queue = Queue()  
processes = []
for project in project_list:
    print project
    work_queue.put(project)
for w in xrange(workers):        
    p = Process(target=worker, args=(work_queue, done_queue))
    p.start()
    processes.append(p)
    work_queue.put('STOP')
for p in processes:
     p.join()    
     done_queue.put('STOP')
for status in iter(done_queue.get, 'STOP'):        
    print status

project_list只是一个包含12个项目路径的列表,这些项目需要在函数'run_assignments_parallel'中运行。

现在的写法是,函数对同一个进程(项目)被调用了多次,我不太明白发生了什么。这段代码是基于我找到的一个示例,我觉得循环结构可能有问题。如果能得到一些帮助我会非常感激,对这个问题我表示歉意。谢谢!

2 个回答

5

你可以使用MPipe这个模块。

创建一个有6个工作者的单阶段管道,把你所有的项目作为任务放进去。然后只需要从最后读取结果(在你的情况下,就是状态)。

from mpipe import Pipeline, OrderedStage

...    

pipe = Pipeline(OrderedStage(run_assignments_parallel), 6)    

for project in project_list:
   pipe.put(project)

pipe.put(None)  # Signal end of input.

for status in pipe.results():
   print(status)
10

理想情况下,我们希望在最初的6个任务中,有一个完成后,能够立即启动另一个(比如第7个)。这样就可以同时运行6个任务,而还有更多的任务等待开始。

你只需要做的就是传入所有12个输入参数,而不是6个:

from multiprocessing import Pool
pool = Pool(processes=6) # run no more than 6 at a time
pool.map(run_assignments_parallel, project_list) # pass full list (12 items)

撰写回答