如何在python中创建一个后台线程间隔函数调用?

2024-06-05 23:55:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试实现在后台工作的心跳调用。如何创建一个线程化的间隔调用,即每30秒调用一次,调用以下函数:

self.mqConn.heartbeat_tick()

我怎么才能阻止这条线呢?

非常感谢。


Tags: 函数self间隔线程后台heartbeattick线呢
2条回答

使用包含循环的线程

from threading import Thread
import time

def background_task():
    while not background_task.cancelled:
        self.mqConn.heartbeat_tick()
        time.sleep(30)
background_task.cancelled = False

t = Thread(target=background_task)
t.start()

background_task.cancelled = True

或者,您可以将timer子类化,使取消操作变得容易:

from threading import Timer

class RepeatingTimer(Timer):
    def run(self):
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)


t = RepeatingTimer(30.0, self.mqConn.heartbeat_tick)
t.start() # every 30 seconds, call heartbeat_tick

# later
t.cancel() # cancels execution

Eric的回答的一个简短的后续说明:在python 2中,不能对Timer进行子类划分,因为它实际上是围绕真正类_Timer的一个轻量级函数包装器。如果你这样做了,你会得到在this post中弹出的问题。

使用_Timer代替修复它:

from threading import _Timer

class RepeatingTimer(_Timer):
    def run(self):
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)


t = RepeatingTimer(30.0, self.mqConn.heartbeat_tick)
t.start() # every 30 seconds, call heartbeat_tick

# later
t.cancel() # cancels execution

相关问题 更多 >