Python:如何检查多处理池中挂起的任务数?

2024-06-16 14:06:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一小群工人(4人)和一大堆任务(5000人)。我正在使用一个池并使用map_async()发送任务。因为我正在运行的任务相当长,所以我将块大小强制为1,这样一个长进程就不能容纳一些较短的进程。

我想做的是定期检查还有多少任务要提交。我知道最多有4个会被激活,我关心还有多少要处理。

我到处搜索,找不到任何人在做这件事。

一些简单的代码可以帮助您:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break

Tags: importmapasynctime进程jobsnotmultiprocessing
3条回答

据我所知,没有什么不严密的方法,但是如果您使用Pool.imap_unordered()函数而不是map_async,那么您可以截取已处理的元素。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

我要减去process_count,因为您几乎可以假设所有进程都将处理,但有两个例外:1)如果使用迭代器,则可能没有其他项可供使用和处理,2)您可能只剩下不到4个项。我没有为第一个异常编写代码。但如果你需要的话,这样做应该很容易。无论如何,你的例子使用了一个列表,所以你不应该有这个问题。

编辑:我也意识到你在使用While循环,这使得它看起来像是你在尝试周期性地更新一些东西,比如说,每半秒或者其他什么东西。我作为示例给出的代码不会这样做。我不确定那是不是个问题。

看起来像是工作。剩下的就是你想要的。_表示它是一个内部值,可能会随开发人员的突发奇想而改变,但它似乎是获取该信息的唯一方法。

我有类似的要求:跟踪进度,根据结果执行临时工作,在任意时间干净地停止所有处理。我处理它的方法是使用apply_async一次发送一个任务。我所做事情的一个简化版本:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

注意,我在结果中使用Queue,而不是return

相关问题 更多 >