Python Twisted推送生成器的非规律传输问题

2 投票
2 回答
788 浏览
提问于 2025-04-16 04:19

我想用Twisted从一个队列中传输数据。目前我使用的是一个推送生产者,它会定期检查队列中的项目,然后把数据写入传输通道。

class Producer:

    implements(interfaces.IPushProducer)

    def __init__(self, protocol, queue):
        self.queue = queue
        self.protocol = protocol

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            try:
                data = self.queue.get_nowait()
                logger.debug("Transmitting: '%s'", repr(data))
                data = cPickle.dumps(data)
                self.protocol.transport.write(data + "\r\n")
            except Empty:
                pass

    def pauseProducing(self):
        logger.debug("Transmitter paused.")
        self.paused = True

    def stopProducing(self):
        pass

问题是,数据发送得非常不规律。如果队列里只有一个项目,数据根本不会被发送。看起来Twisted会等到要传输的数据量达到一定的大小才会开始发送。我这样实现我的生产者是对的吗?我能不能强制Twisted立刻发送数据现在

我也试过使用拉取生产者,但Twisted根本没有调用它的resumeProducing()方法。使用拉取生产者时,我需要从外部调用resumeProducer()方法吗?

2 个回答

-1

这里有两种可能的解决方案:

1) 定期检查你的本地应用程序,看看是否有额外的数据需要发送。

注意:这个方法依赖于一个定期的异步回调,来自于twisted中的deferLater方法。如果你需要一个能快速响应的应用程序,能够根据需求发送数据,或者有一个长时间运行的阻塞操作(比如使用自己事件循环的用户界面),这个方法可能不太合适。

代码:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time

# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
  while True:
    protocol.send(b"Hello World\n")
    yield deferLater(reactor, 2, lambda: None)

# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):

   def connectionMade(self):
     self.transport.registerProducer(self, True)
     gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
     self.task = cooperate(gen)

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def pauseProducing(self):
     print 'Workload paused'
     self.task.pause()

   def resumeProducing(self):
     print 'Workload resumed'
     self.task.resume()

   def stopProducing(self):
     print 'Workload stopped'
     self.task.stop()

   def connectionLost(self, reason):
     print 'Connection lost'
     try:
       self.task.stop()
     except:
       pass

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

2) 手动跟踪协议实例,并从不同的线程中使用reactor.callFromThread()。这样可以在另一个线程中进行长时间的阻塞操作(比如用户界面事件循环)。

代码:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading

# Connection
protocol = None

# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
  def run(self):
    while True:
      print("Thread loop")
      time.sleep(random.randint(0, 4))
      if protocol is not None:
        reactor.callFromThread(self.dispatch)
  def dispatch(self):
    global protocol
    protocol.send("Hello World\n")

# Push protocol
class PushProtocol(Protocol):

   def connectionMade(self):
     global protocol
     protocol = self

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def connectionLost(self, reason):
     print 'Connection lost'

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Start thread
other = SomeThread()
other.start()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

就我个人而言,我觉得IPushProducer和IPullProducer需要定期回调这一点,让它们的用处减少了。不过也有人有不同的看法…… 耸肩。你可以自己选择。

2

要说清楚你的生产者为什么不好用,得看看完整的例子,包括怎么把它和消费者连接起来的代码,以及往队列里放东西的代码。

不过,你可能会遇到一个问题,就是当你调用 resumeProducing 时,如果队列是空的,那么就不会有任何数据写入消费者。而当有东西放进队列时,它们会一直待在那儿,因为消费者不会再调用你的 resumeProducing 方法了。

这个问题也适用于其他情况,比如队列里的数据不够多,导致消费者不会调用 pauseProducing 来暂停你的生产者。作为一个推送型的生产者,你需要自己持续产生数据,直到消费者调用 pauseProducing(或 stopProducing)。

针对这种情况,建议你在往队列里放东西之前,先停下来检查一下生产者是否没有被暂停。如果没有被暂停,就直接把数据写给消费者而不是放进队列。只有在生产者被暂停时,才往队列里放东西。

撰写回答