Python 多进程 工作/队列
我有一个Python函数需要总共运行12次。目前我使用的是multiprocessing库中的Pool来并行运行这些任务。通常我一次运行6个,因为这个函数对CPU的要求很高,同时运行12个往往会导致程序崩溃。当我一次运行6个时,第二组6个任务会等到第一组的6个任务全部完成后才开始。理想情况下,我们希望在第一组的6个任务中有一个完成后,能立即开始下一个任务(比如第7个),这样就能保持6个任务同时运行,而还有更多的任务可以开始。目前代码是这样的(会被调用两次,第一次传入前6个元素,第二次传入后6个元素):
from multiprocessing import Pool
def start_pool(project_list):
pool = Pool(processes=6)
pool.map(run_assignments_parallel,project_list[0:6])
我一直在尝试实现一个工作者/队列的解决方案,但遇到了一些问题。我有一个工作者函数,代码如下:
def worker(work_queue, done_queue):
try:
for proj in iter(work_queue.get, 'STOP'):
print proj
run_assignments_parallel(proj)
done_queue.put('finished ' + proj )
except Exception, e:
done_queue.put("%s failed on %s with: %s" % (current_process().name, proj, e.message))
return True
调用这个工作者函数的代码如下:
workers = 6
work_queue = Queue()
done_queue = Queue()
processes = []
for project in project_list:
print project
work_queue.put(project)
for w in xrange(workers):
p = Process(target=worker, args=(work_queue, done_queue))
p.start()
processes.append(p)
work_queue.put('STOP')
for p in processes:
p.join()
done_queue.put('STOP')
for status in iter(done_queue.get, 'STOP'):
print status
project_list只是一个包含12个项目路径的列表,这些项目需要在函数'run_assignments_parallel'中运行。
现在的写法是,函数对同一个进程(项目)被调用了多次,我不太明白发生了什么。这段代码是基于我找到的一个示例,我觉得循环结构可能有问题。如果能得到一些帮助我会非常感激,对这个问题我表示歉意。谢谢!
2 个回答
5
你可以使用MPipe这个模块。
创建一个有6个工作者的单阶段管道,把你所有的项目作为任务放进去。然后只需要从最后读取结果(在你的情况下,就是状态)。
from mpipe import Pipeline, OrderedStage
...
pipe = Pipeline(OrderedStage(run_assignments_parallel), 6)
for project in project_list:
pipe.put(project)
pipe.put(None) # Signal end of input.
for status in pipe.results():
print(status)
10
理想情况下,我们希望在最初的6个任务中,有一个完成后,能够立即启动另一个(比如第7个)。这样就可以同时运行6个任务,而还有更多的任务等待开始。
你只需要做的就是传入所有12个输入参数,而不是6个:
from multiprocessing import Pool
pool = Pool(processes=6) # run no more than 6 at a time
pool.map(run_assignments_parallel, project_list) # pass full list (12 items)