精准调度Python脚本每小时运行
在我提问之前,Cron Jobs 和任务调度器将是我最后的选择,这个脚本会在Windows和Linux上使用,我更希望有一种编程的方法来完成这个,而不是让最终用户自己去做。
有没有适合Python的库可以用来安排任务?我需要每小时运行一次一个函数,但是如果我每小时运行一次脚本并使用.sleep,"每小时一次"的执行时间会因为脚本和/或函数运行时的延迟而与前一天的执行时间不同。
有没有什么最好的方法可以在一天中的特定时间(多次)运行一个函数,而不使用Cron Job或任务调度器来安排呢?
如果这不可能,我也希望听听你们的看法。
AP Scheduler完全符合我的需求。
版本 < 3.0
import datetime
import time
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()
def job_function():
print("Hello World")
print(datetime.datetime.now())
time.sleep(20)
# Schedules job_function to be run once each minute
sched.add_cron_job(job_function, minute='0-59')
输出:
>Hello World
>2014-03-28 09:44:00.016.492
>Hello World
>2014-03-28 09:45:00.0.14110
版本 > 3.0
(来自Animesh Pandey的回答)
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched.configure(options_from_ini_file)
sched.start()
14 个回答
3
每小时运行一次这个脚本,间隔15分钟。
比如说,你想要每15分钟获取一次股票价格的报价,这些报价会在每15分钟更新一次。
while True:
print("Update data:", datetime.now())
sleep = 15 - datetime.now().minute % 15
if sleep == 15:
run_strategy()
time.sleep(sleep * 60)
else:
time.sleep(sleep * 60)
13
我能推荐的最简单的方法就是使用schedule这个库。
在你的问题中,你提到“我需要每小时运行一次一个函数”,实现这个功能的代码非常简单:
import schedule
def thing_you_wanna_do():
...
...
return
schedule.every().hour.do(thing_you_wanna_do)
while True:
schedule.run_pending()
你还问了如何在一天中的特定时间做某件事,这里有一些示例可以参考:
import schedule
def thing_you_wanna_do():
...
...
return
schedule.every().day.at("10:30").do(thing_you_wanna_do)
schedule.every().monday.do(thing_you_wanna_do)
schedule.every().wednesday.at("13:15").do(thing_you_wanna_do)
# If you would like some randomness / variation you could also do something like this
schedule.every(1).to(2).hours.do(thing_you_wanna_do)
while True:
schedule.run_pending()
使用的90%的代码都是schedule库的示例代码。祝你调度愉快!
28
对于 apscheduler
版本低于 3.0 的内容,可以参考 Unknown 的回答。
对于 apscheduler
版本高于 3.0 的内容:
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched.configure(options_from_ini_file)
sched.start()
更新:
你可以查看 apscheduler
的 文档。
这里是关于 apscheduler-3.3.1
在 Python 3.6.2
上的使用说明。
"""
Following configurations are set for the scheduler:
- a MongoDBJobStore named “mongo”
- an SQLAlchemyJobStore named “default” (using SQLite)
- a ThreadPoolExecutor named “default”, with a worker count of 20
- a ProcessPoolExecutor named “processpool”, with a worker count of 5
- UTC as the scheduler’s timezone
- coalescing turned off for new jobs by default
- a default maximum instance limit of 3 for new jobs
"""
from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
"""
Method 1:
"""
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
"""
Method 2 (ini format):
"""
gconfig = {
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
}
sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format
@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()
sched_method2.configure(gconfig=gconfig)
sched_method2.start()
35
要每小时的第10分钟执行一次某个任务。
from datetime import datetime, timedelta
while 1:
print 'Run something..'
dt = datetime.now() + timedelta(hours=1)
dt = dt.replace(minute=10)
while datetime.now() < dt:
time.sleep(1)
108
也许这个对你有帮助:高级Python调度器
这是他们文档中的一小段代码:
from apscheduler.schedulers.blocking import BlockingScheduler
def some_job():
print "Decorated job"
scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()