Is multiprocessing Pool.imap broken?

Asked 2025-04-16 14:42

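I've tried both the multiprocessing module bundled with Ubuntu's python2.6 package (__version__ reports 0.70a1) and the latest release from PyPI (2.6.2.1). In both cases I can't figure out how to use imap correctly: it makes the whole interpreter stop responding to Ctrl-C (map works fine, though). pdb shows it blocked on the wait() call inside next(), so nothing ever wakes us up. Any hints? Thanks in advance.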

$ cat /tmp/go3.py
import multiprocessing as mp
print mp.Pool(1).map(abs, range(3))
print list(mp.Pool(1).imap(abs, range(3)))

$ python /tmp/go3.py
[0, 1, 2]
^C^C^C^C^C^\Quit

2 Answers


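In my case I called pool.imap() without doing anything with its return value, so it didn't work. When I switched to pool.map(), everything worked. The cause is the one the other answer describes: nothing consumed the returned iterator, so the pool was shut down before its tasks had a chance to run.

The fix is to pass the iterator to something that consumes it, such as list(). That works because list() has to collect every result, so the main process waits until all the tasks have actually run. Here is an illustration (simplified, of course, but useful for now):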

from multiprocessing import Pool
from shutil import copy
from tqdm import tqdm

filedict = { r"C:\src\file1.txt": r"C:\trg\file1_fixed.txt",
             r"C:\src\file2.txt": r"C:\trg\file2_fixed.txt",
             r"C:\src\file3.txt": r"C:\trg\file3_fixed.txt",
             r"C:\src\file4.txt": r"C:\trg\file4_fixed.txt" }

# target function: copy one (source, target) pair
def copyfile(srctrg):
    copy(srctrg[0], srctrg[1])
    return True

# the __main__ guard is required on Windows, where worker processes re-import this module
if __name__ == "__main__":
    # a couple of trial runs for illustration
    with Pool(2) as pool:

        # works fine with map, but tqdm() is no use here: map blocks and returns a finished list, not an iterator
        pool.map(copyfile, list(filedict.items()))

        # NOT WORKING on its own: the imap iterator is never consumed, so the
        # with-block can exit (calling pool.terminate()) before the copies finish
        tqdm(pool.imap(copyfile, list(filedict.items())))

        # works: list() consumes the iterator, so we wait for every result
        list(tqdm(pool.imap(copyfile, list(filedict.items())), total=len(filedict)))

In my case, the simple fix was to wrap the whole tqdm(pool.imap(...)) call in list() to force it to be consumed.
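A minimal, dependency-free sketch of the same fix, assuming a running count is enough instead of a tqdm bar. So that it runs without spawning processes, it swaps in multiprocessing.pool.ThreadPool, which offers the same imap API and whose context manager also calls terminate() on exit; with a real Pool the worker function must be picklable (defined at module level) and the pool created under an `if __name__ == "__main__":` guard. The helper name run_all is hypothetical.

```python
from multiprocessing.pool import ThreadPool

def run_all(func, items, workers=2):
    """Apply func to every item via imap and return the results in order."""
    results = []
    with ThreadPool(workers) as pool:
        # imap returns a lazy iterator. Consuming it inside the with-block
        # means the block cannot exit (and call pool.terminate()) until
        # every result has arrived.
        for done, result in enumerate(pool.imap(func, items), start=1):
            results.append(result)
            print(f"{done}/{len(items)} done")
    return results

if __name__ == "__main__":
    print(run_all(abs, [-3, -2, -1]))  # [3, 2, 1]
```

imap yields results in input order, so the returned list lines up with items; imap_unordered would yield them as they finish instead.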


First, note that this code works fine:

import multiprocessing as mp
import multiprocessing.util as util
pool=mp.Pool(1)
print list(pool.imap(abs, range(3)))

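The difference is that here the pool is not cleaned up when the pool.imap() call returns, because the name pool still refers to it.

By contrast,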

print(list(mp.Pool(1).imap(abs, range(3))))

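causes the Pool instance to be finalized shortly after the imap call returns. With no references left, the Finalizer (stored as self._terminate in the Pool class) is called. That sets off a chain of calls that shuts down the task handler thread, the result handler thread, the worker subprocesses, and so on.

All of this happens so quickly that, most of the time, the tasks handed to the task handler never complete. Meanwhile the main process is still blocked in the iterator's next(), waiting for results that will now never arrive.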
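A sketch of the fix this answer implies, assuming you would rather not rely on a with-block: hold a named reference to the pool for as long as its results are needed, then shut it down explicitly. close() stops accepting new work and join() waits for the workers to exit (join() requires close() or terminate() first). ThreadPool again stands in for Pool only so the example runs without child processes; the function name imap_then_shutdown is hypothetical.

```python
from multiprocessing.pool import ThreadPool

def imap_then_shutdown(func, items, workers=1):
    # Keep a named reference so the pool cannot be garbage-collected
    # (and its finalizer cannot run) while imap results are still pending.
    pool = ThreadPool(workers)
    try:
        return list(pool.imap(func, items))
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the worker threads to finish

if __name__ == "__main__":
    print(imap_then_shutdown(abs, range(-3, 0)))  # [3, 2, 1]
```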

Here are the relevant snippets:

From /usr/lib/python2.6/multiprocessing/pool.py:

class Pool(object):
    def __init__(self, processes=None, initializer=None, initargs=()):
        ...
        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )

From /usr/lib/python2.6/multiprocessing/util.py:

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        ...
        if obj is not None:
            self._weakref = weakref.ref(obj, self)   

The effect of weakref.ref(obj, self) is that self() is called when obj is about to be finalized.
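A toy sketch of the mechanism Finalize relies on: weakref.ref(obj, callback) calls callback (passing it the weak reference) once obj is about to be collected. Under CPython's reference counting that happens as soon as the last strong reference disappears, which is what happens to the unnamed temporary in mp.Pool(1).imap(...). Resource, make_finalizer and events are hypothetical names; on implementations without reference counting (e.g. PyPy) the callback may run later.

```python
import weakref

events = []

class Resource:
    """Stand-in for Pool: an object that needs cleanup when it goes away."""

def make_finalizer(obj, label):
    # Like multiprocessing.util.Finalize: hold only a weak reference to obj
    # and register a callback that fires when obj is about to be collected.
    # The returned weakref must itself stay alive, or the callback is lost.
    return weakref.ref(obj, lambda _ref: events.append(label))

r = Resource()
keep = make_finalizer(r, "cleanup ran")
print(events)  # [] (r is still referenced)
del r          # drop the last strong reference
print(events)  # ['cleanup ran'] under CPython
```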

I used util.log_to_stderr(util.SUBDEBUG) to trace the sequence of events. For example:

import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)

print(list(mp.Pool(1).imap(abs, range(3))))

produces

[DEBUG/MainProcess] created semlock with handle 3077013504
[DEBUG/MainProcess] created semlock with handle 3077009408
[DEBUG/MainProcess] created semlock with handle 3077005312
[DEBUG/MainProcess] created semlock with handle 3077001216
[INFO/PoolWorker-1] child process calling self.run()
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x9d6e62c>, <multiprocessing.queues.SimpleQueue object at 0x9cf04cc>, <multiprocessing.queues.SimpleQueue object at 0x9d6e40c>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1217967248)>, <Thread(Thread-2, started daemon -1226359952)>, {0: <multiprocessing.pool.IMapIterator object at 0x9d6eaec>}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
...

Compare that with

import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)
pool=mp.Pool(1)
print list(pool.imap(abs, range(3)))

which produces

[DEBUG/MainProcess] created semlock with handle 3078684672
[DEBUG/MainProcess] created semlock with handle 3078680576
[DEBUG/MainProcess] created semlock with handle 3078676480
[DEBUG/MainProcess] created semlock with handle 3078672384
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[0, 1, 2]
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[SUBDEBUG/MainProcess] calling <Finalize object, callback=_terminate_pool, args=(<Queue.Queue instance at 0xb763e60c>, <multiprocessing.queues.SimpleQueue object at 0xb76c94ac>, <multiprocessing.queues.SimpleQueue object at 0xb763e3ec>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1218274448)>, <Thread(Thread-2, started daemon -1226667152)>, {}), exitprority=15>
...
[DEBUG/MainProcess] finalizing pool
