Python: 如何检查multiprocessing.Pool中待处理任务的数量?
我有一小组工人(4个),还有一个非常大的任务列表(大约5000个任务)。我正在使用一个任务池,并通过map_async()来发送任务。因为我运行的任务比较长,所以我强制设置了chunksize为1,这样一个长时间的任务就不会拖慢其他短任务的进度。
我想定期检查一下还有多少任务需要提交。我知道最多会有4个任务在同时进行,但我关心的是还有多少任务需要处理。
我在网上查了很久,但找不到有人在做这个。
这里有一些简单的代码可以帮助理解:
import multiprocessing
import time
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()
while True:
if not jobs.ready():
print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
jobs.wait(2)
else:
break
4 个回答
1
我知道的没有绝对可靠的方法,不过如果你用 Pool.imap_unordered()
这个函数,而不是 map_async,你就可以拦截正在处理的元素。
import multiprocessing
import time
process_count = 4
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
# Actually, you should return the job you've created here.
return num
pool = multiprocess.Pool(process_count)
jobs = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
jobs.append(job)
job_count += 1
incomplete = len(items) - job_count
unsubmitted = max(0, incomplete - process_count)
print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted
pool.close()
我在这里减去 process_count
,因为你可以基本上假设所有的进程都会在处理,只有两个例外:1)如果你用的是迭代器,可能没有更多的项目可以处理;2)你可能剩下的项目少于4个。我没有为第一个例外写代码,但如果你需要的话,这应该很简单。总之,你的例子用的是列表,所以你应该不会遇到这个问题。
编辑:我还注意到你在使用一个 While 循环,这看起来像是你想定期更新一些东西,比如每半秒更新一次。我的示例代码不会那样做。我不确定这是否是个问题。
3
你可以通过查看 Pool._cache
这个属性来检查有多少个待处理的任务,前提是你在使用 apply_async
。这个地方存储着 ApplyResult
,直到它们可以使用,而这个数量就是待处理的 ApplyResult
的数量。
import multiprocessing as mp
import random
import time
def job():
time.sleep(random.randint(1,10))
print("job finished")
if __name__ == '__main__':
pool = mp.Pool(5)
for _ in range(10):
pool.apply_async(job)
while pool._cache:
print("number of jobs pending: ", len(pool._cache))
time.sleep(2)
pool.close()
pool.join()
9
看起来 jobs._number_left
是你想要的东西。这里的 _
表示这是一个内部值,可能会随开发者的想法而改变,但目前似乎这是获取该信息的唯一方法。