如何动态定义Luigi任务类名

2024-04-26 13:57:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我想将多个查询定义为相互依赖的单个Luigi任务。由于BDX\u Query\u0xx的依赖项(子任务)是基于字典(cmdList)动态定义的,所以我使用“yield[Task\u Class(klass)]”语句。例如,BDX\u Query\u 0XX有3个依赖项,BDX010、BDX020、BDX030,其中BDX010是BDX020的依赖项,BDX020是BDX030的依赖项。你知道吗

问题是,当BDX020被产生并且Python将BDX010作为依赖项时,Python会抱怨没有定义BDX010。你知道吗

如果我使用“yield BDX\u Task”而不是“yield klass”,事情会很顺利,但这会使Luigi visualizer将所有动态创建的任务显示为“BDX\u Task”,而不是显示为BDX010,…020,…030。你知道吗

cmdList = {
        'BDX010': (f'"{bdx_sql}BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ',''),
        'BDX020': (f'"{bdx_sql}BDX_001_NI_DM 020.sql"  ','BDX010'),
        'BDX030': (f'"{bdx_sql}BDX_001_NI_DM 030.sql"  ','BDX020')  }


class BDX_Task(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter(default=None)
    queryKey = luigi.Parameter()
    queryCmd = luigi.Parameter()
    runDesc = luigi.Parameter()
    dependQry = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(BDX_Task, self).__init__(*args, **kwargs)
        logger.debug(f'BDX_Task.__init__ called for queryKey ="{self.queryKey}"')

        self.trans_id = f"00903_BDX_Query_{self.queryKey}__{self.runDesc}"

    def requires(self):
        cmdListComb = dict(cmdList)
        cmdListComb.update(cmdList2)

        if self.dependQry != '' and self.dependQry in cmdListComb:
            dep_cmd, dep_dep_key = cmdListComb[self.dependQry]
            return [self.__class__(         
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = self.dependQry,
                queryCmd = dep_cmd,
                runDesc = self.runDesc,
                dependQry = dep_dep_key
            )]
        else:
            return []

    def run(self):
        strQuery_and_args = f""" -i {self.queryCmd} """
        print(strQuery_and_args)
        helpers.call_sqlcmd(self.queryKey,strQuery_and_args )
        self.get_target().touch()


class BDX_Query_0XX(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()  
    runDesc = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(BDX_Query_0XX, self).__init__(*args, **kwargs)

        self.trans_id =  "00902_BDX_Query_0XX" + "__" + self.runDesc  # static.

    def requires(self):
        for queryKey, (queryCmd, dependQry) in cmdList.items():

            klass = type(queryKey, (BDX_Task,),{})

            # I can avoid the error, if I do "yield BDX_Task" below instead.
            # but it causes Luigi Visualizer to show all BDX0?X tasks as "BDX_Task".
            yield klass(   
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = queryKey,
                queryCmd = queryCmd,
                runDesc = self.runDesc,  
                dependQry = dependQry
            )

    def run(self):
        self.get_target().touch()

Tags: selftasksqlparameterargsqueryluigibdx