Python 多进程数组

3 投票
2 回答
1341 浏览
提问于 2025-04-18 06:28

尽管有很多看起来相似的问题和答案,但我还是想说说我的情况:

我有一个相当大的二维numpy数组,想要用多进程的方式逐行处理它。对于每一行,我需要找到特定的数字值,并用这些值来设置另一个二维numpy数组中的值。举个小例子(实际上我的数组大约有10000x10000个单元格):

import numpy as np
inarray = np.array([(1.5,2,3), (4,5.1,6), (2.7, 4.8, 4.3)])
outarray = np.array([(0.0,0.0,0.0), (0.0,0.0,0.0), (0.0,0.0,0.0)])

我现在想要逐行处理这个inarray,使用多进程来找到inarray中所有大于5的单元格(比如inarray[1,1]和inarray[1,2]),并将outarray中对应的单元格(索引位置都减一)设置为1(比如outarray[0,0]和outarray[0,1])。

我看过这个链接这个链接以及这个链接,但很遗憾,我还是不知道该怎么做。求助!

2 个回答

0

我觉得使用多进程可能不是个好主意,因为你想让多个进程去修改同一个对象。我认为这样做并不太好。我明白通过多个进程来查找索引会很方便,但为了把数据发送到另一个进程,这个对象在内部会被序列化(也就是“打包”),这是我所知道的情况。

请试试这个,告诉我们它是否非常慢:

outarray[inarray[1:,1:] > 5] = 1
outarray

array([[ 1.,  1.,  0.],
       [ 0.,  0.,  0.],
       [ 0.,  0.,  0.]])
2

如果你能使用最新的numpy开发版本,那么你可以用多线程来代替多进程。因为几个月前合并的这个更新,numpy在索引的时候会释放全局解释器锁(GIL),所以你可以这样做:

import numpy as np
import threading

def target(in_, out):
    out[in_ > .5] = 1

def multi_threaded(a, thread_count=3):
    b = np.zeros_like(a)
    chunk = len(a) // thread_count
    threads = []
    for j in xrange(thread_count):
        sl_a = slice(1 + chunk*j,
                     a.shape[0] if j == thread_count-1 else 1 + chunk*(j+1),
                     None)
        sl_b = slice(sl_a.start-1, sl_a.stop-1, None)
        threads.append(threading.Thread(target=target, args=(a[sl_a, 1:],
                                                             b[sl_b, :-1])))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return b

现在你可以做一些这样的事情:

In [32]: a = np.random.rand(100, 100000)

In [33]: %timeit multi_threaded(a, 1)
1 loops, best of 3: 121 ms per loop

In [34]: %timeit multi_threaded(a, 2)
10 loops, best of 3: 86.6 ms per loop

In [35]: %timeit multi_threaded(a, 3)
10 loops, best of 3: 79.4 ms per loop

撰写回答