检查 `concurrent.futures.ThreadPoolExecutor` 状态

24 投票
2 回答
16903 浏览
提问于 2025-04-18 18:22

我有一个正在运行的 concurrent.futures.ThreadPoolExecutor。我想检查一下它的状态。我想知道现在有多少个线程,多少个线程在处理任务,正在处理哪些任务,还有多少个线程是空闲的,以及队列里有哪些任务在等着处理。我该怎么才能找到这些信息呢?

2 个回答

0

找出pending(待处理)期货的方式并不是特别干净和可靠,但我通常是这样做的:

if 'state=pending' in str(future):
    logger.debug('PENDING')
elif future.running():
    logger.debug('RUNNING')
elif future.cancelled():
    logger.debug('CANCELLED')
elif future.exception():
    logger.debug('EXCEPTION')
elif future.done():
    logger.debug('DONE')
14

我们可以看到一些关于池(Pool)和待处理工作项队列的信息。要了解有哪些内容,可以打印 poolx.__dict__ 来查看它的结构。可以看看线程池的代码,写得不错:concurrent.futures.thread

接下来,我们创建了一个只有一个线程的池。然后创建了两个任务:一个任务睡眠3秒,另一个任务立即返回。接着,我们打印出池中待处理的工作项数量。

之后,我们从工作队列中打印出项目。在这个例子中,一个线程正在执行 time.sleep(3) 函数,所以这个任务不在队列中。函数 sleep 的参数 [0] 和关键字参数 {} 被打印出来,因为这是池中下一个要执行的工作项。

感谢 @dano 提供的非破坏性队列的见解,还有 @abarnert。

源代码

import concurrent.futures, time

poolx = concurrent.futures.ThreadPoolExecutor(max_workers=1)
poolx.submit(time.sleep, 3)
poolx.submit(time.sleep, 0)   # very fast

print('pending:', poolx._work_queue.qsize(), 'jobs')
print('threads:', len(poolx._threads))
print()

# TODO: make thread safe; work on copy of queue?
print('Estimated Pending Work Queue:')
for num,item in enumerate(poolx._work_queue.queue):
    print('{}\t{}\t{}\t{}'.format(
        num+1, item.fn, item.args, item.kwargs,
        ))

poolx.shutdown(wait=False)

输出结果

pending: 1 jobs
threads: 1

Pending Work Queue:
1   <built-in function sleep>   (0,)    {}

撰写回答