动态分配和销毁互斥锁?

0 投票
2 回答
870 浏览
提问于 2025-04-16 14:11

我有一个应用程序,是基于Eventlet构建的。

我正在尝试写一个不错的装饰器,用来在多个线程之间同步对某些方法的调用。

现在这个装饰器大概是这样的:

_semaphores_semaphore = semaphore.Semaphore()
_semaphores = {}

def synchronized(name):
    def wrap(f):
        def inner(*args, **kwargs):
            # Grab the lock protecting _semaphores.
            with _semaphores_semaphore:
                # If the named semaphore does not yet exist, create it.
                if name not in _semaphores:
                    _semaphores[name] = semaphore.Semaphore()
                sem = _semaphores[name]

            with sem:
                return f(*args, **kwargs)

这个装饰器运行得很好,看起来也很安全,虽然我对线程安全和锁的理解可能有点生疏。

问题是,应用程序中其他地方已经有一个特定的使用信号量的情况,我想把它改成使用这个装饰器。这个信号量是根据用户输入动态创建的:它需要创建一个文件。它会在一个字典中检查是否已经有这个文件的信号量,如果没有,就创建一个并锁定它。一旦完成并释放了锁,它会检查这个信号量是否被其他进程锁定了,如果没有,就删除这个信号量。这段代码是基于绿色线程的假设写的,在那个上下文中是安全的,但如果我想把它改成使用我的装饰器,这就是我无法解决的问题。

如果我不在乎清理那些可能永远不会再用的信号量(可能会有成千上万的这些),那我没问题。如果我想清理它们,我就不知道该怎么做。

要删除信号量,显然我需要持有_semaphores_semaphore,因为我在操作_semaphores字典,但我也得对特定的信号量做点什么,而我想到的所有方法似乎都有竞争条件的问题: * 在“with sem:”块内,我可以从_semaphores中获取_semaphores_semaphore和sem。然而,其他线程可能在等待它(在“with sem:”),如果有新的线程想要访问同样的资源,它在_semaphores中找不到相同的信号量,而是会创建一个新的 => 失败。 我可以稍微改进一下,检查sem的状态,看看是否有其他线程在等我释放它。如果有,就不动它;如果没有,就删除它。这样,最后一个等待该资源的线程会删除它。然而,如果一个线程刚刚离开“with _semaphores_semaphore:”块,但还没到“with sem:”块,我就会遇到之前同样的问题 => 失败。

我感觉我可能漏掉了什么明显的东西,但我就是想不出来是什么。

2 个回答

0

mchro的回答对我没用,因为当一个线程需要创建新的信号量时,它会阻塞所有线程在同一个信号量上。

我想出的解决办法是,在两个事务之间用_semaphores来记录占用人数(这两个操作都是在同一个互斥锁下完成的):

A: get semaphore
A1: dangerzone
B: with sem: block etc
C: cleanup semaphore

问题在于,如何知道在AC之间有多少人。信号量的计数器并不能告诉你,因为可能有人在A1里。解决办法是为每个信号量在_semaphores中保持一个进入者的计数器,在A时加1,在C时减1,如果计数器为0,那就说明在AC之间没有其他人使用同一个键,这样你就可以安全地删除它。

0

我觉得你可以用一种叫做读写锁的东西来解决这个问题,也就是在_semaphores字典上使用共享-独占锁。这段代码还没有经过测试,只是为了展示这个原理。你可以在这里找到一个读写锁的实现:http://code.activestate.com/recipes/413393-multiple-reader-one-writer-mrow-resource-locking/

_semaphores_rwlock = RWLock()
_semaphores = {}

def synchronized(name):
    def wrap(f):
        def inner(*args, **kwargs):
            lock = _semaphores_rwlock.reader()
            # If the named semaphore does not yet exist, create it.
            if name not in _semaphores:
                lock = _semaphores_rwlock.writer()
                _semaphores[name] = semaphore.Semaphore()

            sem = _semaphores[name]

            with sem:
                retval = f(*args, **kwargs)
            lock.release()
            return retval

当你想要清理的时候,你可以这样做:

wlock = _semaphores_rwlock.writer() #this might take a while; it waits for all readers to release
cleanup(_semaphores)
wlock.release()

撰写回答