一个背景Twisted服务器的模式,用于填充传入消息队列并清空传出消息队列?

5 投票
1 回答
3185 浏览
提问于 2025-04-16 06:57

我想做类似这样的事情:

twistedServer.start() # This would be a nonblocking call

while True:
   while twistedServer.haveMessage():
      message = twistedServer.getMessage()
      response = handleMessage(message)
      twistedServer.sendResponse(response)
   doSomeOtherLogic()

我想要做的关键是把服务器放在一个后台线程里运行。我希望用线程来实现这个,而不是用多进程或队列,因为我的应用已经有一层消息传递机制了,我想避免再加一层。我提这个是因为我已经知道怎么在单独的进程中做到这一点,但我想知道怎么在一个线程中做到,或者说我是否可以这样做。或者也许还有其他的方法可以实现同样的效果,比如自己写一个reactor.run的方法。谢谢大家的帮助!

1 个回答

10

我想做的关键事情是让服务器在后台线程中运行。

不过你没有解释为什么这很关键。一般来说,“使用线程”这样的说法只是实现细节。也许线程是合适的,也许不是,但实际的目标和这个问题无关。你的目标是什么?是同时处理多个客户端吗?还是要同时处理这种消息和来自其他来源的事件(比如,一个网页服务器)?如果不知道最终的目标,就没办法判断我建议的实现方法是否有效。

考虑到这一点,这里有两种可能性。

首先,你可以不考虑线程。这意味着你需要把事件处理的逻辑定义为仅仅事件处理的部分。获取事件的部分可以交给应用程序的其他部分来处理,可能是基于某种反应器API(例如,你可以设置一个TCP服务器,接受消息并将其转化为你正在处理的事件,这样你就可以通过某种方式调用reactor.listenTCP来开始)。

所以你的例子可能会变成这样(我会增加一些细节来提高教学价值):

from twisted.internet import reactor

class MessageReverser(object):
    """
    Accept messages, reverse them, and send them onwards.
    """
    def __init__(self, server):
        self.server = server

    def messageReceived(self, message):
        """
        Callback invoked whenever a message is received.  This implementation
        will reverse and re-send the message.
        """
        self.server.sendMessage(message[::-1])
        doSomeOtherLogic()

def main():
    twistedServer = ...
    twistedServer.start(MessageReverser(twistedServer))
    reactor.run()

main()

关于这个例子,有几个要点需要注意:

  • 我不太确定你的twistedServer是怎么定义的。我想象它以某种方式与网络交互。你版本的代码可能是接收消息并将其缓冲,直到你的循环处理它们。而这个版本可能没有缓冲,而是当消息到达时,立即调用传递给start的对象的messageReceived方法。如果你想的话,仍然可以在messageReceived方法中添加某种缓冲。

  • 现在有一个reactor.run的调用,这会阻塞。你也可以把这段代码写成twistd插件或.tac文件,这样你就不需要直接负责启动反应器。不过,必须有人启动反应器,否则大多数Twisted的API都不会起作用。reactor.run会阻塞,直到有人调用reactor.stop

  • 这个方法没有使用线程。Twisted的合作式多任务处理方式意味着你仍然可以同时做多件事,只要你注意合作(这通常意味着偶尔返回到反应器)。

  • 调用doSomeOtherLogic函数的确切时机稍有变化,因为没有“缓冲区现在为空”和“我刚处理了一个消息”这样的概念。你可以改变这个,使得这个函数每秒调用一次,或者在处理每N条消息后调用,或者根据需要进行调整。

第二种可能性是真正使用线程。这可能看起来和前面的例子非常相似,但你会在另一个线程中调用reactor.run,而不是主线程。例如:

from Queue import Queue
from threading import Thread

class MessageQueuer(object):
    def __init__(self, queue):
        self.queue = queue

    def messageReceived(self, message):
        self.queue.put(message)

def main():
    queue = Queue()
    twistedServer = ...
    twistedServer.start(MessageQueuer(queue))
    Thread(target=reactor.run, args=(False,)).start()

    while True:
        message = queue.get()
        response = handleMessage(message)
        reactor.callFromThread(twistedServer.sendResponse, response)

main()

这个版本假设twistedServer的工作方式类似,但使用线程让你可以有while True:循环。注意:

  • 如果你使用线程,必须调用reactor.run(False),以防止Twisted尝试安装任何信号处理程序,而Python只允许在主线程中安装。这意味着Ctrl-C的处理将被禁用,reactor.spawnProcess也不会可靠地工作。

  • MessageQueuerMessageReverser有相同的接口,只是它的messageReceived实现不同。它使用线程安全的Queue对象在反应器线程(它将在其中被调用)和你的主线程(while True:循环在运行)之间进行通信。

  • 你必须使用reactor.callFromThread将消息发送回反应器线程(假设twistedServer.sendResponse实际上是基于Twisted的API)。Twisted的API通常不是线程安全的,必须在反应器线程中调用。这就是reactor.callFromThread为你做的事情。

  • 你可能需要实现某种方式来停止循环和反应器。Python进程在你调用reactor.stop之前不会干净地退出。

需要注意的是,虽然线程版本给你提供了熟悉的、想要的while True循环,但它实际上并没有比非线程版本做得更好。它只是更复杂。因此,考虑一下你是否真的需要线程,或者它们只是可以用其他东西替代的实现技术。

撰写回答