Python生成器的'yield'在单独函数中使用
我正在开发一个工具库,这个库有点像任务管理器,目的是在谷歌云计算服务的分布式环境中运行。(它结合了任务队列和内存缓存来执行后台处理)。我打算使用生成器来控制任务的执行,基本上是通过在用户代码中使用yield
来实现一种非抢占式的“并发”。
举个简单的例子——处理一堆数据库实体,可能是这样的:
class EntityWorker(Worker):
def setup():
self.entity_query = Entity.all()
def run():
for e in self.entity_query:
do_something_with(e)
yield
我们知道,yield
是一个双向通信的通道,可以将值传递给使用生成器的代码。这使得我们可以模拟一个“抢占式的API”,比如下面的SLEEP
调用:
def run():
for e in self.entity_query:
do_something_with(e)
yield Worker.SLEEP, timedelta(seconds=1)
但这样看起来很糟糕。如果能把yield
隐藏在一个单独的函数里,并且能简单地调用就好了:
self.sleep(timedelta(seconds=1))
问题是,把yield
放在sleep
函数里会把这个函数变成一个生成器函数。因此,上面的调用只会返回另一个生成器。只有在添加.next()
并且再用yield
返回后,我们才能得到之前的结果:
yield self.sleep(timedelta(seconds=1)).next()
这显然比之前更复杂,而且显得冗长。
所以我想问:有没有办法把yield
放进一个函数里,而不把它变成生成器函数,同时又能让其他生成器使用它来返回计算出的值?
3 个回答
我建议你看看ndb。它使用生成器作为协程(就像你在这里提到的那样),让你可以编写与RPC异步工作的程序。
这个API通过用另一个函数包装生成器来实现这一点,这个函数会“启动”生成器(它会立即调用.next(),让代码开始执行)。这些任务小块(tasklets)也被设计成可以与App Engine的RPC基础设施一起工作,这样你就可以使用任何现有的异步API调用。
在ndb使用的并发模型中,你可以yield
一个未来对象(类似于pep-3148中描述的)或者一个App Engine的RPC对象。当这个RPC完成后,之前yield这个对象的函数的执行就可以继续了。
如果你使用的是从ndb.model.Model
派生的模型,那么下面的代码可以让你异步地遍历一个查询:
from ndb import tasklets
@tasklets.tasklet
def run():
it = iter(Entity.query())
# Other tasklets will be allowed to run if the next call has to wait for an rpc.
while (yield it.has_next_async()):
entity = it.next()
do_something_with(entity)
虽然ndb仍然被认为是实验性的(它的一些错误处理代码还需要改进),但我还是推荐你看看它。我在最近的两个项目中使用过它,发现它是一个很棒的库。
确保你仔细阅读主页面链接的文档,还有关于任务小块的相关文档。
你似乎忽略了一些明显的东西:
class EntityWorker(Worker):
def setup(self):
self.entity_query = Entity.all()
def run(self):
for e in self.entity_query:
do_something_with(e)
yield self.sleep(timedelta(seconds=1))
def sleep(self, wait):
return Worker.SLEEP, wait
是yield
这个关键词把函数变成生成器的,不能不使用它。
如果想要隐藏这个yield,你需要一个更高级的函数,在你的例子中就是map
:
from itertools import imap
def slowmap(f, sleep, *iters):
for row in imap(f, self.entity_query):
yield Worker.SLEEP, wait
def run():
return slowmap(do_something_with,
(Worker.SLEEP, timedelta(seconds=1)),
self.entity_query)
唉,这个方法不行。不过有个“折中方案”可能可以:
def sleepjob(*a, **k):
if a:
return Worker.SLEEP, a[0]
else:
return Worker.SLEEP, timedelta(**k)
所以
yield self.sleepjob(timedelta(seconds=1))
yield self.sleepjob(seconds=1)
对我来说看起来不错。