在多个线程中共享的Python内存数组(类似于进程空间中的memcached)
我有一个多线程的程序,简单来说,它的功能是下载网页、处理这些网页并存储结果。处理网页所用的规则等信息存储在一个数据库里。最开始的时候,这个数据库的请求量非常大(处理每个网页需要1到50次请求)。第一步是把这些信息缓存到memcached中(如果某个域名没有规则,它就返回一个空字符串""),这样比每处理一个项目都要请求数据库1到50次要好得多。但是我现在还是在频繁请求memcached,这样会增加网络延迟(每处理一个项目需要1到50次往返请求,这个数量很快就累积起来了,即使是在本地以太网中也是如此)。
所以我想把结果缓存到进程空间中的一个数组里,基本上就是在内存中复制memcached。数据量还好,我打算用Python的集合来基本复制键值存储(这很简单)。
但问题是:通常一堆线程会同时访问同一个网站,并需要相同的规则集,所以我想避免“雷鸣般的群体问题”(也就是说,10个线程都在尝试获取example.com的规则,如果这些规则不在本地缓存中,也不在memcached中,就会导致数据库被请求,虽然不是特别严重,但还是有点影响)。
设置一个线程(称为“update_thread”)来更新内存中的数组,建立一个工作队列。如果某个线程无法从本地缓存中获取某个域名的规则,它就把这个域名写入工作队列,然后睡眠一段时间再尝试,直到本地内存缓存中有一个空字符串""或者一组规则可用为止。线程“update_thread”会读取工作队列,从memcached中获取规则,如果那里没有,就从数据库中获取,并将它们写入memcached和本地缓存(如果没有规则,就在值中传播一个空字符串"")。这样做的缺点是增加了一个线程;会增加全局解释器锁(GIL)的竞争,稍微会有延迟(我们必须等“update_thread”运行,因为我们受制于GIL)。此外,还增加了另一个线程和工作队列的复杂性。只有“update_thread”可以写入内存缓存数组,所以不需要加锁等操作。
我们使用锁来控制对内存中缓存数组的写入访问。如果某个线程找不到规则集,它会尝试从memcached中获取规则集,如果那里也没有,就请求数据库。一旦找到规则,它会锁定内存数组并将规则(或者空字符串""作为值)写入内存缓存。缺点是:我们可能仍然会遇到“雷鸣般的群体问题”,但可以通过写入一个特殊值,比如“正在获取规则,请稍等一秒”,来缓解这个问题,这样其他线程就会等待。
有没有其他人能想到其他解决方案,或者对我提出的两个方案发表意见?我觉得我可能会选择第二个方案,因为加锁加上“正在获取规则,请稍等一秒”似乎比增加一个线程和工作队列要简单。或者我是不是漏掉了什么显而易见且简单的解决方案?
2 个回答
从一个独立的、受控的进程空间跳到共享内存和某种互斥锁,这个变化可真不小。
如果你的问题是因为要处理某些事情而产生的五十次往返延迟,那为什么不使用多重获取(multiget),一次性处理所有呢?
如果我理解得没错,问题是多个线程会同时从memcached获取相同的数据。你希望能协调这些线程,让一个线程去获取数据,而其他线程则等待,等数据到达后再共享这些数据。
你可以为想要缓存的对象创建一个包装类。在开始通过网络获取值之前,先在缓存中放一个空的包装对象。如果其他线程也在寻找相同的数据,它们会被阻塞,直到这个值到达。
这是包装对象的代码:
class PendingValue(object):
def __init__(self):
self._event = threading.Event()
def get(self):
self._event.wait()
return self._value
def set(self, value):
self._value = value
self._event.set()
这是缓存的代码:
class Cache(object):
def __init__(self):
self._dict = {}
self._lock = threading.Lock()
def __getitem__(self, key):
self._lock.acquire()
try:
pv = self._dict[key]
self._lock.release()
return pv.get()
except KeyError: #key not in cache
pv = PendingValue()
self._dict[key] = pv
self._lock.release()
value = retrieve_value_from_external_source()
pv.set(value)
return value