在多个线程中共享的Python内存数组(类似于进程空间中的memcached)

2 投票
2 回答
2271 浏览
提问于 2025-04-16 10:10

我有一个多线程的程序,简单来说,它的功能是下载网页、处理这些网页并存储结果。处理网页所用的规则等信息存储在一个数据库里。最开始的时候,这个数据库的请求量非常大(处理每个网页需要1到50次请求)。第一步是把这些信息缓存到memcached中(如果某个域名没有规则,它就返回一个空字符串""),这样比每处理一个项目都要请求数据库1到50次要好得多。但是我现在还是在频繁请求memcached,这样会增加网络延迟(每处理一个项目需要1到50次往返请求,这个数量很快就累积起来了,即使是在本地以太网中也是如此)。

所以我想把结果缓存到进程空间中的一个数组里,基本上就是在内存中复制memcached。数据量还好,我打算用Python的集合来基本复制键值存储(这很简单)。

但问题是:通常一堆线程会同时访问同一个网站,并需要相同的规则集,所以我想避免“雷鸣般的群体问题”(也就是说,10个线程都在尝试获取example.com的规则,如果这些规则不在本地缓存中,也不在memcached中,就会导致数据库被请求,虽然不是特别严重,但还是有点影响)。

  1. 设置一个线程(称为“update_thread”)来更新内存中的数组,建立一个工作队列。如果某个线程无法从本地缓存中获取某个域名的规则,它就把这个域名写入工作队列,然后睡眠一段时间再尝试,直到本地内存缓存中有一个空字符串""或者一组规则可用为止。线程“update_thread”会读取工作队列,从memcached中获取规则,如果那里没有,就从数据库中获取,并将它们写入memcached和本地缓存(如果没有规则,就在值中传播一个空字符串"")。这样做的缺点是增加了一个线程;会增加全局解释器锁(GIL)的竞争,稍微会有延迟(我们必须等“update_thread”运行,因为我们受制于GIL)。此外,还增加了另一个线程和工作队列的复杂性。只有“update_thread”可以写入内存缓存数组,所以不需要加锁等操作。

  2. 我们使用锁来控制对内存中缓存数组的写入访问。如果某个线程找不到规则集,它会尝试从memcached中获取规则集,如果那里也没有,就请求数据库。一旦找到规则,它会锁定内存数组并将规则(或者空字符串""作为值)写入内存缓存。缺点是:我们可能仍然会遇到“雷鸣般的群体问题”,但可以通过写入一个特殊值,比如“正在获取规则,请稍等一秒”,来缓解这个问题,这样其他线程就会等待。

有没有其他人能想到其他解决方案,或者对我提出的两个方案发表意见?我觉得我可能会选择第二个方案,因为加锁加上“正在获取规则,请稍等一秒”似乎比增加一个线程和工作队列要简单。或者我是不是漏掉了什么显而易见且简单的解决方案?

2 个回答

0

从一个独立的、受控的进程空间跳到共享内存和某种互斥锁,这个变化可真不小。

如果你的问题是因为要处理某些事情而产生的五十次往返延迟,那为什么不使用多重获取(multiget),一次性处理所有呢?

1

如果我理解得没错,问题是多个线程会同时从memcached获取相同的数据。你希望能协调这些线程,让一个线程去获取数据,而其他线程则等待,等数据到达后再共享这些数据。

你可以为想要缓存的对象创建一个包装类。在开始通过网络获取值之前,先在缓存中放一个空的包装对象。如果其他线程也在寻找相同的数据,它们会被阻塞,直到这个值到达。

这是包装对象的代码:

class PendingValue(object):
    def __init__(self):
        self._event = threading.Event()

    def get(self):
        self._event.wait()
        return self._value

    def set(self, value):
        self._value = value
        self._event.set()

这是缓存的代码:

class Cache(object):
    def __init__(self):
        self._dict = {}
        self._lock = threading.Lock()

    def __getitem__(self, key):
        self._lock.acquire()
        try:
            pv = self._dict[key]
            self._lock.release()
            return pv.get()
        except KeyError: #key not in cache
            pv = PendingValue()
            self._dict[key] = pv
            self._lock.release()
            value = retrieve_value_from_external_source()
            pv.set(value)
            return value

撰写回答