使用Python每15分钟精准计算时间表

0 投票
2 回答
55 浏览
提问于 2025-04-12 20:59

我按照这里的例子进行了操作。

它的运行效果是正常的,但有一个小问题。如果do_job()这个方法执行超过一分钟,它会在完成后再等一分钟,然后再安排下一次计算。

但我希望它能在执行开始后的一分钟就开始计时。请问有没有办法做到这一点?任何建议都非常感谢。

编辑

import schedule
import time

def job():
    print("Process triggered at:", time.strftime("%Y-%m-%d %H:%M:%S"))
    time.sleep(30)
    print("Process ended at:", time.strftime("%Y-%m-%d %H:%M:%S"))

# Define the frequency (in seconds) at which the process should be triggered
frequency_seconds = 60  # Trigger every 60 seconds

# Schedule the job to run at the specified frequency
schedule.every(frequency_seconds).seconds.do(job)

# Main loop to keep the script running
while True:
    schedule.run_pending()
    time.sleep(1)  # Sleep for 1 second to avoid high CPU usage

在上面的代码中,生成的输出如下:

进程开始时间:2024-03-25 06:57:23
进程结束时间:2024-03-25 06:57:53
进程开始时间:2024-03-25 06:58:54
进程结束时间:2024-03-25 06:59:24

可以看到,第二个进程是在第一个进程结束后才开始的。第一个进程结束时间和第二个进程开始时间之间的时间差是60秒。

但我希望的是,第一个进程开始时间和第二个进程开始时间之间的时间差应该是60秒。
60秒的计时不应该从进程结束时间开始。job()应该在每60秒的间隔内被调用。所以我期望的输出应该是这样的:

进程开始时间:2024-03-25 06:57:23
进程结束时间:2024-03-25 06:57:53
进程开始时间:2024-03-25 06:58:23
进程结束时间:2024-03-25 06:58:53

我希望这个期望的输出能清楚地说明我想要实现的目标。再次感谢你们抽时间阅读和回复。

2 个回答

1

如果你的目标是让新任务在上一个任务开始后的几秒钟内启动,而不是在上一个任务结束后启动,你可以使用下面的代码:

#!/usr/bin/env python3

import sched, time

# Define the frequency (in seconds) at which the process should be triggered
frequency_seconds = 60  # Trigger every 60 seconds
priority = 1

# Create a scheduler
s = sched.scheduler(time.time, time.sleep)

def job():
    # Reschedule ourselves for next run
    s.enterabs(time.time() + frequency_seconds, priority, job)

    print("Process triggered at:", time.strftime("%Y-%m-%d %H:%M:%S"))
    time.sleep(25)
    print("Process ended at:", time.strftime("%Y-%m-%d %H:%M:%S"))

# Start first job immediately, i.e. delay=0
s.enter(0, priority, job)

# Keep running future jobs
s.run()

如果你担心在之前的任务还在运行的时候,新的任务不能启动。也就是说,你想每分钟启动一个新任务,即使这些任务需要90秒才能完成,你可能需要使用线程。请告诉我这是否是你的问题。

1

timed-count 这个包让这个事情变得很简单。

它不会随着时间的推移而变慢,而且如果你的任务花费的时间太长,它还可以选择性地抛出一个错误。

from timed_count import timed_count

for _ in timed_count(period=60, error_on_missed=True):
    job()

撰写回答