如何从apscheduler访问返回值?

2024-03-29 09:56:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我不太清楚如何在apscheduler中访问调度作业的返回值。作业需要在每天不同的时间运行,我需要今天作业的返回值来安排明天的作业。在

这个链接(how to get return value from apscheduler jobs)似乎是这个问题之前最好的答案。它建议在调度程序中添加一个侦听器。我添加了一个侦听器,但不确定如何访问它的返回值。我可以访问附加到调度程序的侦听器,但不能访问它们的输出。当计划作业运行时,将打印下面代码中的job_nses()侦听器。在

此外,我知道我需要访问JobExecutionEvent(https://apscheduler.readthedocs.io/en/latest/modules/events.html#module-apscheduler.events),它保存函数的返回值。在

首先,我要访问的函数是run_all(),其中执行了一系列操作,但是对于测试用例,我只返回True。在

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, JobExecutionEvent
from datetime import datetime, timedelta
import logging


def run_all():
    return True


def job_runs(event):  # listener function
    if event.exception:
        print('The job did not run')
    else:
        print('The job completed @ {}'.format(datetime.now()))


def job_return_val(event):  # listener function
    return event.retval

然后,我设置调度程序,添加侦听器,并添加作业。触发器设置为在作业添加到调度程序后1分钟运行函数。在

^{pr2}$

接下来,我启动调度程序并打印计划的作业。另外,我设置了日志记录,以便知道调度程序在哪里。在

  test = scheduler.start()
  scheduler.print_jobs()
  logging.basicConfig()
  logging.getLogger('apscheduler').setLevel(logging.DEBUG)

启用日志记录后,调度程序报告作业已运行并从调度程序中删除,正如我所期望的那样。job_nses()将正确的输出打印到控制台。对于断点,我知道job\u return_val()被调用。但是,我不知道它返回的值被发送到哪里。该函数似乎是在另一个名为APScheduler的线程中调用的。我不太了解线程,但这是有道理的。但是,我不知道该线程的输出何时返回到主线程。在

最后,我尝试用代码、job-id、jobstore和scheduled-run-time从scheduler和job的属性访问JobExceptionEvent,但是JobExceptionEvent似乎不知道事件是在scheduler中运行的。由于上一段中描述的线程,这似乎也有意义。在

任何帮助整理这个问题太好了!在


Tags: 函数runfromimport程序eventreturnlogging
2条回答

您需要的是实现stateful jobs feature。在

listener的返回值在任何地方都没有使用(请参见code),因此返回任何值都是没有用的。如果需要根据上一个作业的值(在侦听器中通过事件对象获取)调度另一个作业,则必须在该侦听器中正确地执行该操作。在

编辑:为了说明如何进行编辑(并证明这是可能的),请参见以下示例代码:

from datetime import datetime
import time

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.background import BackgroundScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


def tack():
    print('Tack! The time is: %s' % datetime.now())


def listener(event):
    if not event.exception:
        job = scheduler.get_job(event.job_id)
        if job.name == 'tick':
            scheduler.add_job(tack)


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_listener(listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    scheduler.add_job(tick, 'interval', seconds=5)
    scheduler.start()

    try:
        while True:
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

输出:

^{pr2}$

相关问题 更多 >