Python计划每几秒钟运行一次函数

2024-04-16 19:40:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我是新使用python的,我有一个python代码,其中有3个函数。我想在每X秒后运行这三个函数。我正在使用APScheduler包和add_作业方法。当我为一个函数实现时,我看到一条警告消息说“跳过:已达到最大运行实例数(1)”。那么,当我使用add_job方法调度所有三个函数时会发生什么

代码看起来像这样

  scheduler1 = APScheduler()
scheduler2 = APScheduler()
scheduler3 = APScheduler()

def fun1():
  print("From Func1")

def fun2():
  print("From Func2")

def fun3():
  print("From Func3")


if __name__ == '__main__':
    scheduler1.add_job(id='Scheduled task', func=fun1, trigger='interval', seconds=5)
    scheduler.start()
    scheduler2.add_job(id='Scheduled task', func=fun2, trigger='interval', seconds=5)
    scheduler.start()
    scheduler3.add_job(id='Scheduled task', func=fun3, trigger='interval', seconds=5)
    scheduler.start()

使用上述类型的代码,我可以实现一个在后台运行三个作业并且每5秒运行一次的调度程序吗


Tags: 函数代码fromaddidtaskdefjob
2条回答

这行吗

import time

scheduler = APScheduler()

def fun1():
  print("From Func1")

def fun2():
  print("From Func2")

def fun3():
  print("From Func3")


if __name__ == '__main__':
    while True:
        scheduler.add_job(id='Scheduled task', func=fun1, trigger='interval', seconds=5)
        scheduler.add_job(id='Scheduled task', func=fun2, trigger='interval', seconds=5)
        scheduler.add_job(id='Scheduled task', func=fun3, trigger='interval', seconds=5)
        scheduler.start()
        time.sleep(20)

它应该一直运行,直到按Ctrl+c

你也可以这样试试

import time

scheduler = APScheduler()

def fun1():
  print("From Func1")

def fun2():
  print("From Func2")

def fun3():
  print("From Func3")

scheduler.add_job(id='Scheduled task', func=fun1, trigger='interval', seconds=5)
scheduler.add_job(id='Scheduled task', func=fun2, trigger='interval', seconds=5)
scheduler.add_job(id='Scheduled task', func=fun3, trigger='interval', seconds=5)
scheduler.start()

if __name__ == '__main__':
    while True:
        time.sleep(20)

这样,您只需将作业添加到调度程序中一次,然后一遍又一遍地运行启动循环

首先,你的代码没有什么问题,只是在输入对象名时犯了愚蠢的错误,而且显然没有优化

以下是通过修正打字错误而获得的版本

from apscheduler.schedulers.background import BackgroundScheduler
from time import sleep

scheduler1 = BackgroundScheduler()
scheduler2 = BackgroundScheduler()
scheduler3 = BackgroundScheduler()

def fun1():
  print("From Func1")

def fun2():
  print("From Func2")

def fun3():
  print("From Func3")


if __name__ == '__main__':
    scheduler1.add_job(id='Scheduled task', func=fun1, trigger='interval', seconds=5)
    scheduler1.start()
    scheduler2.add_job(id='Scheduled task', func=fun2, trigger='interval', seconds=5)
    scheduler2.start()
    scheduler3.add_job(id='Scheduled task', func=fun3, trigger='interval', seconds=5)
    scheduler3.start()
    while True:
        sleep(1)

我添加了额外的睡眠功能,以阻止程序被终止,并测试计时器是否工作,其工作正常

以下版本是代码的优化形式

from apscheduler.schedulers.background import BackgroundScheduler
from time import sleep

scheduler = BackgroundScheduler()

def fun1():
  print("From Func1")

def fun2():
  print("From Func2")

def fun3():
  print("From Func3")


if __name__ == '__main__':
    scheduler.add_job(id='Scheduled task 1', func=fun1, trigger='interval', seconds=5)
    scheduler.add_job(id='Scheduled task 2', func=fun2, trigger='interval', seconds=5)
    scheduler.add_job(id='Scheduled task 3', func=fun3, trigger='interval', seconds=5)
    scheduler.start()
    while True:
        sleep(1)

使用单个调度程序对象在指定的时间段后运行所有函数

enter image description here

相关问题 更多 >