ThreadPoolExecutor 对于 CPU 密集型任务太快了
我正在尝试理解ThreadPoolExecutor和ProcessPoolExecutor是怎么工作的。我原本以为,对于像增加计数器这样的CPU密集型任务,使用ThreadPoolExecutor不会有好处,因为它不会释放全局解释器锁(GIL),所以一次只能用一个进程。
@measure_execution_time
def cpu_slow_function(item):
start = time.time()
duration = random()
counter = 1
while time.time() - start < duration:
counter += 1
return item, counter
def test_thread_pool__cpu_bound():
"""
100 tasks of average .5 seconds each, would take 50 seconds to complete sequentially.
"""
items = list(range(100))
with ThreadPoolExecutor(max_workers=100) as executor:
results = list(executor.map(cpu_slow_function, items))
for index, (result, counter) in enumerate(results):
assert result == index
assert counter >= 0.0
但让我惊讶的是,这个测试大约只花了5秒钟就完成了。根据我的假设,它应该要花大约50秒,100个任务,每个任务平均0.5秒。
我错过了什么呢?
2 个回答
0
你可以在下面这个代码变体中看到GIL的影响:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import perf_counter, time
from os import cpu_count
def func(item: int) -> tuple[int, ...]:
start = time()
counter = 1
while time() - start < 0.5:
counter += 1
return item, counter
def main():
n = max(2, cpu_count() or 2) - 1
for executor in ProcessPoolExecutor, ThreadPoolExecutor:
with executor(n) as exe:
begin = perf_counter()
for _ in exe.map(func, range(100)):
pass
duration = perf_counter() - begin
print(executor.__name__, f"{duration:.4f}s")
if __name__ == "__main__":
main()
为了更好地进行统计分析,应该使用一个固定的时间,而不是一个变化的(伪随机的)时间。在这里我们使用0.5秒。
另外,你需要确保进程池和线程池的大小是一样的。这个大小是根据CPU的数量减去1来决定的。
输出:
ProcessPoolExecutor 7.5492s
ThreadPoolExecutor 7.8219s
我们看到在这种情况下,多线程的速度比多进程稍慢,因为func()完全依赖于CPU。
注意:
在Apple Silicon M2上测试,os.cpu_count() == 8。
1
全局解释锁(GIL)并不是说Python的线程不能同时运行。它只是限制了在任何时刻,只有一个线程可以执行字节码。
在任何时刻,你的程序中会有一个线程正在执行字节码,而其他99个线程则在等待它们执行下一个字节码。同时,time.time()
这个时钟在为所有线程计时,也就是说,现实时间在不断流逝。