如何显示Python multiprocessing pool imap_unordered调用的进度?

160 投票
12 回答
172025 浏览
提问于 2025-04-16 15:45

我有一个脚本,它成功地使用了多进程池来处理一些任务,调用了 imap_unordered()

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

不过,我的 num_tasks 大约有 250,000 个,所以在执行 join() 的时候,主线程会被锁住大约 10 秒。我希望能在命令行上逐步显示进度,这样就能让人知道主进程没有被锁住。类似于:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

有没有什么方法可以让结果对象或者池本身显示剩下多少任务?我试着用 multiprocessing.Value 对象作为计数器(在 do_work 完成任务后调用 counter.value += 1),但是计数器在达到总值的 ~85% 时就停止增加了。

12 个回答

43

正如Tim所建议的,你可以使用 tqdmimap 来解决这个问题。我刚好遇到了这个问题,并对 imap_unordered 的解决方案做了一些调整,这样我就可以获取映射的结果。下面是它的工作原理:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

如果你不在乎你的任务返回的值,那么你就不需要把这个列表赋值给任何变量。

161

我个人最喜欢的这个工具——在运行和提交的过程中,它会给你一个漂亮的小进度条,还会告诉你预计完成的时间。

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
96

结果集的私有属性不需要访问:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

撰写回答