Python Twisted: iterators and yield/inlineCallbacks

Posted 2024-04-26 05:15:31


Folks, I'm completely confused, so I may even be asking the wrong question, but here goes:

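I have a Twisted application that uses inlineCallbacks. Now I need to define an iterator, which means a generator is returned to the caller. But an iterator can't be decorated with inlineCallbacks, right? If not, how should I write code like this?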

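Just to clarify: the goal is that the processing loop is called every 5 seconds and may only process one chunk of the data, say 10 items, before it has to let go. However, to know which 10 items to take (they are stored in cached, which is a dict of dicts), it needs to call a function that returns a Deferred.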

@inlineCallbacks  ### can't have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename, cachevalue in cached.items():
        # check_cache is a hypothetical stand-in for "some function that returns a Deferred"
        result = yield check_cache(cachename)
        if result is True:
            for k, v in cachevalue.items():
                yield cachename, k, v

@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in range(num):
            nextval = next(myiter)
            # some_processing is the asker's own Deferred-returning function
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print('More left')
        reactor.callLater(5, process_loop, cached)
    else:
        print('All done')
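Setting the Deferred aside, the chunking part of the goal is ordinary iterator work: a generator remembers where it stopped, so pulling 10 items at a time with itertools.islice resumes correctly on each call. A minimal synchronous sketch, assuming a hypothetical predicate keep(cachename) in place of the Deferred-returning check (flatten and take_chunk are illustrative names, not from the question):

```python
from itertools import islice


def flatten(cached, keep):
    """Yield (cachename, key, value) for every cache whose name passes keep().

    keep is a hypothetical *synchronous* stand-in for the question's
    Deferred-returning check; that check is what makes the real problem hard.
    """
    for cachename, cachevalue in cached.items():
        if keep(cachename):
            for k, v in cachevalue.items():
                yield cachename, k, v


def take_chunk(myiter, num):
    """Pull up to num items; report finished=True once the iterator runs dry."""
    chunk = list(islice(myiter, num))
    return chunk, len(chunk) < num


cached = {
    "a": {i: i * i for i in range(15)},
    "skip": {"x": 0},
    "b": {i: -i for i in range(8)},
}
myiter = flatten(cached, keep=lambda name: name != "skip")
chunk1, done1 = take_chunk(myiter, 10)  # first 10 items of "a"
chunk2, done2 = take_chunk(myiter, 10)  # last 5 of "a", first 5 of "b"
chunk3, done3 = take_chunk(myiter, 10)  # last 3 of "b", finished
```

As with the question's process_chunk, if the item count is an exact multiple of num, the last full chunk reports "not finished" and the following call returns an empty, finished chunk.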

3 answers

I think you're trying to do this:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred() # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k, v in cachevalue.items())
            returnValue(gen)
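To see this control flow without a reactor, here is a toy model. Fired and toy_inline are hypothetical, synchronous stand-ins for an already-fired Deferred and for the inlineCallbacks trampoline (not Twisted's implementation), and the check argument replaces some_deferred(). In Python 3 a plain return inside the generator plays the role of returnValue:

```python
class Fired:
    """Toy stand-in for an already-fired Deferred (hypothetical, not Twisted's class)."""

    def __init__(self, value):
        self.value = value


def toy_inline(gen_func):
    """Toy model of the inlineCallbacks trampoline.

    Each yielded Fired is unwrapped and its value sent back into the
    generator; the generator's return value is wrapped in a Fired for the
    caller. Real inlineCallbacks also waits on unfired Deferreds and
    propagates errors, which this sketch ignores.
    """
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        sent = None
        while True:
            try:
                yielded = gen.send(sent)
            except StopIteration as stop:
                return Fired(stop.value)
            sent = yielded.value if isinstance(yielded, Fired) else yielded
    return wrapper


@toy_inline
def cacheiter(cached, check):
    for cachename, cachevalue in cached.items():
        result = yield Fired(check(cachename))  # "wait" on the check
        if result is True:
            # Return (not yield) a generator; the caller iterates it later.
            return ((cachename, k, v) for k, v in cachevalue.items())


cached = {"a": {"x": 1}, "b": {"y": 2, "z": 3}}
d = cacheiter(cached, check=lambda name: name == "b")
print(list(d.value))  # [('b', 'y', 2), ('b', 'z', 3)]
```

Because return leaves the loop, only the first cache whose check passes is handed back; if no check passes, the final value is None.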

When a generator expression can't express what you are trying to return, you can write a closure:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachevalue=cachevalue, cachename=cachename):
                for k, v in cachevalue.items():
                    yield cachename, k, v
            returnValue(gen()) # return it
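The default arguments in gen matter whenever a generator outlives the loop iteration that created it, because a nested function reads enclosing variables when it runs, not when it is defined. A stdlib-only sketch (make_gens, gen_late and gen_early are hypothetical names for illustration):

```python
def make_gens(cached):
    """Build one generator function per cache, with and without default-arg capture."""
    late, early = [], []
    for cachename, cachevalue in cached.items():
        def gen_late():
            # Looks up cachename/cachevalue when iterated: sees the *last* loop values.
            for k, v in cachevalue.items():
                yield cachename, k, v

        def gen_early(cachevalue=cachevalue, cachename=cachename):
            # Defaults are evaluated at definition time: values frozen per iteration.
            for k, v in cachevalue.items():
                yield cachename, k, v

        late.append(gen_late)
        early.append(gen_early)
    return late, early


cached = {"a": {"x": 1}, "b": {"y": 2}}
late, early = make_gens(cached)
print(list(late[0]()))   # [('b', 'y', 2)]
print(list(early[0]()))  # [('a', 'x', 1)]
```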

Try writing the iterator as a ^{}

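You're right: you can't express what you want with cacheiter as written. The inlineCallbacks decorator doesn't allow a function that returns an iterator. If you decorate a function with it, the result is a function that always returns a Deferred. That's what it is for.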
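A toy model makes this concrete. Fired and toy_inline below are hypothetical, synchronous stand-ins for an already-fired Deferred and for the inlineCallbacks trampoline, not Twisted's code. Decorating the question's cacheiter with it gives the caller one final result object instead of an iterator, and the (cachename, k, v) tuples are consumed by the trampoline rather than delivered:

```python
class Fired:
    """Toy stand-in for an already-fired Deferred (hypothetical, not Twisted's class)."""

    def __init__(self, value):
        self.value = value


def toy_inline(gen_func):
    """Toy model of the inlineCallbacks trampoline: unwrap each yielded Fired,
    send its value back in, and wrap the generator's return value in a Fired."""
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        sent = None
        while True:
            try:
                yielded = gen.send(sent)
            except StopIteration as stop:
                return Fired(stop.value)
            sent = yielded.value if isinstance(yielded, Fired) else yielded
    return wrapper


@toy_inline
def cacheiter(cached):
    for cachename, cachevalue in cached.items():
        result = yield Fired(True)      # meant as: wait for the check
        if result is True:
            for k, v in cachevalue.items():
                yield cachename, k, v   # meant as: produce a value for the caller


d = cacheiter({"a": {"x": 1, "y": 2}})
print(type(d).__name__, d.value)  # Fired None
```

Both kinds of yield go to the same place, the trampoline, so a single generator cannot both wait on Deferreds and produce values for its caller.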

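Part of what makes this difficult is that iterators don't mix well with asynchronous code. If producing the iterator's elements involves a Deferred, then the elements that come out of the iterator are going to be Deferreds first.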

You could account for that like this:

@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)

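This can work, but it looks rather odd. Because a generator can only yield to its own caller (it cannot, for example, yield to its caller's caller), the some_jobs iterator can't do anything about this; only code lexically inside process_work can yield a Deferred to the trampoline that inlineCallbacks provides and have it waited on.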
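In the toy model below (Fired and toy_inline are hypothetical synchronous stand-ins for a fired Deferred and the inlineCallbacks trampoline), process_work gets real values back because its yield is written inside it, while broken_work, which hands each Deferred to a hypothetical helper generator wait_for, only collects unstarted generator objects:

```python
import types


class Fired:
    """Toy stand-in for an already-fired Deferred (hypothetical, not Twisted's class)."""

    def __init__(self, value):
        self.value = value


def toy_inline(gen_func):
    """Toy model of the inlineCallbacks trampoline: unwrap each yielded Fired,
    send its value back in, and wrap the generator's return value in a Fired."""
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        sent = None
        while True:
            try:
                yielded = gen.send(sent)
            except StopIteration as stop:
                return Fired(stop.value)
            sent = yielded.value if isinstance(yielded, Fired) else yielded
    return wrapper


def some_jobs(values):
    """An ordinary generator whose elements are 'Deferreds' (toy Fired objects)."""
    for v in values:
        yield Fired(v)


@toy_inline
def process_work(values):
    results = []
    for element_deferred in some_jobs(values):
        # This yield is lexically inside process_work, so the trampoline sees it.
        element = yield element_deferred
        results.append(element * 10)
    return results


def wait_for(job):
    """A helper generator: calling it only builds a generator object."""
    value = yield job
    return value


@toy_inline
def broken_work(values):
    results = []
    for element_deferred in some_jobs(values):
        # wait_for()'s yield would go to whoever iterates that generator,
        # never to the trampoline driving broken_work, so nothing is waited on.
        results.append(wait_for(element_deferred))
    # The toy trampoline (like inlineCallbacks) needs a generator function,
    # so this token yield keeps broken_work one.
    yield Fired(None)
    return results


print(process_work([1, 2, 3]).value)  # [10, 20, 30]
```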

If you don't mind that pattern, then we can imagine your code being written like this:

from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self


    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty
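        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included.
                # check_condition is your own function that returns a Deferred.
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])
                    # Stop as soon as there is something to hand out, so the rest of
                    # the cache is only checked when it is actually needed.
                    if self._remaining:
                        break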

        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())

        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)


@inlineCallbacks
def process_chunk(myiter, num):
    for i in range(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)

    # Indicate there is more processing to be done.
    returnValue(False)


def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)


@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print('All done')
            break

        print('More left')
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)
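The control flow above can be checked without a reactor using a toy model. Fired and toy_inline are hypothetical, synchronous stand-ins for an already-fired Deferred and for inlineCallbacks, the check argument replaces check_condition, and appending to processed replaces some_processing (this toy stops scanning as soon as one cache contributes values). It shows process_chunk reporting more work after the first chunk and completion once the None sentinel arrives:

```python
class Fired:
    """Toy stand-in for an already-fired Deferred (hypothetical, not Twisted's class)."""

    def __init__(self, value):
        self.value = value


def toy_inline(gen_func):
    """Toy model of the inlineCallbacks trampoline: unwrap each yielded Fired,
    send its value back in, and wrap the generator's return value in a Fired."""
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        sent = None
        while True:
            try:
                yielded = gen.send(sent)
            except StopIteration as stop:
                return Fired(stop.value)
            sent = yielded.value if isinstance(yielded, Fired) else yielded
    return wrapper


class CacheIter:
    """Toy version of the cacheiter class above: next() 'returns a Deferred'."""

    def __init__(self, cached, check):
        self._cached = iter(cached.items())
        self._check = check  # hypothetical stand-in for check_condition
        self._remaining = []

    @toy_inline
    def next(self):
        if not self._remaining:
            for name, value in self._cached:
                if (yield Fired(self._check(name, value))):
                    self._remaining.extend((name, k, v) for k, v in value.items())
                    if self._remaining:
                        break
        # None is the completion sentinel, as in the answer.
        return self._remaining.pop() if self._remaining else None


@toy_inline
def process_chunk(myiter, num, processed):
    for _ in range(num):
        nextval = yield myiter.next()
        if nextval is None:
            return True               # iterator signalled completion
        processed.append(nextval)     # stands in for some_processing(nextval)
    return False                      # more left


cached = {"a": {i: i for i in range(12)}, "skip": {"x": 0}, "b": {i: i for i in range(5)}}
myiter = CacheIter(cached, check=lambda name, value: name != "skip")
processed = []
first = process_chunk(myiter, 10, processed).value   # False: more left
second = process_chunk(myiter, 10, processed).value  # True: all done
```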

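Another approach you could take, though, is to use twisted.internet.task.cooperate. cooperate accepts an iterator and consumes it, on the assumption that consuming it is potentially expensive, and splits the job across multiple reactor iterations. Taking the definition of cacheiter from above: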

from twisted.internet.task import cooperate

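def process_loop(cached):
    # A plain generator (no inlineCallbacks): cooperate() drives it one step at a
    # time and waits for each yielded Deferred to fire before resuming it.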
    finished = []

    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)

    myiter = cacheiter(cached)

    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()
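The idea behind cooperate, advancing each task a little at a time so no single job monopolizes the reactor, can be illustrated with a toy round-robin scheduler. toy_cooperate is hypothetical and much simpler than Twisted's Cooperator, which among other things waits on a Deferred yielded by a task before advancing that task again and runs steps in small time slices per reactor iteration:

```python
from collections import deque


def toy_cooperate(*iterators):
    """Advance each iterator one step per turn until all are exhausted.

    Returns the values produced, in production order, so the interleaving
    is visible.
    """
    queue = deque(iter(it) for it in iterators)
    trace = []
    while queue:
        it = queue.popleft()
        try:
            trace.append(next(it))
        except StopIteration:
            continue          # this task is finished; drop it
        queue.append(it)      # not finished: back of the line for another turn
    return trace


print(toy_cooperate("ab", [1, 2, 3]))  # ['a', 1, 'b', 2, 3]
```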
