mxnet:如何设置自定义mx.io.DataIter预取?

2024-04-19 01:02:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我的mxnet脚本很可能受到加载到GPU的数据的i/o的限制,我正试图通过预取来加快速度。问题是我不知道如何使用自定义数据迭代器进行预取。在

我的第一个假设/希望是它足以设置self.preprocess_线程以及self.prefetch_缓冲区,正如我在mxnet.io.ImageRecordUInt8Iter等迭代器中看到的here。但是,当我这样做时,在设置这些变量之前,我没有看到与脚本相关的性能变化,所以很明显设置这些变量没有起作用。在

然后我注意到,除了我为其实现子类mx.io.DataIter的基类之外,还存在一个类mx.io.PrefetchingIter。{我不知道该在哪里找到一个小例子。但是,我不清楚如何使用这个。例如。我看到除了next()之外,它还有一个iter_next()方法,它简单地说“移动到下一批”。这到底是什么意思?“转移”到下一批而不生产它意味着什么?我找到了这个类的source code,根据简单的阅读,它似乎需要多个迭代器,并且每个迭代器创建一个线程。这可能不适合我当前的设计,因为我真的希望多个线程用于从同一个迭代器中进行预取。在

下面是我试图通过一个自定义数据迭代器来实现的

  1. 我维护了一个全局multiprocessing.Queue,当数据可用时,我在其中弹出数据
  2. 我通过运行(通过multiprocessing)一个命令行脚本来生成该数据,该脚本执行一个生成numpy文件的c++二进制文件
  3. 我打开numpy文件并将其内容加载到内存中,对其进行处理,并将处理后的位放入全局multiprocessing.Queue
  4. 当队列为空时,我的自定义迭代器将关闭此队列并启动更多作业以生成更多数据。在

这是我的代码:

def launchJobForDate(date_str):
### this is a function that gets called via multiprocessing
### to produce new data by calling a c++ binary
### whenever data queue is empty so that we need to produce more data
    try:
        f = "testdata/data%s.npy"%date_str
        if not os.path.isfile(f):
            cmd = CMD % ( date_str, JSON_FILE, date_str, date_str, date_str)
            while True:
                try:
                    output = subprocess.check_output(cmd, shell=True)
                    break
                except:
                    pass
        while True:
            try:
                d = np.load(f)
                break
            except:
                pass
        data_queue.put((d, date_str))
    except Exception as ex:
        print("launchJobForDate: ERROR ", ex)

class ProduceDataIter(mx.io.DataIter):
    @staticmethod
    def processData(d, time_steps, num_inputs):
       try: 
            ...processes data...
            return [z for z in zip(bigX, bigY, bigEvalY, dates)]
        except Exception as ex:
            print("processData: ERROR ", ex)

    def __init__(self, num_mgrs, end_date_str):
        ## iter stuff
        self.preprocess_threads = 4
        self.prefetch_buffer = 1

        ## set up internal data to preserve state
        ## and make a list of dates for which to run binary

    @property
    def provide_data(self):
        return [mx.io.DataDesc(name='seq_var', 
                               shape=(args_batch_size * GPU_COUNT, 
                                      self.time_steps, 
                                      self.num_inputs), 
                               layout='NTC')]

    @property
    def provide_label(self):
        return [mx.io.DataDesc(name='bd_return', 
                                shape=(args_batch_size * GPU_COUNT)),             
                mx.io.DataDesc(name='bd_return', 
                                shape=(args_batch_size * GPU_COUNT, num_y_cols)), 
                mx.io.DataDesc(name='date', 
                               shape=(args_batch_size * GPU_COUNT))]                 


    def __next__(self):
        try:
            z = self.z.pop(0)       
            data = z[0:1]
            label = z[1:]
            return mx.io.DataBatch(data, label) 
        except Exception as ex:
            ### if self.z (a list) has no elements to pop we need
            ### to get more data off the queue, process it, and put it
            ### on self.x so it's ready for calls to __next__()
            while True:
                try:
                    d = data_queue.get_nowait()
                    processedData = ProduceDataIter.processData(d, 
                                                            self.time_steps, 
                                                            self.num_inputs)
                    self.z.extend(processedData)
                    counter_queue.put(counter_queue.get() - 1)

                    z = self.z.pop(0)
                    data = z[0:1]
                    label = z[1:]
                    return mx.io.DataBatch(data, label)

                except queue.Empty:
                    ...this is where new jobs to produce new data and put them 
                    ...on the queue would happen if nothing is left on the queue

然后,我尝试制作这些迭代器和预取迭代器,如下所示:

^{pr2}$

问题是当第一次调用mgrOuter时,mgrOuter立即抛出一个StopIteration,而且没有像我想的那样调用mgr.__next__()。在

最后,我还注意到gluon有一个DataLoader对象,seems like it might handle prefetching,但是在本例中,它似乎也假设底层数据来自一个具有有限且不变布局的Dataset,该布局是根据{}来实现的,它采用了一个索引)。所以我没有选择这个选项,因为考虑到我作为训练输入生成的数据的动态队列性质,它看起来毫无希望。在

我的问题是:

  • 我需要如何修改上面的代码,以便对我的自定义迭代器进行预取?
  • 我在哪里可以找到关于如何mx.io.预取器作品?
  • 有没有其他的策略我应该知道通过一个自定义迭代器使我的gpu获得更高的性能?现在他们的生产能力只有50%左右,提高(或降低)批量并不能改变这一点。我还可以转动哪些旋钮来提高GPU的使用效率?在

谢谢你的反馈和建议。在


Tags: to数据ioselfdatadatereturngpu
1条回答
网友
1楼 · 发布于 2024-04-19 01:02:30

正如您已经提到的,gluon DataLoader正在提供预取。在定制的DataIterator中,使用Numpy数组作为输入。因此您可以执行以下操作:

f = "testdata/data%s.npy"%date_str
data = np.load(f)
train = gluon.data.ArrayDataset(mx.nd.array(data))
train_iter = gluon.data.DataLoader(train, shuffle=True, num_workers=4, batch_size=batch_size, last_batch='rollover')

因为您是动态创建数据的,所以可以尝试在每个epoch重置DataLoader并加载一个新的Numpy数组。 如果GPU利用率仍然很低,请尝试增加批处理大小和工作线程数。另一个问题也可能是数据集的大小。重置DataLoader将影响性能,因此拥有更大的数据集将增加epoch的时间,从而提高性能。在

相关问题 更多 >