Python中的异步流处理

3 投票
2 回答
2684 浏览
提问于 2025-04-15 15:14

我们先来看一个简单的例子。HTTP数据流的格式如下:

MESSAGE_LENGTH, 2 bytes
MESSAGE_BODY, 
REPEAT...

目前,我使用urllib2来获取和处理流数据,代码如下:

length = response.read(2)
while True:
    data = response.read(length)
    DO DATA PROCESSING

这个方法是可以的,但因为每条消息的大小在50到100字节之间,上面的方法在每次读取时都会限制缓冲区的大小,这可能会影响性能。

有没有可能使用不同的线程来分别处理数据的获取和处理呢?

2 个回答

1

是的,这个是可以做到的,而且并不难,只要你的格式基本上是固定的。

我在使用Python 2.2.3的httplib时发现,性能非常糟糕,因为我们是通过一些黑科技的方式把它拼凑起来的(基本上是把一个基于select()的套接字层加到httplib里)。

关键在于要自己处理套接字和缓冲,这样就不会和中间层争抢缓冲区(当我们有httplib的缓冲用于分块HTTP解码时,套接字层的缓冲用于读取,这样就导致性能很差)。

然后你需要一个状态机,当需要新数据时从套接字获取数据,并把完成的数据块放入一个Queue.Queue中,这样可以供你的处理线程使用。

我用这个来传输文件,在一个额外的线程中计算校验和(使用zlib.ADLER32),然后在第三个线程中把它们写入文件系统。这样在我的本地机器上,通过套接字和HTTP/分块的开销,能保持大约40 MB/s的持续传输速度。

0

当然可以,有很多不同的方法来实现这个目标。通常,你会有一组专门用来获取数据的进程,然后你会不断增加这些进程的数量,直到带宽用完为止。这些进程会把数据存储在某个地方,然后你会有其他的进程或线程去取出这些数据并进行处理。

所以你问的问题的答案是“可以”,接下来你可能会问“怎么做”,而那些真正擅长这方面的人会想知道更多的细节。:-)

如果你是在大规模上进行这个操作,那就会变得非常复杂,你不希望这些进程互相干扰。Python里有一些模块可以帮助你完成这些工作。具体该怎么做,主要取决于你要处理的规模,比如你是想在多个处理器上运行,还是在完全不同的机器上运行,以及你要处理的数据量有多大。

我只做过一次,而且规模不算太大。那次我有一个进程负责获取需要处理的长长的URL列表,另一个进程则把这个列表分发给一组独立的进程,方法是把包含URL的文件放在不同的目录里,这些目录就像是“队列”。那些独立的进程会在自己的队列目录中查找URL,获取后再把它放到另一个“输出队列”目录里,而我还有一个进程负责把这些文件分发到另一些处理进程的队列目录中。

这个方法运行得很好,如果需要的话可以通过NFS在网络上运行(虽然我们从来没有尝试过),而且如果需要的话也可以扩展到很多进程在很多机器上运行(不过我们也没有这样做过)。

可能还有更聪明的方法。

撰写回答