如何对Python Twisted透视代理的远程调用进行排队?

11 投票
2 回答
6254 浏览
提问于 2025-04-15 22:52

Twisted(一个用于Python的库)最强大的地方在于它的异步框架。我写了一个图像处理服务器,通过Perspective Broker接收请求。只要一次处理的图像数量不超过几百张,它的运行效果都很好。不过,有时候会突然收到几百张几乎同时到来的图像请求。因为它试图同时处理所有这些请求,结果服务器就崩溃了。

为了解决这个问题,我想在服务器上排队处理这些远程调用,这样它一次只处理大约100张图像。看起来Twisted可能已经有类似的功能,但我找不到相关的信息。有没有什么建议可以帮助我开始实现这个功能?谢谢!

2 个回答

-2

你可能会对我写的 txRDQ(可调整大小的调度队列)感兴趣。可以在网上搜索一下,它在 LaunchPad 的 tx 系列里。抱歉我没时间详细回复,马上就要上台了。

特里

29

一个现成的选项是 twisted.internet.defer.DeferredSemaphore,它可以帮助你。这是一个异步版本的普通信号量,如果你做过多线程编程,可能已经听说过这种信号量。

信号量(计数信号量)和互斥锁(mutex)很像。互斥锁只能被获取一次,直到释放。而信号量可以设置为允许在需要释放之前,任意数量的获取成功。

下面是一个使用 DeferredSemaphore 的例子,它可以同时运行十个异步操作,但最多只允许三个同时进行:

from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def main():
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))

    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()


if __name__ == '__main__':
    main()

DeferredSemaphore 还有明确的 acquirerelease 方法,但 run 方法非常方便,几乎总是你想要的。它会调用 acquire 方法,这个方法返回一个 Deferred 对象。然后,它会在这个 Deferred 上添加一个回调,调用你传入的函数(以及任何位置参数或关键字参数)。如果这个函数返回一个 Deferred,那么在第二个 Deferred 上会添加一个回调,调用 release 方法。

同步的情况也会处理,直接调用 release。错误也会被处理,允许它们传播,但确保必要的 release 被执行,以保持 DeferredSemaphore 的一致状态。传给 run 的函数结果(或者它返回的 Deferred 的结果)将成为 run 返回的 Deferred 的结果。

另一种可能的方法是基于 DeferredQueuecooperateDeferredQueue 和普通队列很像,但它的 get 方法返回一个 Deferred。如果在调用时队列里没有项目,这个 Deferred 不会触发,直到有项目被添加。

下面是一个例子:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)


def worker(jobs):
    while True:
        yield jobs.get().addCallback(async)


def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()


if __name__ == '__main__':
    main()

注意,async 工作函数和第一个例子中的是一样的。不过这次,还有一个 worker 函数,它明确地从 DeferredQueue 中取出任务,并用 async 处理它们(通过将 async 添加为 get 返回的 Deferred 的回调)。worker 生成器由 cooperate 驱动,在每个触发的 Deferred 后迭代一次。然后,主循环启动三个这样的工作生成器,这样在任何时候都有三个任务在进行。

这种方法的代码比 DeferredSemaphore 方法多一些,但有一些可能有趣的好处。首先,cooperate 返回一个 CooperativeTask 实例,它有一些有用的方法,比如 pauseresume 等等。此外,所有分配给同一个合作器的任务会在调度上相互“合作”,以避免过载事件循环(这也是这个API名字的由来)。在 DeferredQueue 方面,还可以设置待处理项目的数量限制,以避免完全过载服务器(例如,如果你的图像处理器卡住了,停止完成任务)。如果调用 put 的代码处理队列溢出异常,你可以利用这个压力尝试停止接受新任务(也许将它们转移到另一台服务器,或者提醒管理员)。用 DeferredSemaphore 做类似的事情会有点棘手,因为没有办法限制有多少任务在等待获取信号量。

撰写回答