How to assign Luigi parameters from a list in a WrapperTask

Posted 2024-04-20 02:08:24


I'm using Luigi to extract different user actions and save each action as its own CSV, in parallel.

My idea is to go through my source data, find the unique actions, and create one CSV per action, named after that action.
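A minimal pandas-only sketch of that idea, without Luigi, assuming (as in the code below) that the action names come from one sequence (the `utm_source` column of `attr.csv`) and the rows to filter live in another DataFrame with a `properties_url` column. The helper `write_action_csvs` and its parameters are hypothetical. It passes `regex=False` to `str.contains`, a deliberate change from the default, so action names containing characters such as `.` or `?` are matched literally.

```python
import os

import pandas as pd


def write_action_csvs(actions, data, match_col, out_dir):
    """Write one CSV per action name (hypothetical helper).

    For each unique action, keep the rows of `data` whose `match_col`
    contains the action name as a plain substring, and save them to
    <out_dir>/<action>.csv. Returns the list of paths written.
    """
    paths = []
    for action in pd.unique(pd.Series(actions).dropna()):
        # regex=False: literal substring match; na=False: missing URLs never match
        mask = data[match_col].str.contains(str(action), regex=False, na=False)
        path = os.path.join(out_dir, f"{action}.csv")
        data[mask].to_csv(path, index=False)
        paths.append(path)
    return paths
```

For example, `write_action_csvs(attr.utm_source.unique(), full_file, "properties_url", out_dir)` would produce one file per unique `utm_source` value. Each action is independent of the others, which is what makes it a natural fit for one Luigi task per action.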

import luigi
import pandas as pd

# full_file (the full source DataFrame) is assumed to be loaded elsewhere in the script; that part is not shown.

class data_filter(luigi.Task):
    task = luigi.Parameter()
    def run(self):
        # keep only the rows whose URL mentions this action, then save them
        data_filter = full_file[full_file['properties_url'].str.contains(task)]
        data_filter.to_csv('/Users/Documents/Data/' + str(task) + '.csv')
    def requires(self):
        return []
    def output(self):
        return luigi.LocalTarget('/Users/Documents/Data/' + str(task) + '.csv')

# chaining tasks with a wrapper
class wrapper(luigi.WrapperTask):
    def requires(self):
        # one data_filter task per unique action in attr.csv
        file = pd.read_csv('/Users/Desktop/attr.csv')
        actions = file.utm_source.unique()
        task_list = []
        for current_task in actions:
            task_list.append(data_filter(task=current_task))
        return task_list
    def run(self):
        print('Wrapper has ended')
        pd.DataFrame().to_csv('/Users/Documents/Data/wrangle.csv')
    def output(self):
        return luigi.LocalTarget('/Users/Documents/Data/dwrangle.csv')

if __name__ == '__main__':
    luigi.run(wrapper())

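The wrapper should look at all the unique actions, add a task for each one to task_list, and run task_list, while passing the action I'm currently iterating over to the task = luigi.Parameter() in my data_filter class.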

However, this returns the error message:

  return luigi.LocalTarget('/Users/emmanuels/Documents/GitHub/Springboard-DSC/Springboard-DSC/Capstone 1 - Attribution Model/Data/'+str(task)+'.csv')
NameError: name 'task' is not defined

as well as:

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed scheduling:
    - 1 wrapper()

Did not run any tasks
This progress looks :( because there were tasks whose scheduling failed
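The `NameError` most likely comes from Python's name-lookup rules rather than from Luigi: inside a method, a bare name such as `task` is looked up in the local, enclosing-function, global, and builtin scopes, and the class body is not one of them. Names declared in the class body are reachable only through `self` (or the class itself). It would also explain why the summary reports a scheduling failure: while scheduling, Luigi checks each required task's `complete()`, which by default calls `output()`, and that is where the bare `task` is evaluated. The plain-Python sketch below shows the lookup rule; `FakeTask` is a hypothetical stand-in whose constructor copies keyword arguments onto the instance, a simplified version of what `luigi.Task` does with parameter values.

```python
class FakeTask:
    # Stand-in for `task = luigi.Parameter()`: a name defined in the class body.
    task = None

    def __init__(self, **params):
        # Simplified version of what luigi.Task does with parameter values:
        # each keyword argument becomes an attribute on the instance.
        for name, value in params.items():
            setattr(self, name, value)

    def bad_path(self):
        # A bare `task` is looked up in local, enclosing, global and builtin
        # scopes; the class body is skipped, so this raises NameError
        # (unless some unrelated global named `task` happens to exist).
        return "/tmp/" + str(task) + ".csv"

    def good_path(self):
        # Attribute lookup through self finds the instance attribute set in __init__.
        return "/tmp/" + str(self.task) + ".csv"
```

`FakeTask(task="checkout").good_path()` returns `"/tmp/checkout.csv"`, while `bad_path()` raises `NameError: name 'task' is not defined`, the same message as in the traceback above.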

I just want to figure out what I'm doing wrong.

