Python 进度条与进程分叉结合使用

0 投票
2 回答
32 浏览
提问于 2025-04-14 16:58

我正在尝试在一段使用os.fork来并行处理的代码中加入一个进度条。我试过用一个很酷的进度条,但因为进度更新在不同进程之间的处理方式不对,所以它没有正常工作。也就是说,子进程中的进度条更新没有在父进程中显示出来。

我的代码大概是这样的:

from rich.progress import Progress
import os
with Progress() as progress:
    task = progress.add_task(total=len(args_list))
    for balanced_batch in even_batches_it:
        if os.fork():
            # do something
        else:
            try:
                for my_tuple in balanced_batch:
                    do_something(my_tuple)
                progress.advance(task, advance=len(balanced_genes_batch))
            except Exception as exception:
              # do something

任何反馈或建议都非常感谢!

2 个回答

0

在终端输出信息的时候,最好只让一个进程来处理这些输出。这就是说,你需要把进度信息传递给主进程,让它来管理进度条。

这里有一个使用队列的示例,可以在EnlightenGitHub库中找到。不过,这个示例没有使用fork(),而是用了multiprocessing模块。

0

根据我的经验,最好是专门创建一个 Python 文件来处理这个任务。这样每次想在代码中添加进度条的时候就简单多了,因为 threading 其实不太好用。

我写了一些代码,这样当你输入一个函数时,它会在运行这个函数的同时显示一个进度条:

from rich.progress import Progress
import time
import inspect

def run(func, sleep=0):
    update = lambda progress, task: progress.update(task, advance=1)

    function = inspect.getsource(func)
    mylist = function.split('\n')
    mylist.remove(mylist[0])
    length = len(mylist)
    for num, line in enumerate(mylist):
        mylist[num] = line[8:] + "\nupdate(progress, task)\ntime.sleep(sleep)" #update the progress bar

    myexec = ''
    for line in mylist:
        myexec += line + '\n'

    with Progress() as progress:
        task = progress.add_task("Working...", total=length)
        exec(myexec)

if __name__ == '__main__':
    def myfunc():
        #chose to import these modules for progress bar example since they take a LONG time to import on my computer
        import pandas
        import openai

    run(myfunc)

这段代码有点不太高效,因为它使用了 exec()。不过,它非常强大!只要在你想运行的函数的每一行末尾加上 update(progress, task),进度条就会在函数的每一行执行时更新。下面是你如何实现你的代码:

把这个示例代码保存为一个文件,名字随便起(比如 progress_bar.py)。然后从这个 Python 文件中导入 run() 函数。现在你就可以运行你的代码了:

from progress_bar import run
import os

def myfunc():
    for balanced_batch in even_batches_it:
        if os.fork():
            # do something
        else:
            try:
                for my_tuple in balanced_batch:
                    do_something(my_tuple)
                progress.advance(task, advance=len(balanced_genes_batch))
            except Exception as exception:
              # do something

run(myfunc) #runs this function so the progress bar will update alongside it, updating every line of code in the function

最后要注意的是,你的代码其实是为一个循环结构设计的,所以你并不需要我的函数来运行它(根据文档说明),不过这段代码在将来可能对你更有帮助,因为你当前的进度条可能不太好用。

撰写回答