tqdm 多进程嵌套进度条
我正在使用多进程来处理多个耗时的任务,同时有一个外部的进度条来跟踪完成了多少个任务。为了显示每个任务的进度,我还想加一个内部进度条,并且希望在内部进度条完成时能够打印出来。
问题是,当内部进度条完成时,它会消失,因为设置了leave=False。如果设置为leave=True也不行,因为我需要能够重新启动内部进度条。因此,我的解决办法是手动打印出完成的进度条。
我的解决方案如下。因为它使用了`sleep(.04)`,所以这个0.04的时间需要根据电脑、工作进程数量、任务长度等进行调整。而且,即使你尝试调整这个时间,它也不一定总是有效。因此,我在寻找一个不那么“hack”的解决方案,能够在任何电脑上都能正常工作。
from tqdm import tqdm
from time import sleep
import multiprocessing
def do_the_thing(my_args):
if my_args:
pbar_inner = tqdm(total=15, position=1, leave=False)
for i in range(15):
sleep(.1)
pbar_inner.update()
else:
sleep(1.5)
if __name__ == '__main__':
postfix = ' [Use this line/progress bar to print some stuff out.]'
pbar_outer = tqdm(total=60, position=0, leave=True)
for n in range(3):
pool = multiprocessing.Pool(2)
args = [True if i % 8 == 0 else False for i in range(20)]
for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
pbar_outer.update()
if args[count]:
sleep(.04)
my_pbar_inner = tqdm(total=15, position=1, leave=False,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}' + postfix)
my_pbar_inner.update(15)
my_pbar_inner.set_postfix_str('')
pool.close()
pool.join()
2 个回答
这是我现在的解决方案。虽然它并不总是有效,但加上“睡眠”这个步骤确实有帮助。不知道为什么,在这个测试脚本中,进度条大部分时间还是会消失。不过在实际代码中,它的表现要好得多。
from tqdm import tqdm
from time import sleep
import multiprocessing
def do_the_thing(my_args):
if my_args:
pbar_inner = tqdm(total=15, position=1, leave=False)
for i in range(15):
sleep(.1)
pbar_inner.update()
else:
sleep(1.5)
if __name__ == '__main__':
postfix = ' [Use this line/progress bar to print some stuff out.]'
pbar_outer = tqdm(total=60, position=0, leave=True)
for n in range(3):
pool = multiprocessing.Pool(2)
args = [True if i % 8 == 0 else False for i in range(20)]
for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
pbar_outer.update()
if args[count]:
sleep(.04) # magic
my_pbar_inner = tqdm(total=15, position=1, leave=False,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}' + postfix)
my_pbar_inner.update(15)
my_pbar_inner.set_postfix_str('')
pool.close()
pool.join()
这里的重点是,如果外部的进度条需要几个小时或几天才能完成,那么有一个内部进度条在几分钟内更新就会很不错(内部进度条会在整个过程中反复重启)。
默认情况下,进度条完成后会消失,所以这个“解决方案”的意思就是手动把它再打印出来,显示为完成状态。这个状态只能在上一个进度条结束和下一个进度条开始之间的(可能很短暂的)时间里看到。
我觉得我明白你想要做什么。不过,这个功能在tqdm里是没有的。
tqdm的设计是,当它的实例被删除时,进度条会自动关闭。这是通过一个叫做__del__的机制实现的。因此,当你退出do_the_thing
这个函数时,进度条总是会被删除或留下。
不过,有几种方法可以解决这个问题。最简单的方法有点小技巧。虽然我说进度条总是会被删除,但如果你看看它的实现,会发现它通过disable
这个属性来防止多次删除。所以,提前把disable
设置为True,就可以防止它被删除。
import multiprocessing
from time import sleep
from tqdm import tqdm
def do_the_thing(args):
my_args, postfix = args
if my_args:
pbar_inner = tqdm(total=15, position=1, leave=False)
for i in range(15):
sleep(.1)
pbar_inner.update()
pbar_inner.set_postfix_str(postfix)
pbar_inner.disable = True # Disable further updates, including clearing of the bar.
else:
sleep(1.5)
if __name__ == '__main__':
postfix = ' [Use this line/progress bar to print some stuff out.]'
pbar_outer = tqdm(total=60, position=0, leave=True)
for n in range(3):
pool = multiprocessing.Pool(2)
args = [(i % 8 == 0, postfix) for i in range(20)]
for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
pbar_outer.update()
pool.close()
pool.join()
# Setting disable attribute prevents it from being deleted even at the end,
# so insert a dummy inner bar to overwrite and delete it (if you need).
tqdm(range(1), position=1, leave=False)
这个方法简单,但它依赖于tqdm的具体实现,所以你可能不太喜欢。
另一个解决办法更“正确”,但稍微复杂一点。问题在于,当do_the_thing
函数退出时,进度条实例会被删除,因此我们需要在其他地方管理这个进度条实例。也就是说,我们创建一个后台进程来单独管理进度条,而不是让Pool的工作进程直接管理,然后通过一个队列来传递操作给进度条。
import multiprocessing
from time import sleep
from tqdm import tqdm
def pbar_inner_worker(queue: multiprocessing.Queue, position: int):
"""Worker that manages the inner bar."""
pbar = tqdm(position=position, leave=False)
while True:
op = queue.get()
if op is None:
return
for method, kwargs in op.items():
getattr(pbar, method)(**kwargs)
def do_the_thing(args):
my_args, postfix, pbar_inner = args
if my_args:
# This will invoke `pbar.reset(total=15)` in the pbar_inner_worker.
pbar_inner.put({"reset": dict(total=15)})
for i in range(15):
sleep(0.1)
pbar_inner.put({"update": dict(n=1)})
pbar_inner.put({"set_postfix_str": dict(s=postfix)})
else:
sleep(1.5)
if __name__ == "__main__":
postfix = " [Use this line/progress bar to print some stuff out.]"
pbar_outer = tqdm(total=60, position=0, leave=True)
# Create the inner bar.
pbar_inner_queue = multiprocessing.Manager().Queue()
pbar_inner_process = multiprocessing.Process(
target=pbar_inner_worker,
kwargs=dict(queue=pbar_inner_queue, position=1),
)
pbar_inner_process.start()
for n in range(3):
pool = multiprocessing.Pool(2)
args = [(i % 8 == 0, postfix, pbar_inner_queue) for i in range(20)]
for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
pbar_outer.update()
pool.close()
pool.join()
# Close the inner bar.
pbar_inner_queue.put(None)
pbar_inner_process.join()
pbar_inner_process.close()
请注意,multiprocessing.Manager().Queue()
可能会比较慢,所以如果你频繁更新进度条,可能会导致性能下降。
另外要注意的是,这两种方法都不适合多个进程同时更新进度条。