Python - 如何将这段代码变为异步的?
这里有一些代码,展示了我遇到的问题:
def blocking1():
while True:
yield 'first blocking function example'
def blocking2():
while True:
yield 'second blocking function example'
for i in blocking1():
print 'this will be shown'
for i in blocking2():
print 'this will not be shown'
我有两个函数,它们里面都有一个 while True
的循环。这些循环会产生一些数据,然后我会把这些数据记录到某个地方(很可能是一个sqlite数据库)。
我一直在尝试使用线程,并且已经让它工作了。不过,我并不是很喜欢这种方式……我想要做的是把我的阻塞函数变成异步的。就像这样:
def blocking1(callback):
while True:
callback('first blocking function example')
def blocking2(callback):
while True:
callback('second blocking function example')
def log(data):
print data
blocking1(log)
blocking2(log)
我该如何在Python中实现这个呢?我看到标准库里有asyncore,而这个领域的大名鼎鼎的库是Twisted,但这两个似乎都是用来处理网络连接的。
那么,我该如何让我的那些不涉及网络的阻塞函数变成异步的呢?
5 个回答
Twisted框架不仅仅是处理网络连接的工具。它还有很多异步适配器,可以用在各种场景中,包括和子进程的互动。我建议你仔细看看这个框架,它能实现你想要做的事情。
你可以用生成器来实现协作式多任务处理,但你需要自己写一个主循环来在这些生成器之间切换控制权。
下面是一个(非常简单的)例子,基于你上面的例子:
def blocking1():
while True:
yield 'first blocking function example'
def blocking2():
while True:
yield 'second blocking function example'
tasks = [blocking1(), blocking2()]
# Repeat until all tasks have stopped
while tasks:
# Iterate through all current tasks. Use
# tasks[:] to copy the list because we
# might mutate it.
for t in tasks[:]:
try:
print t.next()
except StopIteration:
# If the generator stops, remove it from the task list
tasks.remove(t)
你还可以进一步改进,让生成器能够产生新的生成器,这样就可以把它们添加到任务中。不过希望这个简化的例子能让你大致明白这个概念。
阻塞函数是指那些不会返回结果的函数,但会让你的程序处于闲置状态,无法继续完成其他工作。
你希望我们把你的阻塞函数变成非阻塞的。然而,除非你在编写操作系统,否则你并没有真正的阻塞函数。你可能有一些函数因为调用了阻塞的系统调用而导致阻塞,或者因为进行大量计算而“阻塞”。
要把前一种类型的函数变成非阻塞的,实际上是不可能的,除非你把底层的系统调用也变成非阻塞的。根据这个系统调用的具体情况,可能很难做到这一点,而不需要在你的程序中添加事件循环;你不仅需要让这个调用不阻塞,还需要进行另一个调用,以确定这个调用的结果会被送到一个你可以关联的地方。
这个问题的答案是一个非常长的Python程序,以及对不同操作系统接口及其工作原理的详细解释。不过幸运的是,我已经在另一个网站上写过这个答案;我把它叫做Twisted。如果你的特定任务已经被Twisted反应器支持,那你就幸运了。否则,只要你的任务能够映射到某个现有的操作系统概念上,你就可以扩展一个反应器来支持它。实际上,这里只有两种机制:在每个合理的操作系统上都有的文件描述符,以及Windows上的I/O完成端口。
在另一种情况下,如果你的函数消耗了大量的CPU,因此没有返回,它们实际上并不是阻塞的;你的程序仍然在运行并完成工作。处理这种情况有三种方法:
- 使用单独的线程
- 使用单独的进程
- 如果你有一个事件循环,可以编写一个任务,定期让出控制权,方法是让这个任务做一些工作,然后请求事件循环在不久的将来恢复它,以便让其他任务运行。
在Twisted中,最后一种技术可以通过多种方式实现,但这里有一个语法上方便的小技巧,可以让它变得简单:
from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
@inlineCallbacks
def slowButSteady():
result = SomeResult()
for something in somethingElse:
result.workHardForAMoment(something)
yield deferLater(reactor, 0, lambda : None)
returnValue(result)