支持尾部调用优化的twisted协同路由
txcoroutine的Python项目详细描述
协同流程控制
说明
用@txcoroutine.coroutine包装的生成器与用 @twisted.internet.defer.inlineCallbacks,但是,它返回的对象是 txcoroutine.Coroutine是twisted.internet.defer.Deferred的子类。
Coroutine对象提供了与Deferred对象相同的api,但是,调用pause, unpause或cancel在Coroutine对象上透明地对所有嵌套的^{tt6}应用相同的操作$ 当前递归等待的对象。
简单示例
调用Deferred-返回函数的单个协程。当 郊游停止了。
from __future__ import print_function from twisted.internet import reactor from twisted.internet.defer import Deferred def get_message(): d = Deferred(canceller=lambda _: ( print("cancelled getting a message"), heavylifting.cancel(), )) print("getting a message...") heavylifting = reactor.callLater(1.0, d.callback, 'dummy-message') return d @coroutine def some_process(): try: while True: msg = yield get_message() print("processing message: %s" % (msg,)) finally: # could use `except GeneratorExit` but `finally` is more illustrative print("coroutine stopped, cleaning up") def main(): proc = some_process() reactor.callLater(3, proc.cancel) # stop the coroutine 3 seconds later. reactor.callWhenRunning(main) reactor.run()
输出:
getting a message... processing message: dummy-message getting a message... processing message: dummy-message ... cancelled getting a message coroutine stopped, cleaning up
具有多级协同和级联流量控制的高级示例
from __future__ import print_function from twisted.internet import reactor, task from twisted.internet.defer import Deferred @coroutine def level3_process(): basetime = reactor.seconds() seconds_passed = lambda: int(round(reactor.seconds() - basetime)) try: while True: print("iterating: %ss passed" % seconds_passed()) yield sleep(1.0) finally: # could use `except GeneratorExit` but `finally` is more illustrative print("level3_process stopped; cleaning up...") @coroutine def level2_process(): try: yield level3_process() finally: print("level2_process stopped; cleaning up...") @coroutine def root_process(): try: yield level2_process() finally: print("root_process stopped; cleaning up...") def main(): proc = root_process() reactor.callLater(3, proc.pause) # pause the coroutine 3 seconds later. reactor.callLater(6, proc.unpause) # then pause 3 seconds later reactor.callLater(9, proc.cancel) # then finally stop it 3 seconds later def sleep(seconds, reactor=reactor): """A simple helper for asynchronously sleeping a certain amount of time.""" return task.deferLater(reactor, seconds, lambda: None) reactor.callWhenRunning(main) reactor.run()
输出:
iterating: 0s passed iterating: 1s passed iterating: 2s passed <<NOTHING PRINTED FOR 4 SECONDS>> iterating: 6s passed iterating: 7s passed iterating: 8s passed level3_process stopped; cleaning up... level2_process stopped; cleaning up... root_process stopped; cleaning up...
尾部呼叫优化
示例:
def fact(n, result=1): if n <= 1: returnValue(result) else: noreturn(fact(n - 1, n * result)) yield # make sure it's a generator n = coroutine(fact)(10000).result
注意,fact本身不应该用coroutine修饰,否则递归调用只会创建 又一次联程。这仍然支持无限递归,但效率较低,消耗稍多。 每引入一个新级别的内存,因为在内部,所有延迟都是活动的,并且相互链接。
这主要是为了在长时间运行的进程中递归地和无限地交换出行为。为了 非协同/非生成器tco,也可以通过直接委托函数调用来实现更简单的方法 去蹦床。但是,这不在这个包的范围内。
操作说明
调用者保存的内存会在调出它自己换成另一个进程时立即释放,而Deferred 最初返回的仍然绑定到正在进行的处理。
@coroutine def process(): big_obj = SomeBigObject() noreturn(process_state1()) # big_obj is released immediately yield def process_state1(): another_big_obj = SomeBigObject() noreturn(process_state2()) # another_big_obj is released immediately yield def process_state2(): yield do_something() returnValue(123) def some_other_coroutine(): yield process() # will block until state2 has returned 123
在满足这两个需求的情况下,使用纯@inlineCallbacks无法实现这一点。
使用@inlineCallbacks:
的内存高效解决方案@inlineCallbacks def process(): big_obj = SomeBigObject() process_state1() # big_obj is released immediately but the `Deferred` returned by process is fired immediately yield
具有@inlineCallbacks保持Deferred一致性但不释放内存的解决方案:
@inlineCallbacks def process(): big_obj = SomeBigObject() yield process_state1() # big_obj is not released until process_state1 completes
其他
另见http://racecondev.wordpress.com/2012/08/17/a-coroutine-decorator-for-twisted/ 不过,这篇博文并没有提到尾号优化。