Python twisted: iterators and generators / inlineCallbacks
Hi everyone,
I'm a bit confused here, and I may not even be asking this the right way, but I'll give it a try:
I have a Twisted application that uses inlineCallbacks. Now I need to define an iterator so that callers get a generator, but the iterator can't be decorated with inlineCallbacks, can it? And if not, how should I structure code like this?
To be a bit clearer: my goal is to have process_loop called every 5 seconds, and it can only process a small chunk at a time, say 10 items, before it has to let go. However, to know which 10 items to process (they are stored in cached, a dict of dicts), I need to call a function that returns a Deferred.
@inlineCallbacks   ### can't have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename, cachevalue in cached.items():
        result = yield (call func here which returns deferred)
        if result is True:
            for k, v in cachevalue.items():
                yield cachename, k, v
@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in xrange(num):
            nextval = myiter.next()
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)
@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'
3 Answers
Try writing your iterator as a DeferredGenerator.
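(For reference, here is a minimal sketch of how the deferredGenerator / waitForDeferred API is typically used, with some_deferred() standing in for the Deferred-returning call from the question and check_cache_item as a hypothetical helper. Note that, just like inlineCallbacks, a deferredGenerator-decorated function still returns a Deferred rather than an iterator, so it runs into the same limitation discussed in the other answers.)
from twisted.internet.defer import deferredGenerator, waitForDeferred

@deferredGenerator
def check_cache_item(cachename, cachevalue):
    d = waitForDeferred(some_deferred())   # wrap the Deferred
    yield d                                # let the trampoline wait on it
    result = d.getResult()                 # fetch the fired value
    if result is True:
        # yielding a plain (non-waitForDeferred) value makes it the
        # result of the Deferred handed back to the caller
        yield [(cachename, k, v) for k, v in cachevalue.items()]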
I think what you're trying to do is this:
@inlineCallbacks
def cacheiter(cached):
    for cachename, cachevalue in cached.items():
        result = yield some_deferred()  # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue;
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k, v in cachevalue.items())
            returnValue(gen)
When a generator expression can't express what you want to return, you can write a closure instead:
@inlineCallbacks
def cacheiter(cached):
    for cachename, cachevalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachevalue=cachevalue, cachename=cachename):
                for k, v in cachevalue.items():
                    yield cachename, k, v
            returnValue(gen())  # return it
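One caveat with either version: because cacheiter is now decorated with inlineCallbacks, calling it no longer gives you the generator directly. It gives you a Deferred that fires with the generator, or with None if no cache entry passed the check, so the caller has to yield it first. A rough sketch, reusing the question's process_chunk and reactor:
@inlineCallbacks
def process_loop(cached):
    # cacheiter() now returns a Deferred; yield it to get the generator out.
    myiter = yield cacheiter(cached)
    if myiter is None:
        print 'Nothing to process'
        return
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'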
You're right that cacheiter can't do what you need. With the inlineCallbacks decorator, your function can no longer return an iterator. If you decorate a function with it, the result is always a Deferred. That's what the decorator is for.
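To make that concrete, here is a tiny sketch (f and report are just placeholder names): whatever the body does, calling an inlineCallbacks-decorated function always hands back a Deferred, never the generator itself.
from twisted.internet.defer import inlineCallbacks, returnValue, succeed

@inlineCallbacks
def f():
    x = yield succeed(10)   # wait for a Deferred
    returnValue(x * 2)

def report(value):
    print 'got', value

d = f()                     # d is a Deferred, not an iterator
d.addCallback(report)       # prints 'got 20'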
Part of what makes this complicated is that iterators and asynchronous code don't mix well. If producing an element of your iterator involves a Deferred, then the elements that come out of the iterator are going to be Deferreds at first, too.
You could do something like this to cope with that:
@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)
This works, but it looks a bit odd. Because a generator can only yield values to its immediate caller (not to its caller's caller), the some_jobs iterator can't do anything about this; only the code inside process_work can hand a Deferred to the trampoline provided by inlineCallbacks and wait on it.
If you don't mind that pattern, then we might imagine your code written something like this:
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self

    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])
        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())
        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)

@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)
    # Indicate there is more processing to be done.
    returnValue(False)

def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break
        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)
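Since process_loop is itself decorated with inlineCallbacks, the d it returns fires once 'All done' is printed (or with a Failure if something in the chain raises). If this is the whole program, here is a sketch of using that Deferred to shut the reactor down when everything is finished (shutdown is just a placeholder name, and it assumes nothing else is keeping the reactor running):
def shutdown(result):
    # Called on success or failure alike; stop the reactor either way.
    reactor.stop()
    return result

d.addBoth(shutdown)
reactor.run()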
You might also want to consider using twisted.internet.task.cooperate, though. cooperate takes an iterator and works through it, on the assumption that consuming it may be expensive, spreading the work out over multiple reactor iterations. Using the cacheiter definition above:
from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []
    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)
    myiter = cacheiter(cached)
    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()
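Unlike the earlier version, this one no longer prints anything itself; the Deferred from task.whenDone() fires once the process_loop generator is exhausted, so completion and error handling hang off of it. A small sketch (done and failed are placeholder names):
def done(ignored):
    print 'All done'

def failed(failure):
    print 'Processing failed:', failure

d.addCallbacks(done, failed)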