Multiprocessing SharedMemory not found in second multiprocessing.Pool
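In my code, I generate data with one multiprocessing Pool and then process that data in a second Pool. Until now I saved the data to disk and loaded it back again. I now want to keep the data in memory, so I took this approach: I use multiprocessing shared memory to pass the BytesIO objects I am working with back and forth (this way, multiprocessing does not need to serialize the data).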
from io import BytesIO
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory


def generate_data(_dummy_arg) -> str:
    data = BytesIO(bytearray(1024 * 1024 * 38))
    # prepare SharedMemory to send the bytes back to main thread
    buf = data.getbuffer()
    shared_memory = SharedMemory(create=True, size=buf.nbytes)
    shared_memory.buf[:] = buf
    shared_memory.close()
    return shared_memory.name


def process_data(data_name: str) -> str:
    # recover the data from its name, this is where the error happens
    data = SharedMemory(data_name)
    # FileNotFoundError: [Errno 2] No such file or directory: '/psm_607cb218'
    return "some_result"


datas: list[SharedMemory] = []
with Pool(5) as p:
    for data_name in p.map(generate_data, range(5)):
        # recover SharedMemory from the name
        data = SharedMemory(data_name)
        datas.append(data)

# some code

with Pool(5) as p:
    for returned in p.map(process_data, (data.name for data in datas)):
        ...
However, this implementation fails in the second Pool with FileNotFoundError: [Errno 2] No such file or directory: '/psm_201b67a1'. It is as if the shared memory object had disappeared. Any ideas?
Edit: if I reduce the size of the data to, say, 1024 * 1024 * 10 (10 MB), this approach works fine.
2 Answers
0
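This may be because the SharedMemory instance gets unlinked when its close() method is called. As far as I know, this is a bug that has not been fixed yet. As a workaround, try calling close() only after you have finished processing the data.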
def process_data(data_name: str) -> str:
    data = SharedMemory(data_name)
    # Process the data here...
    # After processing, close the shared memory
    data.close()
    return "some_result"
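The "process the data here" placeholder could be filled in along the following lines. This is only a minimal sketch, assuming the segment still exists when the worker attaches to it; `read_prefix` and its `nbytes` parameter are hypothetical names that do not appear in the original answer. Slices of `shm.buf` are memoryviews, and calling `close()` while one is still alive can raise `BufferError`, so the sketch copies the bytes out and closes in a `finally` block.

```python
from multiprocessing.shared_memory import SharedMemory


def read_prefix(data_name: str, nbytes: int) -> bytes:
    """Attach to an existing segment by name and copy out its first nbytes."""
    shm = SharedMemory(data_name)  # attach only; create defaults to False
    try:
        # bytes() makes an independent copy, so no memoryview into the
        # segment is still alive when close() runs below.
        return bytes(shm.buf[:nbytes])
    finally:
        # Release this process's handle; this does not unlink the segment.
        shm.close()
```

The process that owns the segment is still expected to call `unlink()` once every consumer is finished.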
1
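Once a shared memory segment has been created, it persists until it is unlinked.
Closing shared memory does not unlink it. It is still good practice to close the segment when you are done with it. A simple context manager makes this more convenient, because the close is then handled automatically.
The multiprocessing Pool is not essential to this example; it is used only to stay consistent with the asker's approach.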
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
from io import BytesIO

BUFSIZ = 30 * 1024 * 1024
SHMNAME = "CtrlZ"


class MySharedMemory(SharedMemory):
    def __init__(self, name=None, create=False, size=0):
        super().__init__(name, create, size)

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()


def loadshm():
    data = BytesIO(bytearray(BUFSIZ))
    with MySharedMemory(SHMNAME, True, BUFSIZ) as shm:
        shm.buf[:BUFSIZ] = data.getbuffer()


def unloadshm():
    with MySharedMemory(SHMNAME) as shm:
        print(len(bytes(shm.buf)) == BUFSIZ)


def main():
    try:
        with Pool() as pool:
            pool.apply(loadshm)
            pool.apply(unloadshm)
    finally:
        MySharedMemory(SHMNAME).unlink()


if __name__ == "__main__":
    main()
Output:
True
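The close/unlink distinction described in this answer can also be checked in a single process, without a Pool. The following is a minimal sketch, assuming a POSIX system (on Windows, `unlink()` does nothing and a segment goes away once its last handle is closed); `lifecycle_demo` is a hypothetical helper name introduced here for illustration.

```python
from multiprocessing.shared_memory import SharedMemory


def lifecycle_demo(size: int = 1024) -> tuple[bool, bool]:
    """Return (attach_ok_after_close, attach_ok_after_unlink)."""
    shm = SharedMemory(create=True, size=size)
    name = shm.name
    shm.buf[:4] = b"data"
    shm.close()  # releases this handle only; the segment stays alive

    # Re-attaching by name still works because nothing has unlinked it.
    again = SharedMemory(name)
    ok_after_close = bytes(again.buf[:4]) == b"data"
    again.close()
    again.unlink()  # removes the segment itself

    # After unlink the name no longer resolves.
    try:
        SharedMemory(name).close()
        ok_after_unlink = True
    except FileNotFoundError:
        ok_after_unlink = False
    return ok_after_close, ok_after_unlink
```

Under the POSIX assumption, attaching succeeds after `close()` and fails with `FileNotFoundError` only after `unlink()`, which is the same error the question reports.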