把大号的子弹放在多处理.Queu

2024-04-29 08:03:30 发布

您现在位置:Python中文网/ 问答频道 /正文

当尝试将一个大的ndarray放入Process中的Queue时,我遇到了以下问题:

首先,代码如下:

import numpy
import multiprocessing
from ctypes import c_bool
import time


def run(acquisition_running, data_queue):
    while acquisition_running.value:
        length = 65536
        data = numpy.ndarray(length, dtype='float')

        data_queue.put(data)
        time.sleep(0.1)


if __name__ == '__main__':
    acquisition_running = multiprocessing.Value(c_bool)

    data_queue = multiprocessing.Queue()

    process = multiprocessing.Process(
        target=run, args=(acquisition_running, data_queue))

    acquisition_running.value = True
    process.start()
    time.sleep(1)
    acquisition_running.value = False
    process.join()

    print('Finished')

    number_items = 0

    while not data_queue.empty():
        data_item = data_queue.get()
        number_items += 1

    print(number_items)
  1. 如果我使用length=10左右,一切正常。我有9件东西通过队列传送。

  2. 如果我增加到length=1000,在我的计算机上process.join()块,尽管run()函数已经完成。我可以用process.join()注释这行,然后会看到,队列中只有2个项目,所以很明显,将数据放入队列的速度非常慢。

我的计划是运输4个月,每个长度65536。对于Thread来说,这非常快(<;1ms)。有没有办法提高进程的数据传输速度?在

我在Windows机器上使用了python3.4,但是在Linux上使用python3.4,我得到了相同的行为。在


Tags: runimportnumberdatatime队列queuevalue
3条回答

“是否有办法提高进程的数据传输速度?”

当然,考虑到要解决的正确问题。目前,您只是在填充缓冲区而不同时清空它。恭喜你,你刚刚给自己建立了一个所谓的僵局。{

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe.

但是,让我们慢慢来。首先,“速度”不是你的问题!我知道您只是在试验Python的multiprocessing。阅读代码时最重要的一点是父对象和子对象之间的通信流,尤其是事件处理没有真正意义。如果你想解决一个现实世界中的问题,你肯定无法用这种方式解决它。如果您没有实际问题,那么在开始编写代码之前,首先需要找到一个好的问题;-)。最后,您将需要了解操作系统为进程间通信提供的通信原语。在

解释您所观察到的情况:

您的子进程生成大约10 * length * size(float)字节的数据(考虑到您的子进程可以执行大约10次迭代,而父进程在将acquisition_running设置为False之前睡了大约1s)。当父进程休眠时,子进程将指定的数据量放入队列中。您需要了解队列是一个复杂的构造。你不需要了解每一点。但有一件事真的很重要:进程间通信的队列显然使用了某种位于父进程和子进程之间的缓冲区。缓冲区的大小通常是有限的。您正在从子级中写入此缓冲区,而不在父级中同时从该缓冲区中读取数据。也就是说,缓冲区内容在父级休眠时稳定增长。通过增加length,您将遇到队列缓冲区已满且子进程无法再向其写入的情况。但是,子进程在写入所有数据之前不能终止。同时,父进程等待子进程终止。在

你看到了吗?一个实体等待另一个实体。父节点等待子节点终止,子节点等待父节点腾出一些空间。这种情况称为死锁。它无法自行解决。在

关于细节,缓冲区的情况比上面描述的要复杂一些。您的子进程产生了一个额外的线程,该线程试图通过管道将缓冲数据推送到父进程。实际上,这个管道的缓冲区是限制实体。它由操作系统定义,至少在Linux上,它通常不大于65536字节。在

换言之,最重要的部分是:父对象在试图将写入管道之前,不会从管道中读取。在使用管道的每个有意义的场景中,读取写入都以相当同步的方式发生,因此一个进程可以对另一个进程提供的输入做出快速反应。你所做的恰恰相反:你让你的父对象进入睡眠状态,从而使它对来自子对象的输入不响应,从而导致死锁。在

(*)“当进程第一次将一个项目放入队列时,将启动一个进料器线程,该线程将对象从缓冲区传输到管道”,从https://docs.python.org/2/library/multiprocessing.html

要解决这个问题,您可以做的一件事是在每个进程之间卸载队列,这与JPG提供的优秀答案一致。在

所以改为这样做:

process.start()
data_item = data_queue.get()
process.join()

虽然这并不能完全复制代码中的行为(数据计数的数量),但您应该明白了;)

如果您有非常大的数组,您可能只想传递它们的pickled状态——或者更好的选择是使用multiprocessing.Array或{}来创建一个共享内存数组(对于后者,请参见http://briansimulator.org/sharing-numpy-arrays-between-processes/)。您必须担心冲突,因为您将拥有一个不受GIL约束的数组,并且需要锁。但是,您只需要发送数组索引就可以访问共享数组数据。在

相关问题 更多 >