多处理停止程序执行

2024-04-27 18:52:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我是一个多处理机的noob,我正在尝试加速我的一个旧算法。它工作得非常好,没有多处理,但在我尝试实现它的那一刻,程序停止工作:它一直待命,直到我中止脚本。 另一个问题是,它不会填充数据帧:同样,它正常工作,但对于多处理,它只返回NaN

func运作良好

stockUniverse = list(map(lambda s: s.strip(), Stocks)) #Stocks = list

def func(i):

    df.at[i, 'A'] = 1
    df.at[i, 'B'] = 2
    df.at[i, 'C'] = 3
    print(i, 'downloaded')
    return True


if __name__ == "__main__":
    print('Start')

    pool = mp.Pool(mp.cpu_count())
    pool.imap(func, stockUniverse)
    print(df)

结果是:

Index 19  NaN  NaN  NaN
index 20  NaN  NaN  NaN

然后它停在那里直到我点击Ctrl+C。 谢谢


Tags: 程序脚本算法dfmpnanatlist
1条回答
网友
1楼 · 发布于 2024-04-27 18:52:10

map函数阻塞,直到所有提交的任务都完成,并从worker函数返回返回值的列表。但是imap函数立即返回,并带有一个迭代器,该迭代器必须迭代以在每个值可用时逐个返回。您的原始代码没有迭代该迭代器,而是立即打印出它期望的是更新的df。但是您没有给任务足够的时间来开始和完成df的修改。理论上,如果在print语句之前插入对time.sleep的调用足够长的时间,那么在打印出df之前,任务就已经开始并完成了。但显然,迭代迭代器是确保所有任务都已完成的最有效方法,也是返回值的唯一方法

但是,正如我在评论中提到的,你有一个更大的问题。您提交的任务由您创建的进程池中的进程调用的辅助函数func执行,每个进程都在自己的地址空间中执行。您没有在运行的平台上标记您的问题(每当您使用multiprocessing标记问题时,您也应该使用平台标记问题),但我可以推断您运行的平台使用spawn方法创建新进程,例如Windows,这就是为什么您使用if __name__ == "__main__":创建新进程(即处理池)的块控制代码。当使用spawn创建新进程时,将创建一个新的空地址空间,启动一个新的Python解释器,并从顶部重新执行源代码(如果没有创建新进程的if __name__ == "__main__":块控制代码,您将进入创建新进程的无限递归循环)。但这意味着在if __name__ == "__main__":块(如果您在Windows下运行,则必须忽略该块)之外的全局范围内对df的任何定义将在创建每个进程时为池中的每个进程创建一个新的单独实例

如果您在Linux下运行,其中fork用于创建新进程,那么情况就有点不同了。新进程将从主进程和所有声明的变量继承原始地址空间,但使用写时复制。这意味着,一旦子流程尝试修改此继承存储中的任何变量,就会创建页面的副本,并且该流程现在将处理自己的副本。因此,为了更新,不能共享任何内容

因此,您应该修改程序,使辅助函数值返回主进程,主进程将执行必要的更新:

import multiprocessing as mp
import pandas as pd

def func(stock):
    return (stock, (('A', 1), ('B', 1), ('C', 1)))

if __name__ == "__main__":
    stockUniverse = ['abc', 'def', 'ghi', 'klm']
    d = {col: pd.Series(index=stockUniverse, dtype='int32') for col in ['A', 'B', 'C']}
    df = pd.DataFrame(d)

    pool_size = min(mp.cpu_count(), len(stockUniverse))
    pool = mp.Pool(pool_size)
    for result in pool.imap_unordered(func, stockUniverse):
        stock, col_values = result # unpack
        for col_value in col_values:
            col, value = col_value # unpack
            df.at[stock, col] = value
    print(df)

印刷品:

     A  B  C
abc  1  1  1
def  1  1  1
ghi  1  1  1
klm  1  1  1

请注意,我使用了imap_unordered而不是imap。前一种方法允许以任意顺序(即当结果可用时)返回结果,通常效率更高,而且由于返回值包含设置正确的df行所需的所有信息,因此我们不再需要任何特定的顺序

但是:

如果您的辅助函数基本上什么也不做,只是从网站下载,并且CPU密集型处理很少,那么您可以(应该)通过简单地替换以下内容来使用线程池:

from multiprocessing.pool import ThreadPool
...
    MAX_THREADS_TO_USE = 100 # or maybe even larger!!!
    pool_size = min(MAX_THREADS_TO_USE, len(stockUniverse))
    pool = ThreadPool(pool_size)

而且由于所有线程共享相同的地址空间,您可以按原样使用原始辅助函数func

相关问题 更多 >