python apscheduler,有更简单的方式运行任务吗?
我有一些工作是通过 apscheduler
来安排的。目前我有3个工作,但不久之后会有更多。我在寻找一种方法来让我的代码更好地扩展。
现在,每个工作都是一个独立的 .py
文件,在这些文件里,我把脚本变成了一个名为 run()
的函数。以下是我的代码。
from apscheduler.scheduler import Scheduler
import logging
import job1
import job2
import job3
logging.basicConfig()
sched = Scheduler()
@sched.cron_schedule(day_of_week='mon-sun', hour=7)
def runjobs():
job1.run()
job2.run()
job3.run()
sched.start()
这段代码现在是可以工作的,但其实有点傻,因为它只是完成了任务。但是当我有50个工作的时候,这段代码会变得非常冗长。我该如何扩展它呢?
注意:这些工作的实际名称是随意的,没有遵循任何模式。文件的名称是 scheduler.py
,我在 Python 终端中通过 execfile('scheduler.py')
来运行它。
2 个回答
1
看看这个链接:
http://furius.ca/pubcode/pub/conf/bin/python-recursive-import-test
这个链接可以帮助你导入所有的 Python 文件,也就是以 .py 结尾的文件。
在导入的时候,你可以创建一个列表,里面放一些函数调用,比如:
[job1.run(), job2.run()]
然后你可以遍历这个列表,逐个调用这些函数 :)
谢谢 Arjun
5
这段内容是关于编程问题的讨论,通常在StackOverflow上,程序员们会分享他们遇到的困难和解决方案。大家会提问、回答,互相帮助,解决各种技术难题。
在这个特定的讨论中,可能涉及到一些代码示例和技术细节,目的是让其他人更好地理解问题的本质和解决方法。
如果你是编程新手,看到这些内容时,可以关注他们是如何描述问题的,如何一步步分析和解决的。这样可以帮助你在未来遇到类似问题时,知道该如何去思考和处理。
记得多动手实践,尝试自己写代码,遇到问题时再去查找资料或请教他人,这样才能更快地提高自己的编程能力。
import urllib
import threading
import datetime
pages = ['http://google.com', 'http://yahoo.com', 'http://msn.com']
#------------------------------------------------------------------------------
# Getting the pages WITHOUT threads
#------------------------------------------------------------------------------
def job(url):
response = urllib.urlopen(url)
html = response.read()
def runjobs():
for page in pages:
job(page)
start = datetime.datetime.now()
runjobs()
end = datetime.datetime.now()
print "jobs run in {} microseconds WITHOUT threads" \
.format((end - start).microseconds)
#------------------------------------------------------------------------------
# Getting the pages WITH threads
#------------------------------------------------------------------------------
def job(url):
response = urllib.urlopen(url)
html = response.read()
def runjobs():
threads = []
for page in pages:
t = threading.Thread(target=job, args=(page,))
t.start()
threads.append(t)
for t in threads:
t.join()
start = datetime.datetime.now()
runjobs()
end = datetime.datetime.now()
print "jobs run in {} microsecond WITH threads" \
.format((end - start).microseconds)