Python中有没有简单的方法等待某个条件成立?
我想在一个脚本中等待,直到某些条件都变成真。
我知道可以自己实现事件处理,使用条件变量等,但我不想费那么大劲去做。因为有些对象的属性变化是来自一个外部线程,那个线程是在一个用C++写的库(Boost.Python)里,所以我不能简单地在一个类里重写__setattr__
方法来放一个条件变量。这让我只能尝试从C++创建并信号一个Python条件变量,或者把一个本地的条件变量包装起来,然后在Python中等待。这两种方法听起来都很麻烦,复杂得没有必要,而且还很无聊。
有没有更简单的方法来做到这一点,而不是不停地轮询条件呢?
理想情况下,它应该类似于
res = wait_until(lambda: some_predicate, timeout)
if (not res):
print 'timed out'
12 个回答
这里有另一种解决方案。目标是让多个线程在进行某些工作之前,彼此等待,以确保工作按照非常精确的顺序进行。这项工作可能需要的时间是未知的。不断地轮询(检查条件)不好,主要有两个原因:一是会消耗CPU时间,二是条件满足后,动作并不会立即开始。
class Waiter():
def __init__(self, init_value):
self.var = init_value
self.var_mutex = threading.Lock()
self.var_event = threading.Event()
def WaitUntil(self, v):
while True:
self.var_mutex.acquire()
if self.var == v:
self.var_mutex.release()
return # Done waiting
self.var_mutex.release()
self.var_event.wait(1) # Wait 1 sec
def Set(self, v):
self.var_mutex.acquire()
self.var = v
self.var_mutex.release()
self.var_event.set() # In case someone is waiting
self.var_event.clear()
这是测试它的方法
class TestWaiter():
def __init__(self):
self.waiter = Waiter(0)
threading.Thread(name='Thread0', target=self.Thread0).start()
threading.Thread(name='Thread1', target=self.Thread1).start()
threading.Thread(name='Thread2', target=self.Thread2).start()
def Thread0(self):
while True:
self.waiter.WaitUntil(0)
# Do some work
time.sleep(np.random.rand()*2)
self.waiter.Set(1)
def Thread1(self):
while True:
self.waiter.WaitUntil(1)
# Do some work
time.sleep(np.random.rand())
self.waiter.Set(2)
def Thread2(self):
while True:
self.waiter.WaitUntil(2)
# Do some work
time.sleep(np.random.rand()/10)
self.waiter.Set(0)
用于多进程的等待器:
import multiprocessing as mp
import ctypes
class WaiterMP():
def __init__(self, init_value, stop_value=-1):
self.var = mp.Value(ctypes.c_int, init_value)
self.stop_value = stop_value
self.event = mp.Event()
def Terminate(self):
self.Set(self.stop_value)
def Restart(self):
self.var.value = self.init_value
def WaitUntil(self, v):
while True:
if self.var.value == v or self.var.value == self.stop_value:
return
# Wait 1 sec and check aiagn (in case event was missed)
self.event.wait(1)
def Set(self, v):
exit = self.var.value == self.stop_value
if not exit: # Do not set var if threads are exiting
self.var.value = v
self.event.set() # In case someone is waiting
self.event.clear()
如果这仍然不是最佳解决方案,请留言。
另一个不错的工具包是 waiting
- https://pypi.org/project/waiting/
安装方法:
pip install waiting
使用方法: 你需要传入一个函数,这个函数会在每次检查条件时被调用,还有一个超时时间(也就是你希望等待的时间),另外(这很有用)你可以传入一个描述信息,如果超时了,这个描述会显示出来。
使用函数:
from waiting import wait
def is_something_ready(something):
if something.ready():
return True
return False
# wait for something to be ready
something = # whatever
wait(lambda: is_something_ready(something), timeout_seconds=120, waiting_for="something to be ready")
# this code will only execute after "something" is ready
print("Done")
注意:这个函数必须返回一个布尔值 - 当等待结束时返回 True,否则返回 False。
很遗憾,满足你的要求的唯一方法就是定期去检查,也就是我们说的 轮询,比如说……:
import time
def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
mustend = time.time() + timeout
while time.time() < mustend:
if somepredicate(*args, **kwargs): return True
time.sleep(period)
return False
或者类似的方式。如果 somepredicate
可以拆分成更简单的部分,这样可以优化很多(比如说,如果它是几个条件的 and
组合,尤其是其中一些条件可以通过 threading.Event
等方式来优化检测),那么就可以有更多的优化方法。但就你问的这些一般情况来说,这种效率不高的方法就是唯一的解决办法。