无法使用特定的开始时间计划程序按时间间隔计划python作业

2024-04-26 15:13:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我们需要每小时在Flask中安排一个python作业,下面的代码片段按预期工作

   from apscheduler.schedulers.background import BackgroundScheduler
   scheduler = BackgroundScheduler()
   scheduler.add_job(func=sched, trigger="interval", seconds=60*60)
   scheduler.start()
   atexit.register(lambda: scheduler.shutdown())

但面临的挑战是,我们通常会在太平洋标准时间下午2:00到下午4:00之间将应用程序部署到任何地方。对于一个实例,下午2:15代码部署成功。上述代码从下午3:15开始触发,间隔1小时。但是我们希望python作业从下午3:00开始触发,然后每隔1小时触发一次

简单来说:如果代码在下午2:15部署-->;作业开始时间应为下午3:00、下午4:00、下午5:00……以此类推

因此,我们尝试了下一次运行,但没有成功:->代码甚至没有触发

from apscheduler.schedulers.background import BackgroundScheduler
start_at=datetime.now(timezone('America/Los_Angeles')).replace(microsecond=0, second=0, 
minute=0)+timedelta(minutes=60) #To convert 2:15 PM to 3:00 PM
scheduler = BackgroundScheduler()
scheduler.add_job(func=sched_onefilm, trigger="interval", seconds=60*5,next_run_time=start_at)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

还尝试了开始日期-->代码甚至没有触发

from apscheduler.schedulers.background import BackgroundScheduler
start_at=datetime.now(timezone('America/Los_Angeles')).replace(microsecond=0, second=0, 
minute=0)+timedelta(minutes=60)
scheduler = BackgroundScheduler()
scheduler.add_job(func=sched_onefilm, trigger="interval", seconds=60*5,start_date=start_at)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

如果有人能建议/帮助我们找出问题所在,这对克服这个问题真的很有帮助

提前谢谢


Tags: 代码fromimportadd作业jobstartat
2条回答

这里有三个链接,我想可以回答你的问题。您需要仔细阅读文档来定制您的特定解决方案

所有3个都来自数字海洋的文档:

  1. https://apscheduler.readthedocs.io/en/stable/userguide.html#configuring-the-scheduler
  2. https://apscheduler.readthedocs.io/en/stable/modules/schedulers/base.html#apscheduler.schedulers.base.BaseScheduler
  3. https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html#module-apscheduler.triggers.cron

在第三个链接中,他们指出:“例如,天=1,分钟=20相当于年='',月='',天=1,周='',周中的天='',小时='*',分钟=20,秒=0。然后,作业将在每年的每月第一天以每小时20分钟的速度执行。”

我希望我的cron作业每天凌晨1点运行,因此我使用:

scheduler.add_job(func=renew_subscription, trigger="cron", hour=1)

但出于测试目的,我使用了以下方法,以便能够看到代码以30秒的速度每分钟运行一次:

scheduler.add_job(func=renew_subscription, trigger="cron", second=30)

你有没有试过

from simple_scheduler.event import event_scheduler
TZ = "America/Los_Angeles"
hour = ["0"+str(h) if h//10 == 0 else str(h) for h in range(0,24)]
WHEN = ["*|"+str(h)+":00" for h in hour]

event_scheduler.add_job(target = sched,
                        when = WHEN,
                        tz = TZ)
event_scheduler.run()

相关问题 更多 >