Python中有没有简单的方法等待某个条件成立?

61 投票
12 回答
193687 浏览
提问于 2025-04-15 22:26

我想在一个脚本中等待,直到某些条件都变成真。

我知道可以自己实现事件处理,使用条件变量等,但我不想费那么大劲去做。因为有些对象的属性变化是来自一个外部线程,那个线程是在一个用C++写的库(Boost.Python)里,所以我不能简单地在一个类里重写__setattr__方法来放一个条件变量。这让我只能尝试从C++创建并信号一个Python条件变量,或者把一个本地的条件变量包装起来,然后在Python中等待。这两种方法听起来都很麻烦,复杂得没有必要,而且还很无聊。

有没有更简单的方法来做到这一点,而不是不停地轮询条件呢?

理想情况下,它应该类似于

res = wait_until(lambda: some_predicate, timeout)
if (not res):
    print 'timed out'

12 个回答

8

这里有另一种解决方案。目标是让多个线程在进行某些工作之前,彼此等待,以确保工作按照非常精确的顺序进行。这项工作可能需要的时间是未知的。不断地轮询(检查条件)不好,主要有两个原因:一是会消耗CPU时间,二是条件满足后,动作并不会立即开始。

class Waiter():

    def __init__(self, init_value):
        self.var = init_value
        self.var_mutex = threading.Lock()
        self.var_event = threading.Event()

    def WaitUntil(self, v):
        while True:
            self.var_mutex.acquire()
            if self.var == v:
                self.var_mutex.release()
                return # Done waiting
            self.var_mutex.release()
            self.var_event.wait(1) # Wait 1 sec

    def Set(self, v):
        self.var_mutex.acquire()
        self.var = v
        self.var_mutex.release()
        self.var_event.set() # In case someone is waiting
        self.var_event.clear()

这是测试它的方法

class TestWaiter():

    def __init__(self):
        self.waiter = Waiter(0)
        threading.Thread(name='Thread0', target=self.Thread0).start()
        threading.Thread(name='Thread1', target=self.Thread1).start()
        threading.Thread(name='Thread2', target=self.Thread2).start()

    def Thread0(self):
        while True:
            self.waiter.WaitUntil(0)
            # Do some work
            time.sleep(np.random.rand()*2)
            self.waiter.Set(1)

    def Thread1(self):
        while True:
            self.waiter.WaitUntil(1)
            # Do some work
            time.sleep(np.random.rand())
            self.waiter.Set(2)

    def Thread2(self):
        while True:
            self.waiter.WaitUntil(2)
            # Do some work
            time.sleep(np.random.rand()/10)
            self.waiter.Set(0)

用于多进程的等待器:

import multiprocessing as mp
import ctypes

class WaiterMP():
    def __init__(self, init_value, stop_value=-1):
        self.var = mp.Value(ctypes.c_int, init_value)
        self.stop_value = stop_value
        self.event = mp.Event()

    def Terminate(self):
        self.Set(self.stop_value)

    def Restart(self):
        self.var.value = self.init_value

    def WaitUntil(self, v):
        while True:
            if self.var.value == v or self.var.value == self.stop_value:
                return
            # Wait 1 sec and check aiagn (in case event was missed)
            self.event.wait(1)

    def Set(self, v):
        exit = self.var.value == self.stop_value
        if not exit: # Do not set var if threads are exiting
            self.var.value = v
        self.event.set() # In case someone is waiting
        self.event.clear()

如果这仍然不是最佳解决方案,请留言。

20

另一个不错的工具包是 waiting - https://pypi.org/project/waiting/

安装方法:

pip install waiting

使用方法: 你需要传入一个函数,这个函数会在每次检查条件时被调用,还有一个超时时间(也就是你希望等待的时间),另外(这很有用)你可以传入一个描述信息,如果超时了,这个描述会显示出来。

使用函数:

from waiting import wait


def is_something_ready(something):
    if something.ready():
        return True
    return False


# wait for something to be ready
something = # whatever

wait(lambda: is_something_ready(something), timeout_seconds=120, waiting_for="something to be ready")

# this code will only execute after "something" is ready
print("Done")

注意:这个函数必须返回一个布尔值 - 当等待结束时返回 True,否则返回 False。

53

很遗憾,满足你的要求的唯一方法就是定期去检查,也就是我们说的 轮询,比如说……:

import time

def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
  mustend = time.time() + timeout
  while time.time() < mustend:
    if somepredicate(*args, **kwargs): return True
    time.sleep(period)
  return False

或者类似的方式。如果 somepredicate 可以拆分成更简单的部分,这样可以优化很多(比如说,如果它是几个条件的 and 组合,尤其是其中一些条件可以通过 threading.Event 等方式来优化检测),那么就可以有更多的优化方法。但就你问的这些一般情况来说,这种效率不高的方法就是唯一的解决办法。

撰写回答