Python Twisted: iterators and generators/inlineCallbacks

19 votes
3 answers
8387 views
Asked 2025-04-16 17:29

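Hi everyone,

I'm somewhat confused, and I may not be phrasing this well, but here goes:

I have a Twisted application that uses inlineCallbacks. I now need to define an iterator, which means the caller gets a generator back. But that iterator can't be decorated with inlineCallbacks, can it? If not, how should I write code like this?

To be clearer: my goal is to call process_loop every 5 seconds. Each call may process only a small chunk, say 10 items, and then has to yield control. However, to find out which 10 items those are (they are stored in cached, which is a dict of dicts), I need to call a function that returns a Deferred.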

@inlineCallbacks ### can't have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield func_returning_deferred()  # placeholder: any call that returns a Deferred
        if result is True:
            for k,v in cachevalue.items():
                yield cachename, k, v

@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in xrange(num):
            nextval = myiter.next()
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'

3 Answers

-1

Try writing your iterator as a DeferredGenerator.

1

I think this is what you are trying to do:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred() # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k,v in cachevalue.items())
            returnValue(gen)

When a generator expression (genexp) can't express what you want to return, you can write a closure instead:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachevalue=cachevalue, cachename=cachename):
                for k,v in cachevalue.items():
                    yield cachename, k, v
            returnValue(gen()) # return it

13

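You're right: cacheiter as written can't do what you want. A function decorated with inlineCallbacks cannot return an iterator. Once you apply the decorator, calling the function always returns a Deferred. That is what the decorator is for.

What makes this hard is that iterators and asynchronous code don't mix well. If producing an element involves a Deferred, then what comes out of the iterator is, at first, a Deferred as well.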
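
To make the first point concrete, here is a toy model with no Twisted in it. ToyDeferred and toy_inline_callbacks are hypothetical stand-ins invented for this sketch, and they run fully synchronously, so treat it as an illustration of the idea rather than how Twisted is implemented. The decorator is the only consumer of the generator, so it sees every yield, including the ones the question meant as "hand this item to my caller". As far as I understand, the real decorator likewise sends non-Deferred values straight back into the generator.

```python
import functools


class ToyDeferred(object):
    """Hypothetical stand-in for an already-fired Deferred (not Twisted's class)."""

    def __init__(self, result):
        self.result = result


def toy_inline_callbacks(func):
    """Hypothetical, fully synchronous imitation of a trampoline decorator."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        to_send = None
        try:
            while True:
                yielded = gen.send(to_send)
                # The trampoline sees every yield. A ToyDeferred is unwrapped;
                # anything else is sent straight back into the generator.
                if isinstance(yielded, ToyDeferred):
                    to_send = yielded.result
                else:
                    to_send = yielded
        except StopIteration as stop:
            # The caller always receives a ToyDeferred, never the generator.
            return ToyDeferred(getattr(stop, "value", None))

    return wrapper


@toy_inline_callbacks
def cacheiter(cached):
    for cachename, cachevalue in cached.items():
        result = yield ToyDeferred(True)  # intended: "wait for this result"
        if result is True:
            for k, v in cachevalue.items():
                yield cachename, k, v  # intended: "hand this item to my caller"


outcome = cacheiter({"a": {"x": 1, "y": 2}})
```

Here outcome is a ToyDeferred whose result is None. The (cachename, k, v) tuples never reach the caller, because the decorator consumed them like any other yield.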

One way to deal with this is:

@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)

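This works, but it looks a bit odd. A generator can only yield to its immediate caller (not to its caller's caller), so the some_jobs iterator can't do anything about this; only code inside process_work itself can yield a Deferred to the trampoline that inlineCallbacks provides and wait on it.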
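
The "immediate caller only" rule can be seen with plain generators. In this sketch, drive is a hypothetical trampoline written for illustration, and strings stand in for Deferreds and their results. some_jobs hands placeholders to process_work, and process_work has to re-yield each one so that it travels one more level up to the driver.

```python
def some_jobs():
    # A plain iterator: its values go only to whoever is iterating over it.
    for n in range(3):
        yield "pending-%d" % n


def process_work(done):
    for element_placeholder in some_jobs():
        # Re-yield the placeholder so it reaches the driver of process_work,
        # then receive the resolved value that the driver sends back.
        element = yield element_placeholder
        done.append(element)


def drive(gen):
    """Hypothetical trampoline: resolves each placeholder it is handed."""
    seen = []
    to_send = None
    try:
        while True:
            placeholder = gen.send(to_send)
            seen.append(placeholder)
            # Stand-in for "wait for the Deferred to fire".
            to_send = placeholder.replace("pending", "done")
    except StopIteration:
        pass
    return seen


done = []
seen = drive(process_work(done))
```

The driver only ever sees what process_work yields. If process_work did not re-yield, the placeholders from some_jobs would stop at process_work and never be resolved.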

If you don't mind this pattern, we can imagine your code being written like this:

from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self


    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producible values if it is empty
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])

        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())

        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)


@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)

    # Indicate there is more processing to be done.
    returnValue(False)


def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)


@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break

        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)

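However, you could also consider twisted.internet.task.cooperate. cooperate takes an iterator and consumes it step by step, on the assumption that consuming it may be time-consuming, and so spreads the work over multiple reactor iterations. Using the cacheiter definition above: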

from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []

    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)

    myiter = cacheiter(cached)

    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()
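
For intuition about what "spreading the work over multiple reactor iterations" means, here is a toy round-robin driver with no Twisted in it. toy_cooperate, job, and the notion of a "tick" are hypothetical names made up for this sketch. The real cooperate makes its own scheduling decisions and also waits on any Deferred the iterator yields, neither of which is modeled here.

```python
from collections import deque


def toy_cooperate(iterators, steps_per_tick=1):
    """Hypothetical driver: advance each iterator a little on every 'tick'."""
    pending = deque(iterators)
    ticks = 0
    while pending:
        ticks += 1
        # Give every still-pending iterator one turn during this tick.
        for _ in range(len(pending)):
            it = pending.popleft()
            try:
                for _ in range(steps_per_tick):
                    next(it)
            except StopIteration:
                continue  # this iterator is finished; drop it
            pending.append(it)  # not finished; it gets another turn next tick
    return ticks


log = []


def job(name, count):
    for i in range(count):
        log.append((name, i))
        yield  # give control back to the driver after each unit of work


ticks = toy_cooperate([job("a", 2), job("b", 3)])
```

The log shows the two jobs interleaved rather than run back to back, which is the effect you want when a single long loop would otherwise block everything else.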
