Did I parallelize this?

Posted 2024-05-07 23:49:54


Basically, I have to sample from 5000 matrices in order to obtain a certain probability distribution. So I just need to count how many times an element X occurs at position (i, j) across these 5000 matrices, and then I save those [values and counts] in a dictionary.
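
To make the counting concrete, here is a toy sketch with three small matrices (just an illustration, not my real data):

import numpy as np

#three toy 2x2 matrices standing in for the 5000 real ones
mats = np.stack([
    np.array([[1, 0], [0, 1]]),
    np.array([[1, 1], [0, 0]]),
    np.array([[2, 0], [0, 1]]),
])

#count how often each value occurs at position (0, 0) across the stack
values, counts = np.unique(mats[:, 0, 0], return_counts=True)
print(dict(zip(values.tolist(), counts.tolist())))  #{1: 2, 2: 1}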

That said, I thought parallelizing my code might be a good idea, since the serial version would run far too slowly. Here is the code:

import multiprocessing as mp
import numpy as np


def func(N):
    d = {}

    filenames = ["file" + str(k) + ".txt" for k in range(N, N + 1000)]
    ##each of these files is a 306x306 matrix
    for i in range(306):
        data = np.vstack([np.loadtxt(f, delimiter=",", usecols=i) for f in filenames])
        for j in range(306):
            values, counts = np.unique(data.T[j], return_counts=True)
            for v in values:
                d[v] = counts[np.where(values == v)]
    return d


if __name__ == "__main__":
    N = mp.cpu_count()
    with mp.Pool(processes=N) as p:
        results = p.map(func, [m for m in range(1000, 5000, 1000)])

Since this is the first time I am parallelizing a function, I would like some feedback. Also, since each call has to load a 1000x306 array, it is still slow, so any suggestions on how to improve it are very welcome.


1 Answer

Based on this description:

how many times element X occurs in the position (i,j) of these 5000 matrices

I would restructure your code to return a 306x306 array of dictionaries, where the keys are each value that occurs at that position and the values are how many times it occurs. You can then generate the data for subsets of the files in parallel, and merge the results at the end. You should tune chunksize so that as many files as will fit in memory are loaded at once, which reduces how many times you have to go through the manual loop over array indices. Re-ordering the data into "Fortran" order should also make array access more efficient when reading arr[:,i,j] (the calls to np.unique will be faster).
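
As a quick illustration of the memory-layout point (a minimal sketch on a small dummy array, not the real data): the slice arr[:, i, j] that the loop reads is strided in the default C order but contiguous in Fortran order.

import numpy as np

a_c = np.zeros((1000, 5, 5))   #default C (row-major) order
a_f = np.asfortranarray(a_c)   #same data, Fortran (column-major) order

#the per-position slice the loop reads is contiguous only in Fortran order
print(a_c[:, 0, 0].flags['C_CONTIGUOUS'])  #False (stride of 5*5 elements)
print(a_f[:, 0, 0].flags['C_CONTIGUOUS'])  #True (unit stride)

With that in mind, the restructured code would look something like this: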

import multiprocessing as mp
from collections import defaultdict

import numpy as np

arr_shape = (5, 5)
chunksize = 10 #load chunks of 10 files at a time
start_n = 0
end_n = 100

def func(N):
    #unpack the (start, end) tuple describing which files this worker loads
    start, end = N
    filenames = ["file" + str(k) + ".txt" for k in range(start, end)]
    
    #load and stack all the arrays into single array
    arr = np.stack([np.loadtxt(f, delimiter=",") for f in filenames])
    #re-order data in memory for efficient access along 0th axis (not needed, but likely faster)
    arr = np.asfortranarray(arr) 
    
    res = []
    #the more arrays you can load at once (chunksize), the fewer times we have to go through this inefficient loop
    for i in range(arr_shape[0]):
        res.append(list())
        for j in range(arr_shape[1]):
            #each res[i][j] will be a tuple of (values, counts)
            res[i].append(np.unique(arr[:,i,j], return_counts=True))
    return res

if __name__ == "__main__":

    with mp.Pool() as p:
        
        #build tuples of (start, end) for chunks of arbitrary size
        chunks = []
        for start in range(start_n, end_n, chunksize):
            if start + chunksize > end_n:
                end = end_n
            else:
                end = start + chunksize
            chunks.append((start, end))
            
        #build array of dicts to merge results into
        d = []
        for i in range(arr_shape[0]):
            d.append(list())
            for j in range(arr_shape[1]):
                #each d[i][j] will be a dict of d[value] = count
                d[i].append(defaultdict(int)) #empty values default to 0
                
        #call our "func" in parallel, and get any results as they come in.
        for res in p.imap_unordered(func=func, iterable=chunks):
            #merge the results into d
            for i in range(arr_shape[0]):
                for j in range(arr_shape[1]):
                    #recall result is array of tuples of (values, counts). zip() is an easy way to get them in pairs
                    for value, count in zip(res[i][j][0], res[i][j][1]):
                        d[i][j][value] += count
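
For the original problem the same structure would presumably run with the real dimensions; the settings below are hypothetical, and chunksize should be tuned to how many 306x306 files fit in memory per worker:

arr_shape = (306, 306)
start_n = 0
end_n = 5000
chunksize = 250  #hypothetical; as many files as fit comfortably in memory

#after the merge loop, d[i][j][x] is the number of files in which position (i, j) holds value x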
