在Twisted中等待其他请求的事件

0 投票
1 回答
722 浏览
提问于 2025-04-16 12:54

我有一个简单的Twisted服务器,它处理请求的方式是这样的(当然是异步的)

global SomeSharedMemory
if SomeSharedMemory is None:
    SomeSharedMemory = LoadSharedMemory()
return PickSomething(SomeSharedMemory)

其中SomeSharedMemory是从数据库加载的。

我想避免多次从数据库加载SomeSharedMemory。具体来说,当服务器第一次启动时,如果有两个请求同时到达,可能会出现以下情况:

请求1:检查SomeSharedMemory,没找到 请求1:发起数据库查询来加载SSM 请求2:检查SSM,没找到 请求2:发起数据库查询来加载SSM 请求1:查询返回,存储SSM 请求1:返回结果 请求2:查询返回,存储SSM 请求2:返回结果

如果有更多的请求同时到来,数据库就会被频繁访问。我想做一些类似这样的事情(见http://docs.python.org/library/threading.html#event-objects):

global SomeSharedMemory, SSMEvent
if SomeSharedMemory is None:
    if not SSMEvent.isSet():
        SSMEvent.wait()
    else:
        # assumes that the event is initialized "set"
        SSMEvent.clear()
        SomeSharedMemory = LoadSharedMemory()
        SSMEvent.set()
return PickSomething(SomeSharedMemory)

这样的话,如果一个请求正在加载共享内存,其他请求就会礼貌地等待,直到查询完成,而不是自己再发起重复的数据库查询。

在Twisted中,这可能吗?

1 个回答

2

根据你提供的例子,很难理解你所描述的问题是怎么产生的。如果在第一个请求调用LoadSharedMemory并返回之前,第二个请求就到达了你的Twisted服务器,那么第二个请求会在处理之前先等着。当它最终被处理时,SomeSharedMemory会被初始化,这样就不会出现重复的情况。

不过,我想也许LoadSharedMemory是异步的,并且返回一个Deferred对象,这样你的代码看起来更像是这样:

def handleRequest(request):
    if SomeSharedMemory is None:
        d = initSharedMemory()
        d.addCallback(lambda ignored: handleRequest(request))
    else:
        d = PickSomething(SomeSharedMemory)
    return d

在这种情况下,确实有可能在initSharedMemory正在执行的时候,第二个请求到达。这样的话,你就会有两个任务试图初始化同一个状态。

当然,解决这个问题的方法是注意到你有第三种状态。除了初始化和初始化之外,还有一种状态是初始化。所以也要表示这个状态。我会把它隐藏在initSharedMemory函数里面,这样请求处理器就会保持简单:

initInProgress = None

def initSharedMemory():
    global initInProgress
    if initInProgress is None:
        initInProgress = _reallyInit()
        def initialized(result):
            global initInProgress, SomeSharedMemory
            initInProgress = None
            SomeSharedMemory = result
        initInProgress.addCallback(initialized)
    d = Deferred()
    initInProgress.chainDeferred(d)
    return d

这样做有点麻烦,因为到处都是全局变量。这里有一个稍微干净一点的版本:

from twisted.internet.defer import Deferred, succeed

class SharedResource(object):
    def __init__(self, initializer):
        self._initializer = initializer
        self._value = None
        self._state = "UNINITIALIZED"
        self._waiting = []


    def get(self):
        if self._state == "INITIALIZED":
            # Return the already computed value
            return succeed(self._value)

        # Create a Deferred for the caller to wait on
        d = Deferred()
        self._waiting.append(d)

        if self._state == "UNINITIALIZED":
            # Once, run the setup
            self._initializer().addCallback(self._initialized)
            self._state = "INITIALIZING"

        # Initialized or initializing state here
        return d


     def _initialized(self, value):
         # Save the value, transition to the new state, and tell
         # all the previous callers of get what the result is.
         self._value = value
         self._state = "INITIALIZED"
         waiting, self._waiting = self._waiting, None
         for d in waiting:
             d.callback(value)


SomeSharedMemory = SharedResource(initializeSharedMemory)

def handleRequest(request):
    return SomeSharedMemory.get().addCallback(PickSomething)

现在有三种状态,状态之间的转换非常明确,没有全局状态需要更新(至少如果你给SomeSharedMemory一个非全局的作用域),而且handleRequest对此一无所知,它只需要请求一个值,然后使用它。

撰写回答