如何在Python子进程间传递大numpy数组而不保存到磁盘?
有没有什么好的方法可以在两个 Python 子进程之间传递大量数据,而不使用磁盘?下面是我希望实现的一个简单示例:
import sys, subprocess, numpy
cmdString = """
import sys, numpy
done = False
while not done:
cmd = raw_input()
if cmd == 'done':
done = True
elif cmd == 'data':
##Fake data. In real life, get data from hardware.
data = numpy.zeros(1000000, dtype=numpy.uint8)
data.dump('data.pkl')
sys.stdout.write('data.pkl' + '\\n')
sys.stdout.flush()"""
proc = subprocess.Popen( #python vs. pythonw on Windows?
[sys.executable, '-c %s'%cmdString],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
for i in range(3):
proc.stdin.write('data\n')
print proc.stdout.readline().rstrip()
a = numpy.load('data.pkl')
print a.shape
proc.stdin.write('done\n')
这个代码创建了一个子进程,它生成一个 numpy 数组并将这个数组保存到磁盘上。然后,父进程从磁盘加载这个数组。这样是可以工作的!
但问题是,我们的硬件生成数据的速度是磁盘读写速度的 10 倍。有没有办法在内存中直接从一个 Python 进程传输数据到另一个进程,甚至不需要复制数据?我能不能像传引用那样传递数据?
我第一次尝试在内存中传输数据的效果非常糟糕:
import sys, subprocess, numpy
cmdString = """
import sys, numpy
done = False
while not done:
cmd = raw_input()
if cmd == 'done':
done = True
elif cmd == 'data':
##Fake data. In real life, get data from hardware.
data = numpy.zeros(1000000, dtype=numpy.uint8)
##Note that this is NFG if there's a '10' in the array:
sys.stdout.write(data.tostring() + '\\n')
sys.stdout.flush()"""
proc = subprocess.Popen( #python vs. pythonw on Windows?
[sys.executable, '-c %s'%cmdString],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
for i in range(3):
proc.stdin.write('data\n')
a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8)
print a.shape
proc.stdin.write('done\n')
这个方法非常慢(比保存到磁盘还慢),而且非常脆弱。肯定有更好的方法!
我并不一定要使用 'subprocess' 模块,只要数据处理的过程不阻塞父应用程序就可以。我也尝试过 'multiprocessing',但到目前为止没有成功。
背景:我们有一台硬件设备,每秒可以生成大约 2 GB 的数据,这些数据以一系列 ctypes 缓冲区的形式出现。处理这些缓冲区的 Python 代码已经忙得不可开交,根本无法应对如此大量的信息。我想在一个“主”程序中协调这些信息流,同时让多个硬件设备同时运行,而不让子进程互相阻塞。我现在的做法是在子进程中先对数据进行一些处理,然后再保存到磁盘,但如果能把完整的数据直接传给“主”进程就好了。
6 个回答
从其他回答来看,似乎 numpy-sharedmem 是个不错的选择。
不过,如果你需要一个纯 Python 的解决方案,或者安装扩展库像 cython 之类的太麻烦,那么你可以试试下面这段代码,它是 Nadav 代码的简化版:
import numpy, ctypes, multiprocessing
_ctypes_to_numpy = {
ctypes.c_char : numpy.dtype(numpy.uint8),
ctypes.c_wchar : numpy.dtype(numpy.int16),
ctypes.c_byte : numpy.dtype(numpy.int8),
ctypes.c_ubyte : numpy.dtype(numpy.uint8),
ctypes.c_short : numpy.dtype(numpy.int16),
ctypes.c_ushort : numpy.dtype(numpy.uint16),
ctypes.c_int : numpy.dtype(numpy.int32),
ctypes.c_uint : numpy.dtype(numpy.uint32),
ctypes.c_long : numpy.dtype(numpy.int64),
ctypes.c_ulong : numpy.dtype(numpy.uint64),
ctypes.c_float : numpy.dtype(numpy.float32),
ctypes.c_double : numpy.dtype(numpy.float64)}
_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
_ctypes_to_numpy.keys()))
def shm_as_ndarray(mp_array, shape = None):
'''Given a multiprocessing.Array, returns an ndarray pointing to
the same data.'''
# support SynchronizedArray:
if not hasattr(mp_array, '_type_'):
mp_array = mp_array.get_obj()
dtype = _ctypes_to_numpy[mp_array._type_]
result = numpy.frombuffer(mp_array, dtype)
if shape is not None:
result = result.reshape(shape)
return numpy.asarray(result)
def ndarray_to_shm(array, lock = False):
'''Generate an 1D multiprocessing.Array containing the data from
the passed ndarray. The data will be *copied* into shared
memory.'''
array1d = array.ravel(order = 'A')
try:
c_type = _numpy_to_ctypes[array1d.dtype]
except KeyError:
c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]
result = multiprocessing.Array(c_type, array1d.size, lock = lock)
shm_as_ndarray(result)[:] = array1d
return result
你可以这样使用它:
- 用
sa = ndarray_to_shm(a)
把 ndarraya
转换成一个共享的 multiprocessing.Array。 - 用
multiprocessing.Process(target = somefunc, args = (sa, )
(还有start
,可能还要join
)来在一个单独的 进程中调用somefunc
,并传递共享数组。 - 在
somefunc
中,使用a = shm_as_ndarray(sa)
来获取一个指向共享数据的 ndarray。(其实,你可能想在原始进程中,创建sa
后立即做同样的事情,这样就能有两个 ndarray 指向同样的数据。)
据我所知,你不需要把锁设置为 True,因为 shm_as_ndarray
本身不会使用锁。如果你需要锁的话,可以把锁设置为 True,然后在 sa
上调用 acquire/release。
另外,如果你的数组不是一维的,可能还需要把形状也一起传递给 sa(例如,使用 args = (sa, a.shape)
)。
这个解决方案的好处是,不需要额外的包或扩展模块,除了 multiprocessing(这个是标准库里自带的)。
基本上,你想在不同的程序之间共享一块内存,并把它当作一个numpy数组来使用,对吧?
如果是这样的话,可以看看这个(这是Nadav Horesh之前在numpy讨论区发的,不是我的作品)。有几个类似的实现(有些更灵活),但它们基本上都使用了这个原理。
# "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"
# Modified and corrected by Nadav Horesh, Mar 2010
# No rights reserved
import numpy as N
import ctypes
import multiprocessing as MP
_ctypes_to_numpy = {
ctypes.c_char : N.dtype(N.uint8),
ctypes.c_wchar : N.dtype(N.int16),
ctypes.c_byte : N.dtype(N.int8),
ctypes.c_ubyte : N.dtype(N.uint8),
ctypes.c_short : N.dtype(N.int16),
ctypes.c_ushort : N.dtype(N.uint16),
ctypes.c_int : N.dtype(N.int32),
ctypes.c_uint : N.dtype(N.uint32),
ctypes.c_long : N.dtype(N.int64),
ctypes.c_ulong : N.dtype(N.uint64),
ctypes.c_float : N.dtype(N.float32),
ctypes.c_double : N.dtype(N.float64)}
_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))
def shmem_as_ndarray(raw_array, shape=None ):
address = raw_array._obj._wrapper.get_address()
size = len(raw_array)
if (shape is None) or (N.asarray(shape).prod() != size):
shape = (size,)
elif type(shape) is int:
shape = (shape,)
else:
shape = tuple(shape)
dtype = _ctypes_to_numpy[raw_array._obj._type_]
class Dummy(object): pass
d = Dummy()
d.__array_interface__ = {
'data' : (address, False),
'typestr' : dtype.str,
'descr' : dtype.descr,
'shape' : shape,
'strides' : None,
'version' : 3}
return N.asarray(d)
def empty_shared_array(shape, dtype, lock=True):
'''
Generate an empty MP shared array given ndarray parameters
'''
if type(shape) is not int:
shape = N.asarray(shape).prod()
try:
c_type = _numpy_to_ctypes[dtype]
except KeyError:
c_type = _numpy_to_ctypes[N.dtype(dtype)]
return MP.Array(c_type, shape, lock=lock)
def emptylike_shared_array(ndarray, lock=True):
'Generate a empty shared array with size and dtype of a given array'
return empty_shared_array(ndarray.size, ndarray.dtype, lock)
在网上查找关于Joe Kington发布的代码时,我发现了一个叫做 numpy-sharedmem 的包。根据这个 numpy/multiprocessing 教程,它似乎有着相似的背景(可能大部分作者都是同一批人?——我不太确定)。
使用这个sharedmem模块,你可以创建一个共享内存的numpy数组(太棒了!),然后像这样和 multiprocessing 一起使用:
import sharedmem as shm
import numpy as np
import multiprocessing as mp
def worker(q,arr):
done = False
while not done:
cmd = q.get()
if cmd == 'done':
done = True
elif cmd == 'data':
##Fake data. In real life, get data from hardware.
rnd=np.random.randint(100)
print('rnd={0}'.format(rnd))
arr[:]=rnd
q.task_done()
if __name__=='__main__':
N=10
arr=shm.zeros(N,dtype=np.uint8)
q=mp.JoinableQueue()
proc = mp.Process(target=worker, args=[q,arr])
proc.daemon=True
proc.start()
for i in range(3):
q.put('data')
# Wait for the computation to finish
q.join()
print arr.shape
print(arr)
q.put('done')
proc.join()
运行后会得到
rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]