路易吉任务去哪了?

2 投票
1 回答
1860 浏览
提问于 2025-04-18 17:13

这是我第一次接触Luigi(还有Python!),有一些问题想请教。相关的代码如下:

from Database import Database
import luigi

class bbSanityCheck(luigi.Task):

  conn = luigi.Parameter()
  date = luigi.Parameter()
  def __init__(self, *args, **kwargs):
    super(bbSanityCheck, self).__init__(*args, **kwargs)
    self.has_run = False

  def run(self):
    print "Entering run of bb sanity check"
    # DB STUFF HERE THAT DOESN"T MATTER
   print "Are we in la-la land?"

  def complete(self):
    print "BB Sanity check being asked for completeness: " , self.has_run
    return self.has_run

class Pipeline(luigi.Task):
  date = luigi.DateParameter()

  def requires(self):
    db = Database('cbs')
    self.conn = db.connect()
    print "I'm about to yield!"
    return bbSanityCheck(conn = self.conn, date = self.date)


  def run(self):
    print "Hello World"
    self.conn.query("""SELECT * 
              FROM log_blackbook""")
    result = conn.store_result()

    print result.fetch_row()

  def complete(self):
    return False

if __name__=='__main__':
  luigi.run()

输出结果在这里(相关的数据库返回信息已删去):

DEBUG: Checking if Pipeline(date=2013-03-03) is complete
I'm about to yield!
INFO: Scheduled Pipeline(date=2013-03-03)
I'm about to yield!
DEBUG: Checking if bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03) is complete
BB Sanity check being asked for completeness:  False
INFO: Scheduled bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 5150] Running   bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
Entering run of bb sanity check
Are we in la-la land?
INFO: [pid 5150] Done      bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: Worker was stopped. Shutting down Keep-Alive thread

所以我的问题是:

1.) 为什么“我快要输出了”会打印两次?

2.) 为什么“你好,世界”从来没有被打印出来?

3.) 什么是“1个待处理任务可能由其他工作者运行”?

我更喜欢输出结果超级干净,因为这样更容易维护。我希望能把这些警告信息搞清楚。

我还注意到需要使用“yield”或者“return item, item2, item3”。我读过关于yield的内容,也理解它的意思。但我不明白的是,这两种写法哪种更好,或者它们之间是否有一些细微的区别,而我作为新手还没有理解。

1 个回答

5

我觉得你对luigi的工作原理有些误解。

(1) 嗯……我不太确定。看起来更像是INFO和DEBUG同时打印相同内容的问题。

(2) 你想运行一个叫Pipeline的任务,而这个任务又依赖于bbSanityCheck。可是,bbSanityCheck.complete()永远不会返回True,因为你从来没有把has_run设置为True。所以Pipeline这个任务就永远无法运行,也无法输出“hello world”,因为它的依赖任务从来没有完成。

(3) 这可能是因为你有一个待处理的任务(其实就是Pipeline)。但是luigi知道这个任务不可能运行,所以它就停止了。

我个人认为,不应该用has_run来判断一个任务是否已经运行,而是应该检查这个任务的结果是否存在。比如,如果这个任务对数据库做了什么,那么complete()应该检查预期的内容是否存在。

撰写回答