我的mxnet脚本很可能受到加载到GPU的数据的i/o的限制,我正试图通过预取来加快速度。问题是我不知道如何使用自定义数据迭代器进行预取。在
我的第一个假设/希望是它足以设置self.preprocess_线程以及self.prefetch_缓冲区,正如我在mxnet.io.ImageRecordUInt8Iter
等迭代器中看到的here。但是,当我这样做时,在设置这些变量之前,我没有看到与脚本相关的性能变化,所以很明显设置这些变量没有起作用。在
然后我注意到,除了我为其实现子类mx.io.DataIter
的基类之外,还存在一个类mx.io.PrefetchingIter
。{我不知道该在哪里找到一个小例子。但是,我不清楚如何使用这个。例如。我看到除了next()
之外,它还有一个iter_next()
方法,它简单地说“移动到下一批”。这到底是什么意思?“转移”到下一批而不生产它意味着什么?我找到了这个类的source code,根据简单的阅读,它似乎需要多个迭代器,并且每个迭代器创建一个线程。这可能不适合我当前的设计,因为我真的希望多个线程用于从同一个迭代器中进行预取。在
下面是我试图通过一个自定义数据迭代器来实现的
multiprocessing.Queue
,当数据可用时,我在其中弹出数据multiprocessing
)一个命令行脚本来生成该数据,该脚本执行一个生成numpy
文件的c++二进制文件numpy
文件并将其内容加载到内存中,对其进行处理,并将处理后的位放入全局multiprocessing.Queue
这是我的代码:
def launchJobForDate(date_str):
### this is a function that gets called via multiprocessing
### to produce new data by calling a c++ binary
### whenever data queue is empty so that we need to produce more data
try:
f = "testdata/data%s.npy"%date_str
if not os.path.isfile(f):
cmd = CMD % ( date_str, JSON_FILE, date_str, date_str, date_str)
while True:
try:
output = subprocess.check_output(cmd, shell=True)
break
except:
pass
while True:
try:
d = np.load(f)
break
except:
pass
data_queue.put((d, date_str))
except Exception as ex:
print("launchJobForDate: ERROR ", ex)
class ProduceDataIter(mx.io.DataIter):
@staticmethod
def processData(d, time_steps, num_inputs):
try:
...processes data...
return [z for z in zip(bigX, bigY, bigEvalY, dates)]
except Exception as ex:
print("processData: ERROR ", ex)
def __init__(self, num_mgrs, end_date_str):
## iter stuff
self.preprocess_threads = 4
self.prefetch_buffer = 1
## set up internal data to preserve state
## and make a list of dates for which to run binary
@property
def provide_data(self):
return [mx.io.DataDesc(name='seq_var',
shape=(args_batch_size * GPU_COUNT,
self.time_steps,
self.num_inputs),
layout='NTC')]
@property
def provide_label(self):
return [mx.io.DataDesc(name='bd_return',
shape=(args_batch_size * GPU_COUNT)),
mx.io.DataDesc(name='bd_return',
shape=(args_batch_size * GPU_COUNT, num_y_cols)),
mx.io.DataDesc(name='date',
shape=(args_batch_size * GPU_COUNT))]
def __next__(self):
try:
z = self.z.pop(0)
data = z[0:1]
label = z[1:]
return mx.io.DataBatch(data, label)
except Exception as ex:
### if self.z (a list) has no elements to pop we need
### to get more data off the queue, process it, and put it
### on self.x so it's ready for calls to __next__()
while True:
try:
d = data_queue.get_nowait()
processedData = ProduceDataIter.processData(d,
self.time_steps,
self.num_inputs)
self.z.extend(processedData)
counter_queue.put(counter_queue.get() - 1)
z = self.z.pop(0)
data = z[0:1]
label = z[1:]
return mx.io.DataBatch(data, label)
except queue.Empty:
...this is where new jobs to produce new data and put them
...on the queue would happen if nothing is left on the queue
然后,我尝试制作这些迭代器和预取迭代器,如下所示:
^{pr2}$问题是当第一次调用mgrOuter
时,mgrOuter
立即抛出一个StopIteration
,而且没有像我想的那样调用mgr.__next__()
。在
最后,我还注意到gluon
有一个DataLoader
对象,seems like it might handle prefetching,但是在本例中,它似乎也假设底层数据来自一个具有有限且不变布局的Dataset
,该布局是根据{
我的问题是:
谢谢你的反馈和建议。在
正如您已经提到的,gluon DataLoader正在提供预取。在定制的DataIterator中,使用Numpy数组作为输入。因此您可以执行以下操作:
因为您是动态创建数据的,所以可以尝试在每个epoch重置DataLoader并加载一个新的Numpy数组。 如果GPU利用率仍然很低,请尝试增加批处理大小和工作线程数。另一个问题也可能是数据集的大小。重置DataLoader将影响性能,因此拥有更大的数据集将增加epoch的时间,从而提高性能。在
相关问题 更多 >
编程相关推荐