一旦没有线要我,垃圾箱就去锁

2024-04-18 07:56:54 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个函数,决不能同时从两个线程调用相同的值。为了实现这一点,我有一个defaultdict,它为给定的密钥生成新的threading.Lock。因此,我的代码与此类似:

from collections import defaultdict
import threading

lock_dict = defaultdict(threading.Lock)
def f(x):
    with lock_dict[x]:
        print "Locked for value x"

问题是,一旦不再需要该锁,我就无法想出如何安全地从defaultdict中删除该锁。如果不这样做,我的程序就有一个内存泄漏,当用许多不同的x值调用f时,这种情况就会变得很明显

我不能简单地在f的末尾del lock_dict[x],因为在另一个线程正在等待锁的情况下,第二个线程将锁定一个不再与lock-dict[x]关联的锁,因此两个线程最终可能同时调用具有相同值x的f


Tags: 函数代码fromimportlockdefwith密钥
1条回答
网友
1楼 · 发布于 2024-04-18 07:56:54

我会用另一种方法:

fcond = threading.Condition()
fargs = set()

def f(x):
    with fcond:
        while x in fargs:
            fcond.wait()
        fargs.add(x)  # this thread has exclusive rights to use `x`

    # do useful stuff with x
    # any other thread trying to call f(x) will
    # block in the .wait above()

    with fcond:
        fargs.remove(x)      # we're done with x
        fcond.notify_all()   # let blocked threads (if any) proceed

条件有一个学习曲线,但一旦上升,它们会使编写正确的线程安全、无竞争的代码变得更容易。在

原代码的线程安全性

@JimMischel在一篇评论中问原始人使用defaultdict是否受到种族歧视。好问题!在

答案是-唉-“你必须盯着你的特定Python实现”。在

假设CPython实现:如果defaultdict调用的代码的任何一个都调用Python代码,或者释放GIL(全局解释器锁)的C代码,那么2个(或更多)线程可以“同时”调用withlock_dict[x],并且x不在dict中,并且:

  1. 线程1发现x不在dict中,获得一个锁,然后丢失它的时间片(在dict中设置x之前)。在
  2. 线程2看到x不在dict中,并且还得到一个锁。在
  3. 其中一个线程的锁以dict结尾,但是两个线程都执行f(x)。在

盯着3.4.0a4+的源代码(当前的开发主管),defaultdict和{}都是由不发布GIL的C代码实现的。我不记得早期版本在不同的时候是否实现了Python中的defaultdict或{}的全部或部分。在

我建议的替代代码中充满了用Python实现的东西(所有threading.Condition方法),但是设计上没有竞争——即使您使用的是旧版本的Python,其中的集合也是用Python实现的(只有在条件变量锁的保护下才能访问集合)。在

每个参数一个锁

如果没有条件,这似乎要困难得多。在最初的方法中,我相信您需要保留想要使用x的线程的数量,并且需要一个锁来保护这些计数和保护字典。我为此想出的最好的代码是冗长的,把它放在上下文管理器中似乎是最明智的。要使用,请为每个需要它的函数创建一个参数锁:

^{pr2}$

然后f()的主体可以简单地编码:

def f(x):
    with farglocker(x):
        # only one thread at a time can run with argument `x`

当然,条件方法也可以包装在上下文管理器中。代码如下:

import threading

class ArgLocker:
    def __init__(self):
        self.xs = dict() # maps x to (lock, count) pair
        self.lock = threading.Lock()

    def __call__(self, x):
        return AllMine(self.xs, self.lock, x)

class AllMine:
    def __init__(self, xs, lock, x):
        self.xs = xs
        self.lock = lock
        self.x = x

    def __enter__(self):
        x = self.x
        with self.lock:
            xlock = self.xs.get(x)
            if xlock is None:
                xlock = threading.Lock()
                xlock.acquire()
                count = 0
            else:
                xlock, count = xlock
            self.xs[x] = xlock, count + 1

        if count: # x was already known - wait for it
            xlock.acquire()
        assert xlock.locked

    def __exit__(self, *args):
        x = self.x
        with self.lock:
            xlock, count = self.xs[x]
            assert xlock.locked
            assert count > 0
            count -= 1
            if count:
                self.xs[x] = xlock, count
            else:
                del self.xs[x]
            xlock.release()

那么哪种方式更好?使用条件;—)这种方式“几乎明显正确”,但是每个参数锁定(LPA)方法有点令人费解。LPA方法的优势在于,当一个线程使用x完成时,允许继续进行的只有线程是那些希望使用相同的x的线程;使用条件,.notify_all()会唤醒等待任何参数的所有阻塞线程。但是,除非在试图使用相同参数的线程之间存在非常严重的争用,否则这不会有太大的影响:使用条件时,唤醒的那些没有等待x的线程保持清醒,直到看到{}为真,然后立即再次阻塞(.wait())。在

相关问题 更多 >