检查 `concurrent.futures.ThreadPoolExecutor` 状态
我有一个正在运行的 concurrent.futures.ThreadPoolExecutor
。我想检查一下它的状态。我想知道现在有多少个线程,多少个线程在处理任务,正在处理哪些任务,还有多少个线程是空闲的,以及队列里有哪些任务在等着处理。我该怎么才能找到这些信息呢?
2 个回答
0
找出pending
(待处理)期货的方式并不是特别干净和可靠,但我通常是这样做的:
if 'state=pending' in str(future):
logger.debug('PENDING')
elif future.running():
logger.debug('RUNNING')
elif future.cancelled():
logger.debug('CANCELLED')
elif future.exception():
logger.debug('EXCEPTION')
elif future.done():
logger.debug('DONE')
14
我们可以看到一些关于池(Pool)和待处理工作项队列的信息。要了解有哪些内容,可以打印 poolx.__dict__
来查看它的结构。可以看看线程池的代码,写得不错:concurrent.futures.thread
接下来,我们创建了一个只有一个线程的池。然后创建了两个任务:一个任务睡眠3秒,另一个任务立即返回。接着,我们打印出池中待处理的工作项数量。
之后,我们从工作队列中打印出项目。在这个例子中,一个线程正在执行 time.sleep(3)
函数,所以这个任务不在队列中。函数 sleep
的参数 [0]
和关键字参数 {}
被打印出来,因为这是池中下一个要执行的工作项。
感谢 @dano 提供的非破坏性队列的见解,还有 @abarnert。
源代码
import concurrent.futures, time
poolx = concurrent.futures.ThreadPoolExecutor(max_workers=1)
poolx.submit(time.sleep, 3)
poolx.submit(time.sleep, 0) # very fast
print('pending:', poolx._work_queue.qsize(), 'jobs')
print('threads:', len(poolx._threads))
print()
# TODO: make thread safe; work on copy of queue?
print('Estimated Pending Work Queue:')
for num,item in enumerate(poolx._work_queue.queue):
print('{}\t{}\t{}\t{}'.format(
num+1, item.fn, item.args, item.kwargs,
))
poolx.shutdown(wait=False)
输出结果
pending: 1 jobs
threads: 1
Pending Work Queue:
1 <built-in function sleep> (0,) {}