无法向add_date_job()添加方法以在SQLAlchemyJobStore中存储作业
问题描述:
我想给 scheduler.add_date_job()
添加一个方法 (Test::start()
),这个方法是配置用来存储任务的 SQLAlchemyJobStore
。把任务添加到任务存储中是成功的。但是当我尝试启动调度器时,obj_to_ref
(在 apscheduler/util.py
中)无法获取给定对象的 ref_to_obj()
,在这个例子中,给定的对象就是 Test::start()
- 换句话说,就是 <bound method Test.start of <__main__.Test instance at 0xa119a6c>>
。
但在以下情况下,同样的操作是正常工作的:
- 其他任务存储(例如
RAMJobStore
- 这是默认的,当没有添加/配置任务存储时)。 - 当调用
scheduler.add_date_job()
时使用其他函数(例如下面代码中的func
),而不是像Test::start()
这样的类方法(任务存储为SQLAlchemyJobStore
)[对于func
的ref_to_obj()
和obj_to_ref()
是<function func at 0xb768ed14>
]. 我在apscheduler/util.py
中添加了一些调试信息来确认这一点。
代码如下:
from apscheduler.scheduler import Scheduler as scheduler
from datetime import datetime, date, time, timedelta
import time
import logging
logging.basicConfig(filename='/tmp/log', level=logging.DEBUG,
format='[%(asctime)s]: %(levelname)s : %(message)s')
class Failed(Exception):
def __str__(self):
return 'Failed!!'
# APScheduler Configure Options
_g_aps_default_config = {
'apscheduler.standalone' : True,
'apscheduler.jobstore.default.class' : 'apscheduler.jobstores.sqlalchemy_store:SQLAlchemyJobStore',
'apscheduler.jobstore.default.url' : 'mysql://root:root123@localhost/jobstore',
'apscheduler.jobstore.default.tablename' : 'mytable'
}
class Test:
def __init__(self, *args, **kwargs):
self.scheduler = scheduler(_g_aps_default_config)
self.__running = False
# Intentionally don't want to start!!
self.__dont_start = True
self.__retry_count = 0
self.__start_max_retries = 5
def start(self):
try:
# Try to start here!
# Intentionally don't want to start for the first 5 times
if self.__retry_count < self.__start_max_retries:
self.__retry_count += 1
raise Failed
if self.__running:
raise Failed
self.__running = True
print 'started successfully :)'
except Failed:
# log the start failure and reschedule the start()
print 'attempt (#%d): unable to start now.. ' \
'so rescheduling to start after 5 seconds' % self.__retry_count
alarm_time = datetime.now() + timedelta(seconds=5)
self.scheduler.add_date_job(self.start, alarm_time)
self.scheduler.start()
def func():
print 'this is a func and not a method!!!'
if __name__ == '__main__':
t = Test()
t.start()
while True:
time.sleep(10)
t.stop()
堆栈跟踪如下:
Traceback (most recent call last):
File "user1.py", line 55, in <module>
t.start()
File "user1.py", line 48, in start
self.scheduler.start()
File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/scheduler.py", line 109, in start
self._real_add_job(job, jobstore, False)
File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/scheduler.py", line 259, in _real_add_job
store.add_job(job)
File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/jobstores/sqlalchemy_store.py", line 58, in add_job
job_dict = job.__getstate__()
File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/job.py", line 120, in __getstate__
state['func_ref'] = obj_to_ref(self.func)
File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/util.py", line 174, in obj_to_ref
raise ValueError('Cannot determine the reference to %s' % repr(obj))
ValueError: Cannot determine the reference to <bound method Test.start of <__main__.Test instance at 0xa119a6c>>
我在 apscheduler/util.py
中添加的调试信息如下:
161 def obj_to_ref(obj):
162 """
163 Returns the path to the given object.
164 """
165 ref = '%s:%s' % (obj.__module__, get_callable_name(obj))
166 print 'obj_to_ref : obj : %s' % obj
167 print 'obj_to_ref : ref : %s' % ref
168 try:
169 obj2 = ref_to_obj(ref)
170 print 'obj_to_ref : obj2 : %s' % obj2
171 if obj != obj2:
172 raise ValueError
173 except Exception:
174 raise ValueError('Cannot determine the reference to %s' % repr(obj))
175
176 return ref
以下是 Test::start()
的调试打印:
obj_to_ref : obj : <bound method Test.start of <__main__.Test instance at 0xa119a6c>>
obj_to_ref : ref : __main__:Test.start
obj_to_ref : obj2 : <unbound method Test.start>
把 scheduler.add_date_job()
改成一个 function
(例如 func
),而不是一个 method
(例如 Test::start()
)
self.scheduler.add_date_job(func, alarm_time)
以下是 func()
的调试打印:
obj_to_ref : obj : <function func at 0xb768ed14>
obj_to_ref : ref : __main__:func
obj_to_ref : obj2 : <function func at 0xb768ed14>
我是不是做错了什么?还是说这是 apscheduler/util.py
中与 SQLAlchemyJobStore
相关的一个bug?
有没有已知的解决办法?
1 个回答
主要问题是,当你使用 SQLAlchemyJobStore
或其他任何不是 RAMJobStore
的任务存储时,apscheduler 会用一种叫做 pickle 的方式把你的任务保存到存储中。它只保存你在 scheduler.add_date_job
方法中指定的函数的引用名。
所以在你的情况下,它保存的内容大概是 <内存中的对象ID>.start
。
因此,对于任务函数,你应该使用 在模块顶层定义的函数,而不是实例方法。
这也意味着 apscheduler 不会在任务运行之间保存任务函数的状态。你可能需要在方法内部实现状态的保存和加载到数据库中,但这样会让事情变得太复杂。
更好的方法是实现一个自定义的调度触发器类,这样可以决定何时运行任务。你可能仍然需要为触发器加载/保存状态,以便它能够支持停止和启动调度器的过程。
一些链接: