我有一小群工人(4人)和一大堆任务(5000人)。我正在使用一个池并使用map_async()发送任务。因为我正在运行的任务相当长,所以我将块大小强制为1,这样一个长进程就不能容纳一些较短的进程。
我想做的是定期检查还有多少任务要提交。我知道最多有4个会被激活,我关心还有多少要处理。
我到处搜索,找不到任何人在做这件事。
一些简单的代码可以帮助您:
import multiprocessing
import time
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()
while True:
if not jobs.ready():
print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
jobs.wait(2)
else:
break
据我所知,没有什么不严密的方法,但是如果您使用
Pool.imap_unordered()
函数而不是map_async,那么您可以截取已处理的元素。我要减去
process_count
,因为您几乎可以假设所有进程都将处理,但有两个例外:1)如果使用迭代器,则可能没有其他项可供使用和处理,2)您可能只剩下不到4个项。我没有为第一个异常编写代码。但如果你需要的话,这样做应该很容易。无论如何,你的例子使用了一个列表,所以你不应该有这个问题。编辑:我也意识到你在使用While循环,这使得它看起来像是你在尝试周期性地更新一些东西,比如说,每半秒或者其他什么东西。我作为示例给出的代码不会这样做。我不确定那是不是个问题。
看起来像是工作。剩下的就是你想要的。_表示它是一个内部值,可能会随开发人员的突发奇想而改变,但它似乎是获取该信息的唯一方法。
我有类似的要求:跟踪进度,根据结果执行临时工作,在任意时间干净地停止所有处理。我处理它的方法是使用
apply_async
一次发送一个任务。我所做事情的一个简化版本:注意,我在结果中使用
Queue
,而不是return
。相关问题 更多 >
编程相关推荐