使用Twisted阻塞Thrift调用

1 投票
2 回答
1627 浏览
提问于 2025-04-17 04:35

我有一个使用TTwisted协议的Twisted/Thrift服务器。我想保持与客户端的连接,直到发生某个特定事件(我通过Twisted的反应器接收到这个事件的通知,方式是通过一个回调函数)。

class ServiceHandler(object):
    interface.implements(Service.Iface)

    def listenForEvent(self):
        """
        A method defined in my Thrift interface that should block the Thrift
        response until my event of interest occurs.
        """
        # I need to somehow return immediately to free up the reactor thread,
        # but I don't want the Thrift call to be considered completed. The current
        # request needs to be marked as "waiting" somehow.

    def handleEvent(self):
        """
        A method called when the event of interest occurs. It is a callback method
        registered with the Twisted reactor.
        """
        # Find all the "waiting" requests and notify them that the event occurred.
        # This will cause all of the Thrift requests to complete.

我该如何快速从我的处理器对象的方法中返回,同时保持看起来像是一个阻塞的Thrift调用呢?


我在Twisted启动时初始化了Thrift处理器:

def on_startup():
    handler = ServiceHandler()                      
    processor = Service.Processor(handler)                                   
    server = TTwisted.ThriftServerFactory(processor=processor,                  
        iprot_factory=TBinaryProtocol.TBinaryProtocolFactory())                 
    reactor.listenTCP(9160, server)

我的PHP客户端通过以下方式连接:

  $socket = new TSocket('localhost', 9160);
  $transport = new TFramedTransport($socket);
  $protocol = new TBinaryProtocol($transport);
  $client = new ServiceClient($protocol);
  $transport->open();
  $client->listenForEvent();

最后这个调用($client->listenForEvent())成功地发送到了服务器,并运行了ServiceHandler.listenForEvent,但是即使那个服务器方法返回了一个twisted.internet.defer.Deferred()实例,客户端还是立刻收到了一个空数组,并且我得到了这个异常:

异常 'TTransportException',信息为 'TSocket: 从localhost:9160读取4个字节时超时,目标端口38395'

2 个回答

1

你看到的这个错误提示,似乎是说传输的数据没有被框架化(Twisted需要这样做,才能提前知道每条消息的长度)。另外,Thrift服务器支持从处理程序返回延迟对象,这样就更奇怪了。你有没有试过返回defer.succeed("某个值"),看看延迟对象是否真的能正常工作?然后你可以继续检查一下,确保它完全正常:

    d = defer.Deferred()
    reactor.callLater(0, d.callback, results)
    return d
2

你应该能从 listenForEvent 这个函数返回一个 Deferred。然后,稍后的 handleEvent 函数应该会触发这个返回的 Deferred(或者那些返回的多个 Deferred),这样才能真正产生响应。

撰写回答