在Twisted中顺序排队多个deferred

4 投票
2 回答
1255 浏览
提问于 2025-04-17 20:14

目前我还是一个初学者,在使用twisted时遇到了一些困扰。

我通过TCP发送一系列命令,并等待lineRecieved读取器的响应。这个响应可能需要几秒钟才能处理并到达,所以我把它包裹在一个defered里。第一个defered工作得很好,但第二个defered在第一个还在处理的时候就触发了,这导致了错误的结果,因为终端一次只能处理一个命令。这在异步系统中是预期的行为,但这并不是我想要的结果。如果我只有一两个命令,我可以使用deferedChain来处理,但因为我可能需要顺序执行几十个命令,我担心这会变得难以维护,像一团乱麻。

有什么好的方法可以做到这一点呢?

非常感谢

示例代码

def connectionMade(self):
    self.fire_def('command1')
    print'fire command 2'
    self.fire_def('command2')#Fires when command one is running

def fire_def(self,request):
    d = self.getInfo(request)
    d.addCallback(self.print_result)
    return d

def print_result(result):
    print result


def getInfo(self,request):
    print 'sending', request
    self.d  = defer.Deferred()
    self.sendLine(request)
    return self.d

def lineReceived(self, line):
    line = line.strip()
     self.buffer.append(line)
    if self.d is None:
        return
    if  'result_I_want' in self.buffer:
        print 'Firing Callback'
        self.d.callback(self.buffer)

2 个回答

2

简单来说,如果你想让一个任务在另一个任务完成后执行,就需要把它们的结果互相传递。

比如说,你希望d2在d1完成后才执行,那就要在d1的回调函数中返回d2。

换句话说,根据你的例子,你需要在command1的回调函数快结束的时候调用command2。

5

你提到的代码只会跟踪一个 Deferred 对象。如果应用程序的代码在没有足够时间让第一个请求完成并返回结果的情况下,连续调用 getInfo 两次,就会搞乱它内部的状态:

def getInfo(self,request):
    print 'sending', request
    self.d  = defer.Deferred()
    self.sendLine(request)
    return self.d

d_foo = getInfo(foo)
d_bar = getInfo(bar)

在这个过程中,d_food_bar 是两个不同的 Deferred 实例。但是,当第二次调用 getInfo 时,属性 self.d 的值从 d_foo 变成了 d_bar。这样一来,d_fooDeferred 就丢失了。之后,当 `lineReceived` 被调用时:

def lineReceived(self, line):
    line = line.strip()
    self.buffer.append(line)
    if self.d is None:
        return
    if  'result_I_want' in self.buffer:
        print 'Firing Callback'
        self.d.callback(self.buffer)

此时 self.dd_bar,尽管这行数据可能是对 foo 请求的响应。这意味着 d_bar 会收到 foo 的响应,而 d_foo 将根本收不到任何响应。

要解决这个问题,可以考虑在协议中保持一个 Deferred 实例的列表(或队列)。当发起新的信息请求时,就把它添加到列表中;当收到响应时,就从列表的前面取出一个。 (我不太清楚你在实现什么协议,所以不知道你怎么决定多少行数据算作一个响应。如果协议没有定义这一点,那就说明这个协议有问题,你可能需要换一个更好的协议。)

如果你解决了这个问题,那么响应至少会被正确地发送到不同的 Deferred 实例。

你还提到过一个关于强制顺序操作的问题。我可以从几个角度理解这个问题。一个理解是,你只想让网络上同时有一个请求处于“待处理”状态。换句话说,你不希望 getInfolineReceived 将响应数据传递给上一个 getInfo 调用返回的 Deferred 之前,发送新的请求。

在这种情况下,使用 Deferred 链接正好合适。尽管你有 N 个 Deferred,但在施加这个顺序限制时,实际上你只有一系列的两个 Deferred。一个是先运行的 Deferred,另一个是只有在第一个有结果后才运行的 Deferred。你可以通过将后面的 Deferred 视为新一对中的前一个 Deferred,并让第三个 Deferred 成为新的后一个 Deferred,来扩展到 N。

换句话说,如果你有 D1、D2、D3 和 D4,你可以这样链接它们:

D2 is chained to D1 and only runs when D1 is complete
D3 is chained to D2 and only runs when D2 is complete
D4 is chained to D3 and only runs when D3 is complete

不过,虽然这样可以工作,但其实并不是实现序列化的最简单方法。相反,我建议在 getInfo 中明确地排队工作,并在 lineReceived 中明确地取消排队:

def _sendRequest(self, d, request):
    print 'sending', request
    self.d = d
    self.sendLine(request)

def getInfo(self,request):
    if self.d is None:
        d = defer.Deferred()
        self._sendRequest(d, request)
        return d
    else:
        queued_d = defer.Deferred()
        self._requests.append((request, queued_d))
        return queued_d


def lineReceived(self, line):
    line = line.strip()
    self.buffer.append(line)
    if self.d is None:
        return
    if  'result_I_want' in self.buffer:
        print 'Firing Callback'
        now_d = self.d
        self.d = None
        buffer = self.buffer
        self.buffer = []
        if self._requests:
            request, queued_d = self._requests.pop(0)
            self._sendRequest(queued_d, request)
        now_d.callback(buffer)

注意在 lineReceived 中,代码在 now_d.callback(buffer) 这一行之前,确保一切处于一致的状态。这是一个微妙但重要的点。可能会有回调在 now_d 上,这会影响协议,比如再次调用 getInfo。在让这段代码运行之前,确保协议处于一致的状态是很重要的,否则它可能会混淆,比如发送请求的顺序错乱,或者在应该发送的时候却排队请求。这是一个让代码安全应对 重入 的例子。这个概念并不是 Twisted 特有的,但由于人们通常把重入的概念与多线程程序联系在一起,所以在编写基于 Twisted 的代码时,往往会忽视这个可能性。

撰写回答