Python - 如何将这段代码变为异步的?

9 投票
5 回答
12616 浏览
提问于 2025-04-16 11:37

这里有一些代码,展示了我遇到的问题:

def blocking1():
    while True:
        yield 'first blocking function example'

def blocking2():
    while True:
        yield 'second blocking function example'

for i in blocking1():
    print 'this will be shown'

for i in blocking2():
    print 'this will not be shown'

我有两个函数,它们里面都有一个 while True 的循环。这些循环会产生一些数据,然后我会把这些数据记录到某个地方(很可能是一个sqlite数据库)。

我一直在尝试使用线程,并且已经让它工作了。不过,我并不是很喜欢这种方式……我想要做的是把我的阻塞函数变成异步的。就像这样:

def blocking1(callback):
    while True:
        callback('first blocking function example')

def blocking2(callback):
    while True:
        callback('second blocking function example')

def log(data):
    print data

blocking1(log)
blocking2(log)

我该如何在Python中实现这个呢?我看到标准库里有asyncore,而这个领域的大名鼎鼎的库是Twisted,但这两个似乎都是用来处理网络连接的。

那么,我该如何让我的那些不涉及网络的阻塞函数变成异步的呢?

5 个回答

2

Twisted框架不仅仅是处理网络连接的工具。它还有很多异步适配器,可以用在各种场景中,包括和子进程的互动。我建议你仔细看看这个框架,它能实现你想要做的事情。

12

你可以用生成器来实现协作式多任务处理,但你需要自己写一个主循环来在这些生成器之间切换控制权。

下面是一个(非常简单的)例子,基于你上面的例子:

def blocking1():
    while True:
        yield 'first blocking function example'

def blocking2():
    while True:
        yield 'second blocking function example'


tasks = [blocking1(), blocking2()]

# Repeat until all tasks have stopped
while tasks:
    # Iterate through all current tasks. Use
    # tasks[:] to copy the list because we
    # might mutate it.
    for t in tasks[:]:
        try:
            print t.next()
        except StopIteration:
            # If the generator stops, remove it from the task list
            tasks.remove(t)

你还可以进一步改进,让生成器能够产生新的生成器,这样就可以把它们添加到任务中。不过希望这个简化的例子能让你大致明白这个概念。

33

阻塞函数是指那些不会返回结果的函数,但会让你的程序处于闲置状态,无法继续完成其他工作。

你希望我们把你的阻塞函数变成非阻塞的。然而,除非你在编写操作系统,否则你并没有真正的阻塞函数。你可能有一些函数因为调用了阻塞的系统调用而导致阻塞,或者因为进行大量计算而“阻塞”。

要把前一种类型的函数变成非阻塞的,实际上是不可能的,除非你把底层的系统调用也变成非阻塞的。根据这个系统调用的具体情况,可能很难做到这一点,而不需要在你的程序中添加事件循环;你不仅需要让这个调用不阻塞,还需要进行另一个调用,以确定这个调用的结果会被送到一个你可以关联的地方。

这个问题的答案是一个非常长的Python程序,以及对不同操作系统接口及其工作原理的详细解释。不过幸运的是,我已经在另一个网站上写过这个答案;我把它叫做Twisted。如果你的特定任务已经被Twisted反应器支持,那你就幸运了。否则,只要你的任务能够映射到某个现有的操作系统概念上,你就可以扩展一个反应器来支持它。实际上,这里只有两种机制:在每个合理的操作系统上都有的文件描述符,以及Windows上的I/O完成端口。

在另一种情况下,如果你的函数消耗了大量的CPU,因此没有返回,它们实际上并不是阻塞的;你的程序仍然在运行并完成工作。处理这种情况有三种方法:

  • 使用单独的线程
  • 使用单独的进程
  • 如果你有一个事件循环,可以编写一个任务,定期让出控制权,方法是让这个任务做一些工作,然后请求事件循环在不久的将来恢复它,以便让其他任务运行。

在Twisted中,最后一种技术可以通过多种方式实现,但这里有一个语法上方便的小技巧,可以让它变得简单:

from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue

@inlineCallbacks
def slowButSteady():
    result = SomeResult()
    for something in somethingElse:
        result.workHardForAMoment(something)
        yield deferLater(reactor, 0, lambda : None)
    returnValue(result)

撰写回答