Python生成器的'yield'在单独函数中使用

2 投票
3 回答
975 浏览
提问于 2025-04-17 01:04

我正在开发一个工具库,这个库有点像任务管理器,目的是在谷歌云计算服务的分布式环境中运行。(它结合了任务队列和内存缓存来执行后台处理)。我打算使用生成器来控制任务的执行,基本上是通过在用户代码中使用yield来实现一种非抢占式的“并发”。

举个简单的例子——处理一堆数据库实体,可能是这样的:

class EntityWorker(Worker):
    def setup():
        self.entity_query = Entity.all()
    def run():
        for e in self.entity_query:
            do_something_with(e)
            yield

我们知道,yield是一个双向通信的通道,可以将值传递给使用生成器的代码。这使得我们可以模拟一个“抢占式的API”,比如下面的SLEEP调用:

def run():
    for e in self.entity_query:
        do_something_with(e)
        yield Worker.SLEEP, timedelta(seconds=1)

但这样看起来很糟糕。如果能把yield隐藏在一个单独的函数里,并且能简单地调用就好了:

self.sleep(timedelta(seconds=1))

问题是,把yield放在sleep函数里会把这个函数变成一个生成器函数。因此,上面的调用只会返回另一个生成器。只有在添加.next()并且再用yield返回后,我们才能得到之前的结果:

yield self.sleep(timedelta(seconds=1)).next()

这显然比之前更复杂,而且显得冗长。

所以我想问:有没有办法把yield放进一个函数里,而不把它变成生成器函数,同时又能让其他生成器使用它来返回计算出的值?

3 个回答

1

我建议你看看ndb。它使用生成器作为协程(就像你在这里提到的那样),让你可以编写与RPC异步工作的程序。

这个API通过用另一个函数包装生成器来实现这一点,这个函数会“启动”生成器(它会立即调用.next(),让代码开始执行)。这些任务小块(tasklets)也被设计成可以与App Engine的RPC基础设施一起工作,这样你就可以使用任何现有的异步API调用。

在ndb使用的并发模型中,你可以yield一个未来对象(类似于pep-3148中描述的)或者一个App Engine的RPC对象。当这个RPC完成后,之前yield这个对象的函数的执行就可以继续了。

如果你使用的是从ndb.model.Model派生的模型,那么下面的代码可以让你异步地遍历一个查询:

from ndb import tasklets

@tasklets.tasklet
def run():
it = iter(Entity.query())
# Other tasklets will be allowed to run if the next call has to wait for an rpc.
while (yield it.has_next_async()):
  entity = it.next()
  do_something_with(entity)

虽然ndb仍然被认为是实验性的(它的一些错误处理代码还需要改进),但我还是推荐你看看它。我在最近的两个项目中使用过它,发现它是一个很棒的库。

确保你仔细阅读主页面链接的文档,还有关于任务小块的相关文档

3

你似乎忽略了一些明显的东西:

class EntityWorker(Worker):
    def setup(self):
        self.entity_query = Entity.all()

    def run(self):
        for e in self.entity_query:
            do_something_with(e)
            yield self.sleep(timedelta(seconds=1))

    def sleep(self, wait):
        return Worker.SLEEP, wait

yield这个关键词把函数变成生成器的,不能不使用它。

如果想要隐藏这个yield,你需要一个更高级的函数,在你的例子中就是map

from itertools import imap

def slowmap(f, sleep, *iters):
    for row in imap(f, self.entity_query):
        yield Worker.SLEEP, wait

def run():
    return slowmap(do_something_with, 
                   (Worker.SLEEP, timedelta(seconds=1)),
                   self.entity_query)
2

唉,这个方法不行。不过有个“折中方案”可能可以:

def sleepjob(*a, **k):
    if a:
        return Worker.SLEEP, a[0]
    else:
        return Worker.SLEEP, timedelta(**k)

所以

yield self.sleepjob(timedelta(seconds=1))
yield self.sleepjob(seconds=1)

对我来说看起来不错。

撰写回答