如何在Python 2.7中实现带超时的锁

24 投票
7 回答
20136 浏览
提问于 2025-04-17 07:46

有没有办法在Python中实现一个锁,用于多线程的场景,并且它的acquire方法可以设置任意的超时时间?我目前找到的解决方案都是通过轮询来实现的,这种方法

  • 我觉得不够优雅,也不够高效
  • 而且没有保证锁在解决关键区问题时的有限等待和进展

有没有更好的实现方法呢?

7 个回答

3

如果有人需要Python 3.2及以上版本的接口:

import threading
import time


class Lock(object):
    _lock_class = threading.Lock

    def __init__(self):
        self._lock = self._lock_class()
        self._cond = threading.Condition(threading.Lock())

    def acquire(self, blocking=True, timeout=-1):
        if not blocking or timeout == 0:
            return self._lock.acquire(False)
        cond = self._cond
        lock = self._lock
        if timeout < 0:
            with cond:
                while True:
                    if lock.acquire(False):
                        return True
                    else:
                        cond.wait()
        else:
            with cond:
                current_time = time.time()
                stop_time = current_time + timeout
                while current_time < stop_time:
                    if lock.acquire(False):
                        return True
                    else:
                        cond.wait(stop_time - current_time)
                        current_time = time.time()
                return False

    def release(self):
        with self._cond:
            self._lock.release()
            self._cond.notify()

    __enter__ = acquire

    def __exit__(self, t, v, tb):
        self.release()


class RLock(Lock):
    _lock_class = threading.RLock
6

我用线程安全的队列写了一个版本,具体可以参考这个链接 http://docs.python.org/2/library/queue.html,它的 put 和 get 方法支持超时功能。

到现在为止运行得很好,但如果有人能帮我看看这个代码,我会很感激。

"""
Thread-safe lock mechanism with timeout support module.
"""

from threading import ThreadError, current_thread
from Queue import Queue, Full, Empty


class TimeoutLock(object):
    """
    Thread-safe lock mechanism with timeout support.
    """

    def __init__(self, mutex=True):
        """
        Constructor.
        Mutex parameter specifies if the lock should behave like a Mutex, and
        thus use the concept of thread ownership.
        """
        self._queue = Queue(maxsize=1)
        self._owner = None
        self._mutex = mutex

    def acquire(self, timeout=0):
        """
        Acquire the lock.
        Returns True if the lock was succesfully acquired, False otherwise.

        Timeout:
        - < 0 : Wait forever.
        -   0 : No wait.
        - > 0 : Wait x seconds.
        """
        th = current_thread()
        try:
            self._queue.put(
                th, block=(timeout != 0),
                timeout=(None if timeout < 0 else timeout)
            )
        except Full:
            return False

        self._owner = th
        return True

    def release(self):
        """
        Release the lock.
        If the lock is configured as a Mutex, only the owner thread can release
        the lock. If another thread attempts to release the lock a
        ThreadException is raised.
        """
        th = current_thread()
        if self._mutex and th != self._owner:
            raise ThreadError('This lock isn\'t owned by this thread.')

        self._owner = None
        try:
            self._queue.get(False)
            return True
        except Empty:
            raise ThreadError('This lock was released already.')
23

为了更详细地解释Steven的评论建议:

import threading
import time

lock = threading.Lock()
cond = threading.Condition(threading.Lock())

def waitLock(timeout):
    with cond:
        current_time = start_time = time.time()
        while current_time < start_time + timeout:
            if lock.acquire(False):
                return True
            else:
                cond.wait(timeout - current_time + start_time)
                current_time = time.time()
    return False

需要注意的几点:

  • 这里有两个 threading.Lock() 对象,一个是内部的 threading.Condition() 使用的。
  • 在操作 cond 时,会先获取它的锁;不过,调用 wait() 时会释放这个锁,这样就可以让多个线程同时观察它。
  • 这个等待操作是在一个循环里进行的,这个循环会跟踪时间。threading.Condition 可能会因为其他原因被通知,而不仅仅是超时,所以如果你真的想让它超时,还是需要跟踪时间。
  • 即使有了条件,你仍然需要“轮询”真正的锁,因为可能会有多个线程同时被唤醒并争抢这个锁。如果获取锁失败,循环就会返回继续等待。
  • 调用这个 waitLock 函数的地方,应该在 lock.release() 后跟一个 cond.notify(),这样其他等待的线程就会被通知可以重新尝试获取锁。这个在例子中没有展示。

撰写回答