无法向add_date_job()添加方法以在SQLAlchemyJobStore中存储作业

0 投票
1 回答
1529 浏览
提问于 2025-04-17 15:33

问题描述:

我想给 scheduler.add_date_job() 添加一个方法 (Test::start()),这个方法是配置用来存储任务的 SQLAlchemyJobStore。把任务添加到任务存储中是成功的。但是当我尝试启动调度器时,obj_to_ref(在 apscheduler/util.py 中)无法获取给定对象的 ref_to_obj(),在这个例子中,给定的对象就是 Test::start() - 换句话说,就是 <bound method Test.start of <__main__.Test instance at 0xa119a6c>>

但在以下情况下,同样的操作是正常工作的:

  1. 其他任务存储(例如 RAMJobStore - 这是默认的,当没有添加/配置任务存储时)。
  2. 当调用 scheduler.add_date_job() 时使用其他函数(例如下面代码中的 func),而不是像 Test::start() 这样的类方法(任务存储为 SQLAlchemyJobStore)[对于 funcref_to_obj()obj_to_ref()<function func at 0xb768ed14>]. 我在 apscheduler/util.py 中添加了一些调试信息来确认这一点。

代码如下:

from apscheduler.scheduler import Scheduler as scheduler
from datetime import datetime, date, time, timedelta
import time
import logging

logging.basicConfig(filename='/tmp/log', level=logging.DEBUG,
        format='[%(asctime)s]: %(levelname)s : %(message)s')

class Failed(Exception):
    def __str__(self):
        return 'Failed!!'

# APScheduler Configure Options
_g_aps_default_config = {
    'apscheduler.standalone' : True,
    'apscheduler.jobstore.default.class' : 'apscheduler.jobstores.sqlalchemy_store:SQLAlchemyJobStore',
    'apscheduler.jobstore.default.url' : 'mysql://root:root123@localhost/jobstore',
    'apscheduler.jobstore.default.tablename' : 'mytable'
}

class Test:
    def __init__(self, *args, **kwargs):
        self.scheduler = scheduler(_g_aps_default_config)
        self.__running = False
        # Intentionally don't want to start!!
        self.__dont_start = True 
        self.__retry_count = 0
        self.__start_max_retries = 5

    def start(self):
        try:
            # Try to start here! 
            # Intentionally don't want to start for the first 5 times
            if self.__retry_count < self.__start_max_retries:
                self.__retry_count += 1
                raise Failed
            if self.__running:
                raise Failed
            self.__running = True
            print 'started successfully :)'
        except Failed:
            # log the start failure and reschedule the start()
            print 'attempt (#%d): unable to start now.. ' \
                  'so rescheduling to start after 5 seconds' % self.__retry_count
            alarm_time = datetime.now() + timedelta(seconds=5)
            self.scheduler.add_date_job(self.start, alarm_time)
            self.scheduler.start()

def func():
    print 'this is a func and not a method!!!'

if __name__ == '__main__':
    t = Test()
    t.start()
    while True:
        time.sleep(10)
    t.stop()

堆栈跟踪如下:

Traceback (most recent call last):
  File "user1.py", line 55, in <module>
    t.start()
  File "user1.py", line 48, in start
    self.scheduler.start()
  File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/scheduler.py", line 109, in start
    self._real_add_job(job, jobstore, False)
  File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/scheduler.py", line 259, in _real_add_job
    store.add_job(job)
  File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/jobstores/sqlalchemy_store.py", line 58, in add_job
    job_dict = job.__getstate__()
  File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/job.py", line 120, in __getstate__
    state['func_ref'] = obj_to_ref(self.func)
  File "/usr/lib/python2.7/site-packages/APScheduler-2.1.0-py2.7.egg/apscheduler/util.py", line 174, in obj_to_ref
    raise ValueError('Cannot determine the reference to %s' % repr(obj))
ValueError: Cannot determine the reference to <bound method Test.start of <__main__.Test instance at 0xa119a6c>>

我在 apscheduler/util.py 中添加的调试信息如下:

161 def obj_to_ref(obj):
162     """
163     Returns the path to the given object.
164     """
165     ref = '%s:%s' % (obj.__module__, get_callable_name(obj))
166     print 'obj_to_ref : obj : %s' % obj
167     print 'obj_to_ref : ref : %s' % ref
168     try:
169         obj2 = ref_to_obj(ref)
170         print 'obj_to_ref : obj2 : %s' % obj2
171         if obj != obj2:
172             raise ValueError
173     except Exception:
174         raise ValueError('Cannot determine the reference to %s' % repr(obj))
175 
176     return ref

以下是 Test::start() 的调试打印:

obj_to_ref : obj : <bound method Test.start of <__main__.Test instance at 0xa119a6c>>
obj_to_ref : ref : __main__:Test.start
obj_to_ref : obj2 : <unbound method Test.start>

scheduler.add_date_job() 改成一个 function(例如 func),而不是一个 method(例如 Test::start()

self.scheduler.add_date_job(func, alarm_time)

以下是 func() 的调试打印:

obj_to_ref : obj : <function func at 0xb768ed14>
obj_to_ref : ref : __main__:func
obj_to_ref : obj2 : <function func at 0xb768ed14>

我是不是做错了什么?还是说这是 apscheduler/util.py 中与 SQLAlchemyJobStore 相关的一个bug?

有没有已知的解决办法?

1 个回答

1

主要问题是,当你使用 SQLAlchemyJobStore 或其他任何不是 RAMJobStore 的任务存储时,apscheduler 会用一种叫做 pickle 的方式把你的任务保存到存储中。它只保存你在 scheduler.add_date_job 方法中指定的函数的引用名。

所以在你的情况下,它保存的内容大概是 <内存中的对象ID>.start

因此,对于任务函数,你应该使用 在模块顶层定义的函数,而不是实例方法。

这也意味着 apscheduler 不会在任务运行之间保存任务函数的状态。你可能需要在方法内部实现状态的保存和加载到数据库中,但这样会让事情变得太复杂。

更好的方法是实现一个自定义的调度触发器类,这样可以决定何时运行任务。你可能仍然需要为触发器加载/保存状态,以便它能够支持停止和启动调度器的过程。

一些链接:

撰写回答