在多个文件上应用luigi管道

2024-04-26 17:44:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我有废弃的数据,想用luigi来处理。实际上,我已经完成了所有的转换等,现在我正在处理如何将所有文件传递给luigi管道。 我有一个外部任务,它正在返回文件名和我在glob生成的列表中调用的刮取数据。从每个输入文件中,我都得到了清理和转换的输出

函数是我调用管道的第一步,如下所示:

if __name__ == '__main__':
    luigi.build([l.Load()], local_scheduler=False)
class ReadFile(luigi.ExternalTask):
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(str(self.filename))


class ExtractRawFiles(luigi.Task):
    scrape_date = ''
    scrape_hour = ''

    def output(self):
        return luigi.LocalTarget(f'{LUIGI_RAW_DIRECTORY}/{self.scrape_date}_{self.scrape_hour}.csv')

    def requires(self):
        for input_filename in glob.glob(f'{conf.RAW_DATA_DIRECTORY}/**/*.csv', recursive=True):
            yield ReadFile(input_filename)

    def run(self):
        for i in self.input():
            df.to_csv(f'{LUIGI_RAW_DIRECTORY}/{self.scrape_date}_{self.scrape_hour}.csv')

所以问题是。在多个文件上应用管道的最佳方法是什么。你知道吗

  • 将文件作为参数传递给luigi build?你知道吗
  • 应用读取luigi管道中的所有文件

Tags: 文件csv数据selfinputdateraw管道