为什么我的多处理代码停止处理大型数据集?

2024-05-23 14:27:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图计算特征的平方矩阵(Information_Gains_Matrix)和相应的平方权重矩阵(Weights_Matrix)的莫兰指数。对于Information_Gains_Matrix中的每个特征,我想在Weights_Matrix是固定的情况下计算莫兰指数

因此,我尝试使用multiprocessing pool.map来处理Information_Gains_Matrix的每个特性。我可以让代码以各种方式在小型测试数据集上执行此操作。但是,当我使用实际的大数据集时,代码会运行,但是CPU使用率会下降到0%,进程会挂起,并且不会释放任何内容

我曾经尝试过使用全局变量和共享变量,以防内存问题,我也尝试过使用不同的队列方法,以防这样可以解决问题,但没有成功。下面的代码是这些示例中的一个,适用于小数据集,但不适用于大数据集

import multiprocessing
from multiprocessing import RawArray, Pool, Lock
from functools import partial 
import numpy as np

## Set up initial fake data

Information_Gains_Matrix = np.random.uniform(0,1,(22000,22000))
Weights_Matrix = np.random.uniform(0,1,(22000,22000))

## Function I want to parallelise.  
def Feature_Moran_Index(Chunks,Wij,N):   
    Moran_Index_Scores = np.zeros(Chunks.shape[0])
    for i in np.arange(Chunks.shape[0]):
        print(Chunks[i]) # Print ind to show it's running
        Feature = Information_Gains_Matrix[Chunks[i],:]    
        X_bar = np.mean(Feature)
        if X_bar != 0:
            Deviance = Feature - X_bar
            Outer_Deviance = np.outer(Deviance,Deviance)
            Deviance2 = Deviance * Deviance
            Denom = np.sum(Deviance2)
            Moran_Index_Scores[i] = (N/Wij) * (np.sum((W * np.ndarray.flatten(Outer_Deviance)))/Denom)
    return Moran_Index_Scores

## Set up chunks inds for each core.
Use_Cores = (multiprocessing.cpu_count()-2)
Chunk_Size = np.ceil(Information_Gains_Matrix.shape[0] / Use_Cores)
Range = np.arange(Information_Gains_Matrix.shape[0]).astype("i")
Chunk_Range = np.arange(Chunk_Size).astype("i")
Chunks = []
for i in np.arange(Use_Cores-1):
    Chunks.append(Range[Chunk_Range])
    Range = np.delete(Range,Chunk_Range)

Chunks.append(Range)

if __name__ == '__main__':
    W = RawArray('d', Information_Gains_Matrix.shape[0] * Information_Gains_Matrix.shape[1])
    W_np = np.frombuffer(W, dtype=np.float64).reshape((Information_Gains_Matrix.shape[0], Information_Gains_Matrix.shape[1]))
    np.copyto(W_np, Weights_Matrix)
    N = Information_Gains_Matrix.shape[0]
    Wij = np.sum(Weights_Matrix)  
    with Pool(processes=Use_Cores) as pool:
        Results = pool.map(partial(Feature_Moran_Index, Wij=Wij,N=N), Chunks)

Moran_Index_Score = np.concatenate(Results)

我对这种方法并不忠诚,如果有人能以任何方式帮助我并行计算各个特征的莫兰指数,我将不胜感激,因为我似乎无法让它发挥作用


Tags: indexinformationnprangemultiprocessingmatrixchunksfeature
1条回答
网友
1楼 · 发布于 2024-05-23 14:27:11

Feature_Moran_Index中,Deviance具有形状(22000,),并且Outer_Deviance具有形状(22000, 22000),并且使用3.8GB的RAM

数量

np.sum(W * np.ndarray.flatten(Outer_Deviance))

相等于

np.sum(W_np * Outer_Deviance)

相等于

Deviance @ W_np @ Deviance

用最后一个表达式替换第一个表达式并删除Outer_Deviance的定义后,程序运行到结束,内存使用量为c。11GB

相关问题 更多 >