如何对Python Twisted透视代理的远程调用进行排队?
Twisted(一个用于Python的库)最强大的地方在于它的异步框架。我写了一个图像处理服务器,通过Perspective Broker接收请求。只要一次处理的图像数量不超过几百张,它的运行效果都很好。不过,有时候会突然收到几百张几乎同时到来的图像请求。因为它试图同时处理所有这些请求,结果服务器就崩溃了。
为了解决这个问题,我想在服务器上排队处理这些远程调用,这样它一次只处理大约100张图像。看起来Twisted可能已经有类似的功能,但我找不到相关的信息。有没有什么建议可以帮助我开始实现这个功能?谢谢!
2 个回答
你可能会对我写的 txRDQ(可调整大小的调度队列)感兴趣。可以在网上搜索一下,它在 LaunchPad 的 tx 系列里。抱歉我没时间详细回复,马上就要上台了。
特里
一个现成的选项是 twisted.internet.defer.DeferredSemaphore
,它可以帮助你。这是一个异步版本的普通信号量,如果你做过多线程编程,可能已经听说过这种信号量。
信号量(计数信号量)和互斥锁(mutex)很像。互斥锁只能被获取一次,直到释放。而信号量可以设置为允许在需要释放之前,任意数量的获取成功。
下面是一个使用 DeferredSemaphore
的例子,它可以同时运行十个异步操作,但最多只允许三个同时进行:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
还有明确的 acquire
和 release
方法,但 run
方法非常方便,几乎总是你想要的。它会调用 acquire
方法,这个方法返回一个 Deferred
对象。然后,它会在这个 Deferred
上添加一个回调,调用你传入的函数(以及任何位置参数或关键字参数)。如果这个函数返回一个 Deferred
,那么在第二个 Deferred
上会添加一个回调,调用 release
方法。
同步的情况也会处理,直接调用 release
。错误也会被处理,允许它们传播,但确保必要的 release
被执行,以保持 DeferredSemaphore
的一致状态。传给 run
的函数结果(或者它返回的 Deferred
的结果)将成为 run
返回的 Deferred
的结果。
另一种可能的方法是基于 DeferredQueue
和 cooperate
。DeferredQueue
和普通队列很像,但它的 get
方法返回一个 Deferred
。如果在调用时队列里没有项目,这个 Deferred
不会触发,直到有项目被添加。
下面是一个例子:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
注意,async
工作函数和第一个例子中的是一样的。不过这次,还有一个 worker
函数,它明确地从 DeferredQueue
中取出任务,并用 async
处理它们(通过将 async
添加为 get
返回的 Deferred
的回调)。worker
生成器由 cooperate
驱动,在每个触发的 Deferred
后迭代一次。然后,主循环启动三个这样的工作生成器,这样在任何时候都有三个任务在进行。
这种方法的代码比 DeferredSemaphore
方法多一些,但有一些可能有趣的好处。首先,cooperate
返回一个 CooperativeTask
实例,它有一些有用的方法,比如 pause
、resume
等等。此外,所有分配给同一个合作器的任务会在调度上相互“合作”,以避免过载事件循环(这也是这个API名字的由来)。在 DeferredQueue
方面,还可以设置待处理项目的数量限制,以避免完全过载服务器(例如,如果你的图像处理器卡住了,停止完成任务)。如果调用 put
的代码处理队列溢出异常,你可以利用这个压力尝试停止接受新任务(也许将它们转移到另一台服务器,或者提醒管理员)。用 DeferredSemaphore
做类似的事情会有点棘手,因为没有办法限制有多少任务在等待获取信号量。