如何在Python 2.7中实现带超时的锁
有没有办法在Python中实现一个锁,用于多线程的场景,并且它的acquire
方法可以设置任意的超时时间?我目前找到的解决方案都是通过轮询来实现的,这种方法
- 我觉得不够优雅,也不够高效
- 而且没有保证锁在解决关键区问题时的有限等待和进展
有没有更好的实现方法呢?
7 个回答
3
如果有人需要Python 3.2及以上版本的接口:
import threading
import time
class Lock(object):
_lock_class = threading.Lock
def __init__(self):
self._lock = self._lock_class()
self._cond = threading.Condition(threading.Lock())
def acquire(self, blocking=True, timeout=-1):
if not blocking or timeout == 0:
return self._lock.acquire(False)
cond = self._cond
lock = self._lock
if timeout < 0:
with cond:
while True:
if lock.acquire(False):
return True
else:
cond.wait()
else:
with cond:
current_time = time.time()
stop_time = current_time + timeout
while current_time < stop_time:
if lock.acquire(False):
return True
else:
cond.wait(stop_time - current_time)
current_time = time.time()
return False
def release(self):
with self._cond:
self._lock.release()
self._cond.notify()
__enter__ = acquire
def __exit__(self, t, v, tb):
self.release()
class RLock(Lock):
_lock_class = threading.RLock
6
我用线程安全的队列写了一个版本,具体可以参考这个链接 http://docs.python.org/2/library/queue.html,它的 put 和 get 方法支持超时功能。
到现在为止运行得很好,但如果有人能帮我看看这个代码,我会很感激。
"""
Thread-safe lock mechanism with timeout support module.
"""
from threading import ThreadError, current_thread
from Queue import Queue, Full, Empty
class TimeoutLock(object):
"""
Thread-safe lock mechanism with timeout support.
"""
def __init__(self, mutex=True):
"""
Constructor.
Mutex parameter specifies if the lock should behave like a Mutex, and
thus use the concept of thread ownership.
"""
self._queue = Queue(maxsize=1)
self._owner = None
self._mutex = mutex
def acquire(self, timeout=0):
"""
Acquire the lock.
Returns True if the lock was succesfully acquired, False otherwise.
Timeout:
- < 0 : Wait forever.
- 0 : No wait.
- > 0 : Wait x seconds.
"""
th = current_thread()
try:
self._queue.put(
th, block=(timeout != 0),
timeout=(None if timeout < 0 else timeout)
)
except Full:
return False
self._owner = th
return True
def release(self):
"""
Release the lock.
If the lock is configured as a Mutex, only the owner thread can release
the lock. If another thread attempts to release the lock a
ThreadException is raised.
"""
th = current_thread()
if self._mutex and th != self._owner:
raise ThreadError('This lock isn\'t owned by this thread.')
self._owner = None
try:
self._queue.get(False)
return True
except Empty:
raise ThreadError('This lock was released already.')
23
为了更详细地解释Steven的评论建议:
import threading
import time
lock = threading.Lock()
cond = threading.Condition(threading.Lock())
def waitLock(timeout):
with cond:
current_time = start_time = time.time()
while current_time < start_time + timeout:
if lock.acquire(False):
return True
else:
cond.wait(timeout - current_time + start_time)
current_time = time.time()
return False
需要注意的几点:
- 这里有两个
threading.Lock()
对象,一个是内部的threading.Condition()
使用的。 - 在操作
cond
时,会先获取它的锁;不过,调用wait()
时会释放这个锁,这样就可以让多个线程同时观察它。 - 这个等待操作是在一个循环里进行的,这个循环会跟踪时间。
threading.Condition
可能会因为其他原因被通知,而不仅仅是超时,所以如果你真的想让它超时,还是需要跟踪时间。 - 即使有了条件,你仍然需要“轮询”真正的锁,因为可能会有多个线程同时被唤醒并争抢这个锁。如果获取锁失败,循环就会返回继续等待。
- 调用这个
waitLock
函数的地方,应该在lock.release()
后跟一个cond.notify()
,这样其他等待的线程就会被通知可以重新尝试获取锁。这个在例子中没有展示。