ThreadPoolExecutor 对于 CPU 密集型任务太快了

0 投票
2 回答
33 浏览
提问于 2025-04-14 17:44

我正在尝试理解ThreadPoolExecutor和ProcessPoolExecutor是怎么工作的。我原本以为,对于像增加计数器这样的CPU密集型任务,使用ThreadPoolExecutor不会有好处,因为它不会释放全局解释器锁(GIL),所以一次只能用一个进程。

@measure_execution_time
def cpu_slow_function(item):
    start = time.time()
    duration = random()
    counter = 1
    while time.time() - start < duration:
        counter += 1
    return item, counter


def test_thread_pool__cpu_bound():
    """
    100 tasks of average .5 seconds each, would take 50 seconds to complete sequentially.
    """

    items = list(range(100))

    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(cpu_slow_function, items))

    for index, (result, counter) in enumerate(results):
        assert result == index
        assert counter >= 0.0

但让我惊讶的是,这个测试大约只花了5秒钟就完成了。根据我的假设,它应该要花大约50秒,100个任务,每个任务平均0.5秒。

我错过了什么呢?

2 个回答

0

你可以在下面这个代码变体中看到GIL的影响:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import perf_counter, time
from os import cpu_count

def func(item: int) -> tuple[int, ...]:
    start = time()
    counter = 1
    while time() - start < 0.5:
        counter += 1
    return item, counter

def main():
    n = max(2, cpu_count() or 2) - 1
    for executor in ProcessPoolExecutor, ThreadPoolExecutor:
        with executor(n) as exe:
            begin = perf_counter()
            for _ in exe.map(func, range(100)):
                pass
            duration = perf_counter() - begin
            print(executor.__name__, f"{duration:.4f}s")

if __name__ == "__main__":
    main()

为了更好地进行统计分析,应该使用一个固定的时间,而不是一个变化的(伪随机的)时间。在这里我们使用0.5秒。

另外,你需要确保进程池和线程池的大小是一样的。这个大小是根据CPU的数量减去1来决定的。

输出:

ProcessPoolExecutor 7.5492s
ThreadPoolExecutor 7.8219s

我们看到在这种情况下,多线程的速度比多进程稍慢,因为func()完全依赖于CPU。

注意:

在Apple Silicon M2上测试,os.cpu_count() == 8。

1

全局解释锁(GIL)并不是说Python的线程不能同时运行。它只是限制了在任何时刻,只有一个线程可以执行字节码。

在任何时刻,你的程序中会有一个线程正在执行字节码,而其他99个线程则在等待它们执行下一个字节码。同时,time.time()这个时钟在为所有线程计时,也就是说,现实时间在不断流逝。

撰写回答