python apscheduler,有更简单的方式运行任务吗?

1 投票
2 回答
1873 浏览
提问于 2025-04-18 03:30

我有一些工作是通过 apscheduler 来安排的。目前我有3个工作,但不久之后会有更多。我在寻找一种方法来让我的代码更好地扩展。

现在,每个工作都是一个独立的 .py 文件,在这些文件里,我把脚本变成了一个名为 run() 的函数。以下是我的代码。

from apscheduler.scheduler import Scheduler
import logging

import job1
import job2
import job3

logging.basicConfig()
sched = Scheduler()

@sched.cron_schedule(day_of_week='mon-sun', hour=7)

def runjobs():
    job1.run()
    job2.run()
    job3.run()    

sched.start()

这段代码现在是可以工作的,但其实有点傻,因为它只是完成了任务。但是当我有50个工作的时候,这段代码会变得非常冗长。我该如何扩展它呢?

注意:这些工作的实际名称是随意的,没有遵循任何模式。文件的名称是 scheduler.py,我在 Python 终端中通过 execfile('scheduler.py') 来运行它。

2 个回答

1

看看这个链接:

http://furius.ca/pubcode/pub/conf/bin/python-recursive-import-test

这个链接可以帮助你导入所有的 Python 文件,也就是以 .py 结尾的文件。

在导入的时候,你可以创建一个列表,里面放一些函数调用,比如:

[job1.run(), job2.run()]

然后你可以遍历这个列表,逐个调用这些函数 :)

谢谢 Arjun

5

这段内容是关于编程问题的讨论,通常在StackOverflow上,程序员们会分享他们遇到的困难和解决方案。大家会提问、回答,互相帮助,解决各种技术难题。

在这个特定的讨论中,可能涉及到一些代码示例和技术细节,目的是让其他人更好地理解问题的本质和解决方法。

如果你是编程新手,看到这些内容时,可以关注他们是如何描述问题的,如何一步步分析和解决的。这样可以帮助你在未来遇到类似问题时,知道该如何去思考和处理。

记得多动手实践,尝试自己写代码,遇到问题时再去查找资料或请教他人,这样才能更快地提高自己的编程能力。

import urllib
import threading
import datetime

pages = ['http://google.com', 'http://yahoo.com', 'http://msn.com']

#------------------------------------------------------------------------------
# Getting the pages WITHOUT threads
#------------------------------------------------------------------------------
def job(url):
    response = urllib.urlopen(url)
    html = response.read()

def runjobs():
    for page in pages:
        job(page)

start = datetime.datetime.now()
runjobs()
end = datetime.datetime.now()

print "jobs run in {} microseconds WITHOUT threads" \
      .format((end - start).microseconds)

#------------------------------------------------------------------------------
# Getting the pages WITH threads
#------------------------------------------------------------------------------
def job(url):
    response = urllib.urlopen(url)
    html = response.read()

def runjobs():
    threads = []
    for page in pages:
        t = threading.Thread(target=job, args=(page,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

start = datetime.datetime.now()
runjobs()
end = datetime.datetime.now()

print "jobs run in {} microsecond WITH threads" \
      .format((end - start).microseconds)

撰写回答