在Twisted中等待其他请求的事件
我有一个简单的Twisted服务器,它处理请求的方式是这样的(当然是异步的)
global SomeSharedMemory
if SomeSharedMemory is None:
SomeSharedMemory = LoadSharedMemory()
return PickSomething(SomeSharedMemory)
其中SomeSharedMemory是从数据库加载的。
我想避免多次从数据库加载SomeSharedMemory。具体来说,当服务器第一次启动时,如果有两个请求同时到达,可能会出现以下情况:
请求1:检查SomeSharedMemory,没找到 请求1:发起数据库查询来加载SSM 请求2:检查SSM,没找到 请求2:发起数据库查询来加载SSM 请求1:查询返回,存储SSM 请求1:返回结果 请求2:查询返回,存储SSM 请求2:返回结果
如果有更多的请求同时到来,数据库就会被频繁访问。我想做一些类似这样的事情(见http://docs.python.org/library/threading.html#event-objects):
global SomeSharedMemory, SSMEvent
if SomeSharedMemory is None:
if not SSMEvent.isSet():
SSMEvent.wait()
else:
# assumes that the event is initialized "set"
SSMEvent.clear()
SomeSharedMemory = LoadSharedMemory()
SSMEvent.set()
return PickSomething(SomeSharedMemory)
这样的话,如果一个请求正在加载共享内存,其他请求就会礼貌地等待,直到查询完成,而不是自己再发起重复的数据库查询。
在Twisted中,这可能吗?
1 个回答
根据你提供的例子,很难理解你所描述的问题是怎么产生的。如果在第一个请求调用LoadSharedMemory
并返回之前,第二个请求就到达了你的Twisted服务器,那么第二个请求会在处理之前先等着。当它最终被处理时,SomeSharedMemory
会被初始化,这样就不会出现重复的情况。
不过,我想也许LoadSharedMemory
是异步的,并且返回一个Deferred
对象,这样你的代码看起来更像是这样:
def handleRequest(request):
if SomeSharedMemory is None:
d = initSharedMemory()
d.addCallback(lambda ignored: handleRequest(request))
else:
d = PickSomething(SomeSharedMemory)
return d
在这种情况下,确实有可能在initSharedMemory
正在执行的时候,第二个请求到达。这样的话,你就会有两个任务试图初始化同一个状态。
当然,解决这个问题的方法是注意到你有第三种状态。除了未初始化和初始化了之外,还有一种状态是初始化中。所以也要表示这个状态。我会把它隐藏在initSharedMemory
函数里面,这样请求处理器就会保持简单:
initInProgress = None
def initSharedMemory():
global initInProgress
if initInProgress is None:
initInProgress = _reallyInit()
def initialized(result):
global initInProgress, SomeSharedMemory
initInProgress = None
SomeSharedMemory = result
initInProgress.addCallback(initialized)
d = Deferred()
initInProgress.chainDeferred(d)
return d
这样做有点麻烦,因为到处都是全局变量。这里有一个稍微干净一点的版本:
from twisted.internet.defer import Deferred, succeed
class SharedResource(object):
def __init__(self, initializer):
self._initializer = initializer
self._value = None
self._state = "UNINITIALIZED"
self._waiting = []
def get(self):
if self._state == "INITIALIZED":
# Return the already computed value
return succeed(self._value)
# Create a Deferred for the caller to wait on
d = Deferred()
self._waiting.append(d)
if self._state == "UNINITIALIZED":
# Once, run the setup
self._initializer().addCallback(self._initialized)
self._state = "INITIALIZING"
# Initialized or initializing state here
return d
def _initialized(self, value):
# Save the value, transition to the new state, and tell
# all the previous callers of get what the result is.
self._value = value
self._state = "INITIALIZED"
waiting, self._waiting = self._waiting, None
for d in waiting:
d.callback(value)
SomeSharedMemory = SharedResource(initializeSharedMemory)
def handleRequest(request):
return SomeSharedMemory.get().addCallback(PickSomething)
现在有三种状态,状态之间的转换非常明确,没有全局状态需要更新(至少如果你给SomeSharedMemory
一个非全局的作用域),而且handleRequest
对此一无所知,它只需要请求一个值,然后使用它。