我有废弃的数据,想用luigi来处理。实际上,我已经完成了所有的转换等,现在我正在处理如何将所有文件传递给luigi管道。 我有一个外部任务,它正在返回文件名和我在glob生成的列表中调用的刮取数据。从每个输入文件中,我都得到了清理和转换的输出
函数是我调用管道的第一步,如下所示:
if __name__ == '__main__':
luigi.build([l.Load()], local_scheduler=False)
class ReadFile(luigi.ExternalTask):
filename = luigi.Parameter()
def output(self):
return luigi.LocalTarget(str(self.filename))
class ExtractRawFiles(luigi.Task):
scrape_date = ''
scrape_hour = ''
def output(self):
return luigi.LocalTarget(f'{LUIGI_RAW_DIRECTORY}/{self.scrape_date}_{self.scrape_hour}.csv')
def requires(self):
for input_filename in glob.glob(f'{conf.RAW_DATA_DIRECTORY}/**/*.csv', recursive=True):
yield ReadFile(input_filename)
def run(self):
for i in self.input():
df.to_csv(f'{LUIGI_RAW_DIRECTORY}/{self.scrape_date}_{self.scrape_hour}.csv')
所以问题是。在多个文件上应用管道的最佳方法是什么。你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐