如何在Python子进程间传递大numpy数组而不保存到磁盘?

29 投票
6 回答
16695 浏览
提问于 2025-04-16 12:01

有没有什么好的方法可以在两个 Python 子进程之间传递大量数据,而不使用磁盘?下面是我希望实现的一个简单示例:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        data.dump('data.pkl')
        sys.stdout.write('data.pkl' + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    print proc.stdout.readline().rstrip()
    a = numpy.load('data.pkl')
    print a.shape

proc.stdin.write('done\n')

这个代码创建了一个子进程,它生成一个 numpy 数组并将这个数组保存到磁盘上。然后,父进程从磁盘加载这个数组。这样是可以工作的!

但问题是,我们的硬件生成数据的速度是磁盘读写速度的 10 倍。有没有办法在内存中直接从一个 Python 进程传输数据到另一个进程,甚至不需要复制数据?我能不能像传引用那样传递数据?

我第一次尝试在内存中传输数据的效果非常糟糕:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        ##Note that this is NFG if there's a '10' in the array:
        sys.stdout.write(data.tostring() + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8)
    print a.shape

proc.stdin.write('done\n')

这个方法非常慢(比保存到磁盘还慢),而且非常脆弱。肯定有更好的方法!

我并不一定要使用 'subprocess' 模块,只要数据处理的过程不阻塞父应用程序就可以。我也尝试过 'multiprocessing',但到目前为止没有成功。

背景:我们有一台硬件设备,每秒可以生成大约 2 GB 的数据,这些数据以一系列 ctypes 缓冲区的形式出现。处理这些缓冲区的 Python 代码已经忙得不可开交,根本无法应对如此大量的信息。我想在一个“主”程序中协调这些信息流,同时让多个硬件设备同时运行,而不让子进程互相阻塞。我现在的做法是在子进程中先对数据进行一些处理,然后再保存到磁盘,但如果能把完整的数据直接传给“主”进程就好了。

6 个回答

5

从其他回答来看,似乎 numpy-sharedmem 是个不错的选择。

不过,如果你需要一个纯 Python 的解决方案,或者安装扩展库像 cython 之类的太麻烦,那么你可以试试下面这段代码,它是 Nadav 代码的简化版:

import numpy, ctypes, multiprocessing

_ctypes_to_numpy = {
    ctypes.c_char   : numpy.dtype(numpy.uint8),
    ctypes.c_wchar  : numpy.dtype(numpy.int16),
    ctypes.c_byte   : numpy.dtype(numpy.int8),
    ctypes.c_ubyte  : numpy.dtype(numpy.uint8),
    ctypes.c_short  : numpy.dtype(numpy.int16),
    ctypes.c_ushort : numpy.dtype(numpy.uint16),
    ctypes.c_int    : numpy.dtype(numpy.int32),
    ctypes.c_uint   : numpy.dtype(numpy.uint32),
    ctypes.c_long   : numpy.dtype(numpy.int64),
    ctypes.c_ulong  : numpy.dtype(numpy.uint64),
    ctypes.c_float  : numpy.dtype(numpy.float32),
    ctypes.c_double : numpy.dtype(numpy.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
                            _ctypes_to_numpy.keys()))


def shm_as_ndarray(mp_array, shape = None):
    '''Given a multiprocessing.Array, returns an ndarray pointing to
    the same data.'''

    # support SynchronizedArray:
    if not hasattr(mp_array, '_type_'):
        mp_array = mp_array.get_obj()

    dtype = _ctypes_to_numpy[mp_array._type_]
    result = numpy.frombuffer(mp_array, dtype)

    if shape is not None:
        result = result.reshape(shape)

    return numpy.asarray(result)


def ndarray_to_shm(array, lock = False):
    '''Generate an 1D multiprocessing.Array containing the data from
    the passed ndarray.  The data will be *copied* into shared
    memory.'''

    array1d = array.ravel(order = 'A')

    try:
        c_type = _numpy_to_ctypes[array1d.dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]

    result = multiprocessing.Array(c_type, array1d.size, lock = lock)
    shm_as_ndarray(result)[:] = array1d
    return result

你可以这样使用它:

  1. sa = ndarray_to_shm(a) 把 ndarray a 转换成一个共享的 multiprocessing.Array
  2. multiprocessing.Process(target = somefunc, args = (sa, )(还有 start,可能还要 join)来在一个单独的 进程中调用 somefunc,并传递共享数组。
  3. somefunc 中,使用 a = shm_as_ndarray(sa) 来获取一个指向共享数据的 ndarray。(其实,你可能想在原始进程中,创建 sa 后立即做同样的事情,这样就能有两个 ndarray 指向同样的数据。)

据我所知,你不需要把锁设置为 True,因为 shm_as_ndarray 本身不会使用锁。如果你需要锁的话,可以把锁设置为 True,然后在 sa 上调用 acquire/release。

另外,如果你的数组不是一维的,可能还需要把形状也一起传递给 sa(例如,使用 args = (sa, a.shape))。

这个解决方案的好处是,不需要额外的包或扩展模块,除了 multiprocessing(这个是标准库里自带的)。

10

基本上,你想在不同的程序之间共享一块内存,并把它当作一个numpy数组来使用,对吧?

如果是这样的话,可以看看这个(这是Nadav Horesh之前在numpy讨论区发的,不是我的作品)。有几个类似的实现(有些更灵活),但它们基本上都使用了这个原理。

#    "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"
# Modified and corrected by Nadav Horesh, Mar 2010
# No rights reserved


import numpy as N
import ctypes
import multiprocessing as MP

_ctypes_to_numpy = {
    ctypes.c_char   : N.dtype(N.uint8),
    ctypes.c_wchar  : N.dtype(N.int16),
    ctypes.c_byte   : N.dtype(N.int8),
    ctypes.c_ubyte  : N.dtype(N.uint8),
    ctypes.c_short  : N.dtype(N.int16),
    ctypes.c_ushort : N.dtype(N.uint16),
    ctypes.c_int    : N.dtype(N.int32),
    ctypes.c_uint   : N.dtype(N.uint32),
    ctypes.c_long   : N.dtype(N.int64),
    ctypes.c_ulong  : N.dtype(N.uint64),
    ctypes.c_float  : N.dtype(N.float32),
    ctypes.c_double : N.dtype(N.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))


def shmem_as_ndarray(raw_array, shape=None ):

    address = raw_array._obj._wrapper.get_address()
    size = len(raw_array)
    if (shape is None) or (N.asarray(shape).prod() != size):
        shape = (size,)
    elif type(shape) is int:
        shape = (shape,)
    else:
        shape = tuple(shape)

    dtype = _ctypes_to_numpy[raw_array._obj._type_]
    class Dummy(object): pass
    d = Dummy()
    d.__array_interface__ = {
        'data' : (address, False),
        'typestr' : dtype.str,
        'descr' :   dtype.descr,
        'shape' : shape,
        'strides' : None,
        'version' : 3}
    return N.asarray(d)

def empty_shared_array(shape, dtype, lock=True):
    '''
    Generate an empty MP shared array given ndarray parameters
    '''

    if type(shape) is not int:
        shape = N.asarray(shape).prod()
    try:
        c_type = _numpy_to_ctypes[dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[N.dtype(dtype)]
    return MP.Array(c_type, shape, lock=lock)

def emptylike_shared_array(ndarray, lock=True):
    'Generate a empty shared array with size and dtype of a  given array'
    return empty_shared_array(ndarray.size, ndarray.dtype, lock)
30

在网上查找关于Joe Kington发布的代码时,我发现了一个叫做 numpy-sharedmem 的包。根据这个 numpy/multiprocessing 教程,它似乎有着相似的背景(可能大部分作者都是同一批人?——我不太确定)。

使用这个sharedmem模块,你可以创建一个共享内存的numpy数组(太棒了!),然后像这样和 multiprocessing 一起使用:

import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q,arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            ##Fake data. In real life, get data from hardware.
            rnd=np.random.randint(100)
            print('rnd={0}'.format(rnd))
            arr[:]=rnd
        q.task_done()

if __name__=='__main__':
    N=10
    arr=shm.zeros(N,dtype=np.uint8)
    q=mp.JoinableQueue()    
    proc = mp.Process(target=worker, args=[q,arr])
    proc.daemon=True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()   
        print arr.shape
        print(arr)
    q.put('done')
    proc.join()

运行后会得到

rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]

撰写回答