Python 高级调度器作业在 jobstore 中未找到 (apscheduler)
考虑一下这段小代码
from apscheduler.scheduler import Scheduler
import time
class First():
def __init__(self):
self.remove_job=None
def go(self):
self.remove_job('test')
class Sched():
def __init__(self):
self.sched = Scheduler()
self.sched.add_interval_job( self.execute,
seconds=1,
name = 'test'
)
def execute(self):
print "i'm alive"
def remove_job(self,job):
self.sched.print_jobs()
self.sched.unschedule_job(job)
def main():
first = First()
sched = Sched()
first.remove_job=sched.remove_job
sched.sched.start()
time.sleep(5)
first.go()
return 0
if __name__ == '__main__':
main()
python sched_test.py
i'm alive
i'm alive
i'm alive
i'm alive
i'm alive
Jobstore default:
test (trigger: interval[0:00:01], next run at: 2011-12-22 01:25:36.577572)
Traceback (most recent call last):
File "sched_test.py", line 55, in <module>
main()
File "sched_test.py", line 51, in main
first.go()
File "sched_test.py", line 31, in go
self.remove_job('test')
File "sched_test.py", line 43, in remove_job
self.sched.unschedule_job(job)
File "/usr/local/lib/python2.7/dist-packages/APScheduler-2.0.2-py2.7.egg/apscheduler/scheduler.py", line 401, in unschedule_job
raise KeyError('Job "%s" is not scheduled in any job store' % job)
KeyError: 'Job "test" is not scheduled in any job store'
我在打印工作时为什么会出现这个错误?不过,print_jobs()
这个函数能给我正确的概览。
有没有人能帮我解释一下这个问题?
3 个回答
0
注意,你的 First
类其实并没有 sched
这个实例;它肯定无法访问你可能想要操作的 Scheduler
的 sched.sched
实例。
class First():
def __init__(self):
self.remove_job=None
def go(self):
self.remove_job('test')
也许你应该先创建一个 Sched
对象,这样你就可以把它传递给 First()
的构造函数,这样就能调用它了。我这里简单描述一个我认为可以解决这个问题的(未经过测试的)方法:
class First():
def __init__(self, sched):
self.sched = sched
def go(self):
self.sched.remove_job('test')
def main():
sched = Sched()
first = First(sched)
sched.sched.start()
time.sleep(5)
first.go()
return 0
这样做可以让 Sched
类独立存在——也许通过合并 First
和 Sched
可以找到更好的设计——因为 First
知道 Sched
控制的某个任务的名字,这说明可能有些地方不太对劲。
也许你可以退一步,解释一下你想解决什么问题?这看起来不是最干净的解决方案,所以我在想你遇到的问题是否可以通过更好的方法来解决。
3
这对大多数人来说可能很明显,但我花了一段时间才明白。想和大家分享一下是什么让我的代码正常工作的:
myJobName= "Homework"
for job in self.sched.get_jobs():
if job.name == "Homework":
self.sched.unschedule_job(job)
print "No more homework!"
1
你需要把通过add_interval_job得到的工作实例传给unschedule_job,而不是传一个字符串。这样就能解决问题了。