python 使用 pool.map_async 时无输出

1 投票
1 回答
7623 浏览
提问于 2025-04-18 16:43

我在使用pool.map调用的函数中处理数据时遇到了一些非常奇怪的问题。例如,下面的代码按预期工作……

import csv
import multiprocessing
import itertools
from collections import deque

cur_best = 0
d_sol = deque(maxlen=9)
d_names = deque(maxlen=9)

**import CSV Data1**

def calculate(vals):
    #global cur_best
    sol = sum(int(x[2]) for x in vals)
    names = [x[0] for x in vals]
    print(", ".join(names) + " = " + str(sol))

def process():
    pool = multiprocessing.Pool(processes=4)
    prod = itertools.product(([x[2], x[4], x[10]] for x in Data1))
    result = pool.map_async(calculate, prod)
    pool.close()
    pool.join()
    return result

process()

但是,当我在calculate()中添加一个简单的if语句时,就没有任何输出了。

   def calculate(vals):
        #global cur_best
        sol = sum(int(x[2]) for x in vals)
        if sol > cur_best:
             cur_best = sol
             names = [x[0] for x in vals]
             print(", ".join(names) + " = " + str(cur_best))
             #would like to append cur_best and names to a deque

我尝试调整'cur_best'的声明位置,但没有效果。

我想在进行计算时跟踪“当前最佳”解决方案。在我的线性代码中,这个逻辑是在一个嵌套的for循环中进行的,我会把每个新的'cur_best'添加到一个双端队列(deque)中。

我现在遇到的问题是否与pool.map或pool.map_async的工作方式有关?我还能否把calculate()函数当作线性循环来处理?

我还有几个其他的条件语句需要处理。我应该在代码的其他部分处理这些吗?如果是的话,具体该怎么做呢?

1 个回答

4

这里可能有两个问题。首先,你在工作函数中没有看到任何输出,可能是因为它抛出了一个异常。因为你使用了 map_async,所以在你调用 result.get() 之前,你是看不到这个异常的。不过,由于你在使用 map_async 后立刻调用了 closejoin,所以你可能应该直接使用 map,这样它会一直等到所有工作完成(或者抛出异常)后再继续。我不太确定异常发生的原因(从你提供的代码中看不出什么问题),但我猜可能是你在某个地方取错了列表的索引。

其次,正如 Armin Rigo 指出的那样,cur_best 在所有进程之间并不是共享的,所以你的逻辑不会按你想的那样工作。我认为最简单的办法是使用一个 multiprocessing.Value 来创建一个共享内存中的整数,这样所有进程都可以访问。

如果你想把得到的结果添加到一个 deque 中,你需要创建共享的双端队列,使用 multiprocessing.Manager。一个 Manager 会启动一个服务器进程,来管理对某个对象(比如 deque)的共享访问。你池中的每个进程(以及父进程)都可以访问一个 Proxy 对象,这个对象可以与 Manager 的进程通信,以便读写共享对象。

下面是一个示例,展示了以上讨论的内容:

import itertools
import multiprocessing
from collections import deque
from multiprocessing.managers import BaseManager, MakeProxyType

class DequeManager(BaseManager):
   pass

BaseDequeProxy = MakeProxyType('BaseDequeProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'extendleft', 'index', 'insert', 'pop', 
    'remove', 'reverse', 'sort', 'appendleft', 'popleft', 'rotate', 
    '__imul__'
    ))
class DequeProxy(BaseDequeProxy):
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self

DequeManager.register('deque', deque, DequeProxy)


cur_best = d_sol = d_names = None

def init_globals(best, sol, names):
    """ This will be called in each worker process. 

    A global variable (cur_best) will be created in each worker.
    Because it is a multiprocessing.Value, it will be shared
    between each worker, too.

    """
    global cur_best, d_sol, d_names
    cur_best = best
    d_sol = sol
    d_names = names

def calculate(vals):
    global cur_best
    sol = sum(int(x[2]) for x in vals)
    if sol > cur_best.value:
        cur_best.value = sol
        names = [x[0] for x in vals]
        print(", ".join(names) + " = " + str(cur_best.value))
        d_sol.append(cur_best.value)
        d_names.append(names)
    return sol

def process():
    global d_sol, d_names
    cur_best = multiprocessing.Value("I", 0)  # unsigned int

    m = DequeManager()
    m.start()
    d_sol = m.deque(maxlen=9)
    d_names = m.deque(maxlen=9)  

    pool = multiprocessing.Pool(processes=4, initializer=init_globals, 
                                initargs=(cur_best, d_sol, d_names))
    prod = itertools.product([x[2], x[4], x[10]] for x in Data1)
    result = pool.map(calculate, prod)  # map instead of map_async
    pool.close()
    pool.join()
    return result  # Result will be a list containing the value of `sol` returned from each worker call

if __name__ == "__main__":    
    print(process())

撰写回答