Python中的多处理:有没有一种方法池.imap不积累记忆?

2024-03-28 15:53:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用Python中的multiprocessing模块来并行训练带有keras的神经网络,使用带有imapPool(processes = 4)对象。在每一个“周期”(即每4个进程)之后,它会稳定地使用越来越多的内存,直到最终崩溃。你知道吗

我使用memory_profiler模块跟踪我的内存使用情况,训练了12个网络。这里使用香草imapvanilla

如果我把maxtasksperchild = 1放在Pool1taskperchild

如果我使用imap(chunksize = 3)chunks

在后一种情况下,如果一切正常,我只向池中的每个进程发送一个批,所以问题似乎是这些进程携带了有关前一批的信息。如果是这样,我能强迫游泳池不要这样做吗?你知道吗

尽管chunks解决方案似乎有效,但我还是不想使用它,因为

  • 我想使用tqdm模块来跟踪进度,在chunks情况下,它只会在每个chunk之后更新,这实际上意味着它根本不会真正跟踪任何东西,因为所有chunk都会在同一时间完成(在本例中)
  • 目前,所有网络的训练时间都完全相同,但我想允许它们有单独的训练时间,其中块解决方案可能会导致一个进程获得所有长的训练时间。你知道吗

这里有一个vanilla案例中的代码片段。在另外两个例子中,我只是更改了Pool中的maxtasksperchild参数和imap中的chunksize参数:

def train_network(network):
    (...)
    return score

pool = Pool(processes = 4)
scores = pool.imap(train_network, networks)
scores = tqdm(scores, total = networks.size)

for (network, score) in zip(networks, scores):
    network.score = score

pool.close()
pool.join()

Tags: 模块内存网络进程时间情况networkprocesses
2条回答

我想出了一个似乎可行的解决办法。我抛弃了游泳池,建立了自己的简单排队系统。除了不增加(虽然它确实略微增加,但我认为这是我将一些字典存储为日志)之外,它甚至比上面的chunks解决方案消耗更少的内存:

imapqueue

我不知道为什么会这样。也许Pool对象只是占用了大量内存?不管怎样,这是我的代码:

def train_network(network):
    (...)
    return score

# Define queues to organise the parallelising
todo = mp.Queue(size = networks.size + 4)
done = mp.Queue(size = networks.size)

# Populate the todo queue
for idx in range(networks.size):
    todo.put(idx)

# Add -1's which will be an effective way of checking
# if all todo's are finished
for _ in range(4):
    todo.put(-1)

def worker(todo, done):
    ''' Network scoring worker. '''
    from queue import Empty
    while True:
        try:
            # Fetch the next todo
            idx = todo.get(timeout = 1)
        except Empty:
            # The queue is never empty, so the silly worker has to go
            # back and try again
            continue

        # If we have reached a -1 then stop
        if idx == -1:
            break
        else:
            # Score the network and store it in the done queue
            score = train_network(networks[idx])
            done.put((idx, score))

# Construct our four processes
processes = [mp.Process(target = worker,
    args = (todo, done)) for _ in range(4)]

# Daemonise the processes, which closes them when
# they finish, and start them
for p in processes:
    p.daemon = True
    p.start()

# Set up the iterable with all the scores, and set
# up a progress bar
idx_scores = (done.get() for _ in networks)
pbar = tqdm(idx_scores, total = networks.size)

# Compute all the scores in parallel
for (idx, score) in pbar:
    networks[idx].score = score

# Join up the processes and close the progress bar
for p in processes:
    p.join()
pbar.close()

不幸的是,python中的multiprocessing模块带来了巨大的开销。数据通常不在进程之间共享,需要复制。这将从python3.8开始改变。你知道吗

https://docs.python.org/3.8/library/multiprocessing.shared_memory.html

尽管python3.8的正式发布日期是2019年10月21日,但您已经可以在github上下载它了

相关问题 更多 >