子进程已启动后如何共享内存访问权限
我想知道如何让子进程访问共享内存中的数据,而这些数据是在子进程已经创建后才可用的(使用 multiprocessing.Process)。
我知道有 multiprocessing.sharedctypes.RawArray,但我搞不清楚如何让我的子进程访问一个在子进程已经启动后创建的 RawArray
。
这些数据是由父进程生成的,而且数据的数量事先是未知的。
如果不是因为 GIL,我会使用线程,这样会让这个任务简单一些。使用非CPython的实现也不是一个选项。
在查看 muliprocessing.sharedctypes 的底层实现时,发现共享的C类型对象是通过 使用 mmap
的内存 来分配的。
所以这个问题实际上可以简化为:子进程能否访问一个匿名映射的内存,如果 mmap()
是在子进程创建后由父进程调用的?
这有点类似于 这个问题,只是我这里调用 mmap()
的是父进程,而不是子进程。
(已解决)
我创建了自己的 RawArray
版本,底层使用了 shm_open()
。生成的共享C类型数组可以与任何进程共享,只要标识符(tag
)匹配即可。
有关详细信息和示例,请查看 这个回答。
3 个回答
我觉得你在找的是 mmap模块
关于数据的序列化,你可以看看这个问题的回答 这个链接。当然,如果你希望避免复制数据,我没有解决方案。
编辑
实际上,你可以在CPython 3.2中使用非标准库的 _multiprocessing 模块来获取mmap对象的地址,并用ctypes对象的from_address来使用它。实际上,这就是RawArray的作用。当然,你不应该尝试调整mmap对象的大小,因为在这种情况下,mmap的地址可能会改变。
import mmap
import _multiprocessing
from ctypes import Structure,c_int
map = mmap.mmap(-1,4)
class A(Structure):
_fields_ = [("x", c_int)]
x = _multiprocessing.address_of_buffer(map)
b=A.from_address(x[0])
b.x = 256
>>> map[0:4]
'\x00\x01\x00\x00'
在创建子进程后,要暴露内存,你必须用一个真实的文件来映射你的内存,也就是调用
map = mmap.mmap(open("hello.txt", "r+b").fileno(),4)
免责声明:我是这个问题的提问者。
最后,我使用了 posix_ipc 模块,创建了我自己的版本的 RawArray。我主要使用了 posix_ipc.SharedMemory
,它在后台调用了 shm_open()
。
我的实现(ShmemRawArray
)提供了与 RawArray
相同的功能,但需要两个额外的参数——一个 tag
用于唯一标识共享内存区域,以及一个 create
标志,用来决定我们是要创建一个新的共享内存段,还是连接到一个已经存在的内存段。
如果有人感兴趣,这里有个链接: https://gist.github.com/1222327
ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)
使用说明:
- 前两个参数(
typecode_or_type
和size_or_initializer
)的用法和RawArray
是一样的。 - 只要
tag
匹配,任何进程都可以访问这个共享数组。 - 当原始对象(由
ShmemRawArray(..., create=True)
返回)被删除时,共享内存段会被解除链接。 - 使用当前已存在的
tag
创建共享数组会引发ExistentialError
错误。 - 使用不存在的
tag
(或者已经解除链接的tag
)访问共享数组也会引发ExistentialError
错误。
这是一个 SSCCE(简短、自包含、可编译的示例),展示了它的实际应用。
#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray
class Point(ctypes.Structure):
_fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]
def worker(q):
# get access to ctypes array shared by parent
count, tag = q.get()
shared_data = ShmemRawArray(Point, count, tag, False)
proc_name = multiprocessing.current_process().name
print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
if __name__ == '__main__':
procs = []
np = multiprocessing.cpu_count()
queue = multiprocessing.Queue()
# spawn child processes
for i in xrange(np):
p = multiprocessing.Process(target=worker, args=(queue,))
procs.append(p)
p.start()
# create a unique tag for shmem segment
tag = "stack-overflow-%d" % multiprocessing.current_process().pid
# random number of points with random data
count = randint(3,10)
combined_data = [Point(x=random(), y=random()) for i in xrange(count)]
# create ctypes array in shared memory using ShmemRawArray
# - we won't be able to use multiprocssing.sharectypes.RawArray here
# because children already spawned
shared_data = ShmemRawArray(Point, combined_data, tag)
# give children info needed to access ctypes array
for p in procs:
queue.put((count, tag))
print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
for p in procs:
p.join()
运行这个会得到以下输出:
[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
你的问题听起来很适合使用posix_ipc
或sysv_ipc
模块,这些模块提供了POSIX或SysV的共享内存、信号量和消息队列的接口。这里面有很多很好的建议,可以帮助你选择合适的模块。
匿名的mmap(2)
区域的问题在于,你不能轻松地与其他进程共享它们。如果这些区域是基于文件的,那就简单多了,但如果你其实不需要这个文件做其他事情,那就显得有点多余。你可以在C语言中使用CLONE_VM
标志来调用clone(2)
,但我不建议在可能对内存安全有假设的语言解释器中使用它。(即使在C中,这也有点危险,因为五年后维护这个代码的程序员可能也会对CLONE_VM
的行为感到惊讶。)
不过,SysV和更新的POSIX共享内存映射允许即使是没有关系的进程也可以通过标识符来附加和分离共享内存。因此,你只需要将创建映射的进程的标识符与使用这些映射的进程共享,然后当你在映射中操作数据时,这些数据对所有进程都是可用的,而且没有额外的解析开销。shm_open(3)
函数返回一个int
,这个值在后续调用ftruncate(2)
和mmap(2)
时用作文件描述符,这样其他进程就可以使用共享内存段,而不需要在文件系统中创建文件——而且即使所有使用它的进程都退出了,这块内存仍然会存在。(这对Unix来说可能有点奇怪,但它很灵活。)