路易吉任务去哪了?
这是我第一次接触Luigi(还有Python!),有一些问题想请教。相关的代码如下:
from Database import Database
import luigi
class bbSanityCheck(luigi.Task):
conn = luigi.Parameter()
date = luigi.Parameter()
def __init__(self, *args, **kwargs):
super(bbSanityCheck, self).__init__(*args, **kwargs)
self.has_run = False
def run(self):
print "Entering run of bb sanity check"
# DB STUFF HERE THAT DOESN"T MATTER
print "Are we in la-la land?"
def complete(self):
print "BB Sanity check being asked for completeness: " , self.has_run
return self.has_run
class Pipeline(luigi.Task):
date = luigi.DateParameter()
def requires(self):
db = Database('cbs')
self.conn = db.connect()
print "I'm about to yield!"
return bbSanityCheck(conn = self.conn, date = self.date)
def run(self):
print "Hello World"
self.conn.query("""SELECT *
FROM log_blackbook""")
result = conn.store_result()
print result.fetch_row()
def complete(self):
return False
if __name__=='__main__':
luigi.run()
输出结果在这里(相关的数据库返回信息已删去):
DEBUG: Checking if Pipeline(date=2013-03-03) is complete
I'm about to yield!
INFO: Scheduled Pipeline(date=2013-03-03)
I'm about to yield!
DEBUG: Checking if bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03) is complete
BB Sanity check being asked for completeness: False
INFO: Scheduled bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 5150] Running bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
Entering run of bb sanity check
Are we in la-la land?
INFO: [pid 5150] Done bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: Worker was stopped. Shutting down Keep-Alive thread
所以我的问题是:
1.) 为什么“我快要输出了”会打印两次?
2.) 为什么“你好,世界”从来没有被打印出来?
3.) 什么是“1个待处理任务可能由其他工作者运行”?
我更喜欢输出结果超级干净,因为这样更容易维护。我希望能把这些警告信息搞清楚。
我还注意到需要使用“yield”或者“return item, item2, item3”。我读过关于yield的内容,也理解它的意思。但我不明白的是,这两种写法哪种更好,或者它们之间是否有一些细微的区别,而我作为新手还没有理解。
1 个回答
5
我觉得你对luigi的工作原理有些误解。
(1) 嗯……我不太确定。看起来更像是INFO和DEBUG同时打印相同内容的问题。
(2) 你想运行一个叫Pipeline的任务,而这个任务又依赖于bbSanityCheck。可是,bbSanityCheck.complete()永远不会返回True,因为你从来没有把has_run设置为True。所以Pipeline这个任务就永远无法运行,也无法输出“hello world”,因为它的依赖任务从来没有完成。
(3) 这可能是因为你有一个待处理的任务(其实就是Pipeline)。但是luigi知道这个任务不可能运行,所以它就停止了。
我个人认为,不应该用has_run来判断一个任务是否已经运行,而是应该检查这个任务的结果是否存在。比如,如果这个任务对数据库做了什么,那么complete()应该检查预期的内容是否存在。