tqdm 多进程嵌套进度条

0 投票
2 回答
119 浏览
提问于 2025-04-14 15:34

我正在使用多进程来处理多个耗时的任务,同时有一个外部的进度条来跟踪完成了多少个任务。为了显示每个任务的进度,我还想加一个内部进度条,并且希望在内部进度条完成时能够打印出来。

这就是我想要的效果。

问题是,当内部进度条完成时,它会消失,因为设置了leave=False。如果设置为leave=True也不行,因为我需要能够重新启动内部进度条。因此,我的解决办法是手动打印出完成的进度条。

我的解决方案如下。因为它使用了`sleep(.04)`,所以这个0.04的时间需要根据电脑、工作进程数量、任务长度等进行调整。而且,即使你尝试调整这个时间,它也不一定总是有效。因此,我在寻找一个不那么“hack”的解决方案,能够在任何电脑上都能正常工作。

from tqdm import tqdm
from time import sleep
import multiprocessing


def do_the_thing(my_args):
    if my_args:
        pbar_inner = tqdm(total=15, position=1, leave=False)
        for i in range(15):
            sleep(.1)
            pbar_inner.update()
    else:
        sleep(1.5)


if __name__ == '__main__':
    postfix = ' [Use this line/progress bar to print some stuff out.]'
    pbar_outer = tqdm(total=60, position=0, leave=True)
    for n in range(3):
        pool = multiprocessing.Pool(2)
        args = [True if i % 8 == 0 else False for i in range(20)]
        for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
            pbar_outer.update()
            if args[count]:
                sleep(.04)
                my_pbar_inner = tqdm(total=15, position=1, leave=False,
                                     bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}' + postfix)
                my_pbar_inner.update(15)
                my_pbar_inner.set_postfix_str('')
        pool.close()
        pool.join()

2 个回答

0

这是我现在的解决方案。虽然它并不总是有效,但加上“睡眠”这个步骤确实有帮助。不知道为什么,在这个测试脚本中,进度条大部分时间还是会消失。不过在实际代码中,它的表现要好得多。

from tqdm import tqdm
from time import sleep
import multiprocessing


def do_the_thing(my_args):
    if my_args:
        pbar_inner = tqdm(total=15, position=1, leave=False)
        for i in range(15):
            sleep(.1)
            pbar_inner.update()
    else:
        sleep(1.5)


if __name__ == '__main__':
    postfix = ' [Use this line/progress bar to print some stuff out.]'
    pbar_outer = tqdm(total=60, position=0, leave=True)
    for n in range(3):
        pool = multiprocessing.Pool(2)
        args = [True if i % 8 == 0 else False for i in range(20)]
        for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
            pbar_outer.update()
            if args[count]:
                sleep(.04)  # magic 
                my_pbar_inner = tqdm(total=15, position=1, leave=False,
                                     bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}' + postfix)
                my_pbar_inner.update(15)
                my_pbar_inner.set_postfix_str('')
        pool.close()
        pool.join()

在这里输入图片描述 这里的重点是,如果外部的进度条需要几个小时或几天才能完成,那么有一个内部进度条在几分钟内更新就会很不错(内部进度条会在整个过程中反复重启)。

默认情况下,进度条完成后会消失,所以这个“解决方案”的意思就是手动把它再打印出来,显示为完成状态。这个状态只能在上一个进度条结束和下一个进度条开始之间的(可能很短暂的)时间里看到。

1

我觉得我明白你想要做什么。不过,这个功能在tqdm里是没有的。

tqdm的设计是,当它的实例被删除时,进度条会自动关闭。这是通过一个叫做__del__的机制实现的。因此,当你退出do_the_thing这个函数时,进度条总是会被删除或留下。

不过,有几种方法可以解决这个问题。最简单的方法有点小技巧。虽然我说进度条总是会被删除,但如果你看看它的实现,会发现它通过disable这个属性来防止多次删除。所以,提前把disable设置为True,就可以防止它被删除。

import multiprocessing
from time import sleep

from tqdm import tqdm


def do_the_thing(args):
    my_args, postfix = args
    if my_args:
        pbar_inner = tqdm(total=15, position=1, leave=False)
        for i in range(15):
            sleep(.1)
            pbar_inner.update()
        pbar_inner.set_postfix_str(postfix)
        pbar_inner.disable = True  # Disable further updates, including clearing of the bar.
    else:
        sleep(1.5)


if __name__ == '__main__':
    postfix = ' [Use this line/progress bar to print some stuff out.]'
    pbar_outer = tqdm(total=60, position=0, leave=True)
    for n in range(3):
        pool = multiprocessing.Pool(2)
        args = [(i % 8 == 0, postfix) for i in range(20)]
        for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
            pbar_outer.update()
        pool.close()
        pool.join()

    # Setting disable attribute prevents it from being deleted even at the end,
    # so insert a dummy inner bar to overwrite and delete it (if you need).
    tqdm(range(1), position=1, leave=False)

这个方法简单,但它依赖于tqdm的具体实现,所以你可能不太喜欢。

另一个解决办法更“正确”,但稍微复杂一点。问题在于,当do_the_thing函数退出时,进度条实例会被删除,因此我们需要在其他地方管理这个进度条实例。也就是说,我们创建一个后台进程来单独管理进度条,而不是让Pool的工作进程直接管理,然后通过一个队列来传递操作给进度条。

import multiprocessing
from time import sleep

from tqdm import tqdm


def pbar_inner_worker(queue: multiprocessing.Queue, position: int):
    """Worker that manages the inner bar."""
    pbar = tqdm(position=position, leave=False)
    while True:
        op = queue.get()
        if op is None:
            return
        for method, kwargs in op.items():
            getattr(pbar, method)(**kwargs)


def do_the_thing(args):
    my_args, postfix, pbar_inner = args
    if my_args:
        # This will invoke `pbar.reset(total=15)` in the pbar_inner_worker.
        pbar_inner.put({"reset": dict(total=15)})
        for i in range(15):
            sleep(0.1)
            pbar_inner.put({"update": dict(n=1)})
        pbar_inner.put({"set_postfix_str": dict(s=postfix)})
    else:
        sleep(1.5)


if __name__ == "__main__":
    postfix = " [Use this line/progress bar to print some stuff out.]"
    pbar_outer = tqdm(total=60, position=0, leave=True)

    # Create the inner bar.
    pbar_inner_queue = multiprocessing.Manager().Queue()
    pbar_inner_process = multiprocessing.Process(
        target=pbar_inner_worker,
        kwargs=dict(queue=pbar_inner_queue, position=1),
    )
    pbar_inner_process.start()

    for n in range(3):
        pool = multiprocessing.Pool(2)
        args = [(i % 8 == 0, postfix, pbar_inner_queue) for i in range(20)]
        for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
            pbar_outer.update()
        pool.close()
        pool.join()

    # Close the inner bar.
    pbar_inner_queue.put(None)
    pbar_inner_process.join()
    pbar_inner_process.close()

请注意,multiprocessing.Manager().Queue()可能会比较慢,所以如果你频繁更新进度条,可能会导致性能下降。

另外要注意的是,这两种方法都不适合多个进程同时更新进度条。

撰写回答