Python 多进程数组
尽管有很多看起来相似的问题和答案,但我还是想说说我的情况:
我有一个相当大的二维numpy数组,想要用多进程的方式逐行处理它。对于每一行,我需要找到特定的数字值,并用这些值来设置另一个二维numpy数组中的值。举个小例子(实际上我的数组大约有10000x10000个单元格):
import numpy as np
inarray = np.array([(1.5,2,3), (4,5.1,6), (2.7, 4.8, 4.3)])
outarray = np.array([(0.0,0.0,0.0), (0.0,0.0,0.0), (0.0,0.0,0.0)])
我现在想要逐行处理这个inarray,使用多进程来找到inarray中所有大于5的单元格(比如inarray[1,1]和inarray[1,2]),并将outarray中对应的单元格(索引位置都减一)设置为1(比如outarray[0,0]和outarray[0,1])。
2 个回答
0
我觉得使用多进程可能不是个好主意,因为你想让多个进程去修改同一个对象。我认为这样做并不太好。我明白通过多个进程来查找索引会很方便,但为了把数据发送到另一个进程,这个对象在内部会被序列化(也就是“打包”),这是我所知道的情况。
请试试这个,告诉我们它是否非常慢:
outarray[inarray[1:,1:] > 5] = 1
outarray
array([[ 1., 1., 0.],
[ 0., 0., 0.],
[ 0., 0., 0.]])
2
如果你能使用最新的numpy开发版本,那么你可以用多线程来代替多进程。因为几个月前合并的这个更新,numpy在索引的时候会释放全局解释器锁(GIL),所以你可以这样做:
import numpy as np
import threading
def target(in_, out):
out[in_ > .5] = 1
def multi_threaded(a, thread_count=3):
b = np.zeros_like(a)
chunk = len(a) // thread_count
threads = []
for j in xrange(thread_count):
sl_a = slice(1 + chunk*j,
a.shape[0] if j == thread_count-1 else 1 + chunk*(j+1),
None)
sl_b = slice(sl_a.start-1, sl_a.stop-1, None)
threads.append(threading.Thread(target=target, args=(a[sl_a, 1:],
b[sl_b, :-1])))
for t in threads:
t.start()
for t in threads:
t.join()
return b
现在你可以做一些这样的事情:
In [32]: a = np.random.rand(100, 100000)
In [33]: %timeit multi_threaded(a, 1)
1 loops, best of 3: 121 ms per loop
In [34]: %timeit multi_threaded(a, 2)
10 loops, best of 3: 86.6 ms per loop
In [35]: %timeit multi_threaded(a, 3)
10 loops, best of 3: 79.4 ms per loop