Multiprocessing SharedMemory not found in a second multiprocessing.Pool

Asked 2025-04-13 15:36

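In my code, I use a multiprocessing pool to generate data and then process that data in a second pool. Until now I have been saving the data to disk and loading it back. Now I want to keep the data in memory, so I took the approach below: I use multiprocessing shared memory to pass the BytesIO objects I'm working with back and forth (so that multiprocessing doesn't have to pickle the data).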

from io import BytesIO
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory


def generate_data(_dummy_arg) -> str:
    data = BytesIO(bytearray(1024 * 1024 * 38))

    # prepare SharedMemory to send the bytes back to the main process
    buf = data.getbuffer()
    shared_memory = SharedMemory(create=True, size=buf.nbytes)
    shared_memory.buf[:] = buf
    shared_memory.close()
    return shared_memory.name


def process_data(data_name: str) -> str:
    # recover the data from its name, this is where the error happens
    data = SharedMemory(data_name)
    # FileNotFoundError: [Errno 2] No such file or directory: '/psm_607cb218'
    return "some_result"


datas: list[SharedMemory] = []
with Pool(5) as p:
    for data_name in p.map(generate_data, range(5)):
        # recover SharedMemory from the name
        data = SharedMemory(data_name)
        datas.append(data)

# some code

with Pool(5) as p:
    for returned in p.map(process_data, (data.name for data in datas)):
        ...

However, this implementation raises FileNotFoundError: [Errno 2] No such file or directory: '/psm_201b67a1' in the second pool. It's as if the shared memory object had disappeared. Any ideas?

Edit: if I reduce the data size to, say, 1024 * 1024 * 10 (10 MB), this approach works fine.

2 Answers


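This may happen because calling the close() method of SharedMemory unlinks the instance. As far as I know, this is a bug that still hasn't been fixed. As a workaround, try calling close() only after you have finished processing the data.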

from multiprocessing.shared_memory import SharedMemory


def process_data(data_name: str) -> str:
    data = SharedMemory(data_name)
    # Process the data here...
    # After processing, close the shared memory
    data.close()
    return "some_result"
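A fuller sketch of the whole generate-then-process flow, built on one assumption that does not come from the question: the parent process creates every segment before either pool starts, the workers only attach by name, use the data, and close() their own handle afterwards, and the parent alone calls unlink() once both pools are finished. This ties each segment's lifetime to the parent rather than to a pool worker that may exit. The helper names (`fill`, `checksum`, `run_two_pools`) and the `SIZE`/`COUNT` values are hypothetical.

```python
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory

SIZE = 1024 * 1024  # hypothetical per-item size; the question used 38 MiB
COUNT = 3           # hypothetical number of items


def fill(name: str) -> None:
    """First pool: attach to a parent-owned segment, write into it, detach."""
    shm = SharedMemory(name)
    try:
        shm.buf[:SIZE] = bytes([7]) * SIZE
    finally:
        shm.close()  # detach only; the parent still owns the segment


def checksum(name: str) -> int:
    """Second pool: attach by name again and read the data back."""
    shm = SharedMemory(name)
    try:
        return shm.buf[0] + shm.buf[SIZE - 1]
    finally:
        shm.close()


def run_two_pools() -> list[int]:
    # The parent creates every segment before any pool starts.
    segments = [SharedMemory(create=True, size=SIZE) for _ in range(COUNT)]
    names = [shm.name for shm in segments]
    try:
        with Pool(COUNT) as pool:
            pool.map(fill, names)
        # A second, independent pool can still attach to the same names.
        with Pool(COUNT) as pool:
            return pool.map(checksum, names)
    finally:
        for shm in segments:
            shm.close()
            shm.unlink()  # only the owner removes the segment, after both pools


if __name__ == "__main__":  # needed for the spawn/forkserver start methods
    print(run_two_pools())  # [14, 14, 14]
```

Because only names (short strings) cross process boundaries, nothing large is pickled, which was the original goal of the question.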

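Once a shared memory segment has been created, it continues to exist until it is unlinked.

Closing the shared memory does not unlink it. It is still good practice to close the segment when you are done with it, and a simple context manager makes that convenient because the close happens automatically.

Using a multiprocessing pool is not essential in this example, but it keeps the code consistent with the asker's approach.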

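from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
from io import BytesIO

BUFSIZ = 30 * 1024 * 1024
SHMNAME = "CtrlZ"


class MySharedMemory(SharedMemory):
    """SharedMemory that calls close() (but not unlink()) when leaving a with-block."""

    def __init__(self, name=None, create=False, size=0):
        super().__init__(name, create, size)

    def __enter__(self):
        return self

    def __exit__(self, *_):
        # Detach this process's view; the segment itself keeps existing.
        self.close()


def loadshm():
    # Runs in a pool worker: create the named segment and copy the data into it.
    data = BytesIO(bytearray(BUFSIZ))
    with MySharedMemory(SHMNAME, True, BUFSIZ) as shm:
        shm.buf[:BUFSIZ] = data.getbuffer()


def unloadshm():
    # Runs in a pool worker: attach by name and check that all the data is there.
    with MySharedMemory(SHMNAME) as shm:
        print(len(bytes(shm.buf)) == BUFSIZ)


def main():
    try:
        with Pool() as pool:
            pool.apply(loadshm)
            pool.apply(unloadshm)
    finally:
        # unlink() is what removes the segment; do it once, from the parent, when done.
        MySharedMemory(SHMNAME).unlink()


if __name__ == "__main__":
    main()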

Output:

True
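The lifetime rule this answer relies on (close() only detaches the calling process, unlink() removes the name) can also be checked in a single process. This is a minimal sketch assuming a POSIX system such as Linux or macOS; on Windows the segment is released as soon as the last handle is closed, so the first re-attach below would fail there. `close_then_unlink_demo` is a hypothetical helper name.

```python
from multiprocessing.shared_memory import SharedMemory


def close_then_unlink_demo() -> tuple[bool, bool]:
    """Return (reattach_after_close_ok, reattach_after_unlink_ok). POSIX only."""
    # Create a small segment, write a marker byte, then close our handle.
    creator = SharedMemory(create=True, size=16)
    name = creator.name
    creator.buf[0] = 42
    creator.close()  # detaches this handle only; the segment survives

    # Re-attaching by name still works and the data is intact.
    reader = SharedMemory(name)
    after_close_ok = reader.buf[0] == 42
    reader.unlink()  # removes the name; memory is freed once all handles close
    reader.close()

    # After unlink(), attaching by name fails with the error from the question.
    try:
        SharedMemory(name).close()
        after_unlink_ok = True
    except FileNotFoundError:
        after_unlink_ok = False
    return after_close_ok, after_unlink_ok


if __name__ == "__main__":
    print(close_then_unlink_demo())  # (True, False) on Linux/macOS
```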
