在run()中生成任务时Luigi中的TaskClassAmbigiousException

2024-05-19 01:42:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在努力解决路易吉的一个错误,我不明白。我不知道这是否是一个已知的问题,一个Luigi的限制,或者我是否做错了什么

我正在使用Luigi解决一个有很多任务和依赖关系的实际问题。然而,我已经做了一个玩具的例子,在其中这个问题出现得很清楚

让我们考虑两个任务,Taska和Taskb,Taska需要执行两个先前的TaskB实例,它们具有路易吉参数的不同值。p>

如果我在TaksA的requires()方法中对依赖项进行编码,那么就不会发生什么不好的事情。这三个任务都会执行,我会编写退出文件

但是如果我在TaksA的run()方法中对依赖项进行编码,那么就会得到难看的TaskClassAmbigiousException

在我的实际问题中,我无法在requires()方法中生成任务,因为我需要知道在requireres()方法中生成的上一个任务的结果,所以我尝试在run()中生成该任务,并得到了相同的异常

好的,下面是玩具示例的代码。首先,在requireres()中生成任务,它可以工作:

import luigi

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputB{j}.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputA{i}.txt".format(i=self.i))

    def requires(self):
        yield TaskB(j=self.i)
        yield TaskB(j=self.i+1)

    def run(self):
        print_file = ""
        for input_target in self.input():
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)
               
if __name__ == '__main__':
   taskA = TaskA(i=2)

第二,在run()中生成任务,我得到:

 File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/worker.py", line 1081, in _handle_next_task
    for module, name, params in new_requirements]

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/worker.py", line 1081, in <listcomp>
    for module, name, params in new_requirements]

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/task_register.py", line 251, in load_task
    task_cls = Register.get_task_cls(task_name)

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/task_register.py", line 181, in get_task_cls
    raise TaskClassAmbigiousException('Task %r is ambiguous' % name)

守则:

import luigi

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputB{j}.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputA{i}.txt".format(i=self.i))

    def requires(self):
        pass

    def run(self):
        print_file = ""
        target1 = yield TaskB(j=self.i)
        target2 = yield TaskB(j=self.i+1)
        for input_target in [target1, target2]:
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)
               
if __name__ == '__main__':
   taskA = TaskA(i=2)
   luigi.build([taskA], workers=1,local_scheduler=True,log_level='WARNING')

编辑:我编辑以添加另一个相关问题。我想做的是生成一个任务,该任务的参数取决于先前生成的任务,如果这对我来说是可能的,那么就足够了:

  def requires(self):
        taskb_target = yield TaskB(j=self.i)
        taskb_target.open('r')
# do something and yield next Task depending on what taskb_target has
        yield TaskB(j=self.i+1)

但不幸的是,这不起作用。Luigi说“非类型”对象没有“打开”属性

但是,在run()方法中生成任务时,可以在运行时访问输出。似乎有一个很大的不对称性

第二次编辑:

我做了更多的试验,发现了一个奇怪的结论:我在原始问题中编写的第二段代码(在.py文件中时)可以永久执行,即使删除了输出文件,从而迫使luigi重新执行任务。但是,第一段代码只能执行一次(而且,在第一次执行时,它可以工作!!)。但是,如果删除文件并再次执行代码,您将得到ambigous任务错误

我认为这与luigi的Register对象有关。但真正让我困惑的是,无论我是在requires中生成taskB还是在run方法中生成taskB,这种行为都是不同的

我仍然不知道在重新定义已经在luigi的Register模块中的类任务时是否会出现问题。可能是。。。我还尝试将类定义放置在一个不同于main.py的.py中,但运行两次就会中断。正确运行的唯一方法是重新启动内核,而您只有一次机会


Tags: 方法runinselftaskoutputdefline
2条回答

好的,现在我发现只有在执行两次脚本时才会出现问题。在尝试一个错误时,我发现问题在于,当再次导入TaskA和TaskB时,它们会再次在luigi.task\u register.register中注册

事实上,Register有一个属性_reg,它包含所有注册的类。在模块的第二个执行中,TaskA和TaskB再次注册。我不知道为什么。这很奇怪,但却是真的。只有在导入TaskA时才会发生这种情况,这更奇怪

因此,我发现解决此问题的方法如下:

import luigi
from luigi import task_register

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputB{j}.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputA{i}.txt".format(i=self.i))

    def requires(self):
        pass

    def run(self):
        print_file = ""
        target1 = yield TaskB(j=self.i)
        target2 = yield TaskB(j=self.i+1)
        for input_target in [target1, target2]:
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

taskA_list = [c for c in task_register.Register._reg if c.__name__ == 'TaskA']
if len(taskA_list) > 1:
   task_register.Register._reg.pop()
   task_register.Register._reg.pop()
               
if __name__ == '__main__':
   taskA = TaskA(i=2)
   luigi.build([taskA], workers=1,local_scheduler=True,log_level='WARNING')

这是一个有效的技巧,使模块永远可以重新执行,无论是否删除输出文件。但显然这不是最优雅的解决方案。我写它只是为了帮助luigi开发人员修复它,或者如果我做错了什么来纠正我

当您使用yield时,不会得到返回值,因为您基本上是return从一个共同例程中获取一个值。实际上,我很惊讶在requires的内部使用yield对您有效,因为它导致了我的崩溃。您要做的是首先定义任务,然后生成任务。例如,你会:

class TaskA(luigi.Task):
    def run(self):
        task_1 = TaskB(j=self.i)
        yield task_1
        with task_1.output().open('r') as in_file:
            # Get data

        task_2 = TaskB(j=self.i+1, ...)
        yield task_2
        ...

相关问题 更多 >

    热门问题