Python: 如何检查multiprocessing.Pool中待处理任务的数量?

14 投票
4 回答
10963 浏览
提问于 2025-04-16 15:04

我有一小组工人(4个),还有一个非常大的任务列表(大约5000个任务)。我正在使用一个任务池,并通过map_async()来发送任务。因为我运行的任务比较长,所以我强制设置了chunksize为1,这样一个长时间的任务就不会拖慢其他短任务的进度。

我想定期检查一下还有多少任务需要提交。我知道最多会有4个任务在同时进行,但我关心的是还有多少任务需要处理。

我在网上查了很久,但找不到有人在做这个。

这里有一些简单的代码可以帮助理解:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break

4 个回答

1

我知道的没有绝对可靠的方法,不过如果你用 Pool.imap_unordered() 这个函数,而不是 map_async,你就可以拦截正在处理的元素。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

我在这里减去 process_count,因为你可以基本上假设所有的进程都会在处理,只有两个例外:1)如果你用的是迭代器,可能没有更多的项目可以处理;2)你可能剩下的项目少于4个。我没有为第一个例外写代码,但如果你需要的话,这应该很简单。总之,你的例子用的是列表,所以你应该不会遇到这个问题。

编辑:我还注意到你在使用一个 While 循环,这看起来像是你想定期更新一些东西,比如每半秒更新一次。我的示例代码不会那样做。我不确定这是否是个问题。

3

你可以通过查看 Pool._cache 这个属性来检查有多少个待处理的任务,前提是你在使用 apply_async。这个地方存储着 ApplyResult,直到它们可以使用,而这个数量就是待处理的 ApplyResult 的数量。

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()
9

看起来 jobs._number_left 是你想要的东西。这里的 _ 表示这是一个内部值,可能会随开发者的想法而改变,但目前似乎这是获取该信息的唯一方法。

撰写回答