Python中大数据文件的并发I/O与处理

5 投票
2 回答
3034 浏览
提问于 2025-04-17 22:57

我打算写一个Python程序,这个程序会从一个文件中读取数据块,处理这些数据块,然后把处理后的数据追加到一个新文件里。因为要处理的文件通常比可用的内存要大,所以需要分块读取。

def read_chunk(file_object, chunksize):
    # read the data from the file object and return the chunk
    return chunk

def process_chunk(chunk):
    #process the chunk and return the processed data
    return data

def write_chunk(data, outputfile):
    # write the data tothe output file.

def main(file):
    # This will do the work
    for i in range(0, numberofchunks, chunksize):
        chunk = read_chunk(file_obj, chunksize)
        data = process_chunk(chunk)
        write_chunk(data, out_file)

我想知道的是,我能否同时执行这三个方法,它们是怎么工作的呢?

也就是说,一个线程负责读取数据,一个线程负责处理数据,还有一个线程负责写入数据。当然,读取数据的线程需要始终比处理数据的线程快一步,而处理数据的线程又要比写入数据的线程快一步……

如果能同时执行这些操作,并且把它们分配到多个处理器上,那就太好了……

关于具体问题的更多细节:我会使用GDAL库从一个光栅文件中读取数据。这会把数据块/行读入一个numpy数组中。处理的过程其实就是对每个光栅单元的值和它的邻居进行一些逻辑比较(比如哪个邻居的值比测试单元的值低,哪个是最低的)。我会创建一个同样大小的新数组(边缘会被赋予任意值)来存放结果,然后把这个数组写入一个新的光栅文件。我预计除了GDAL之外,唯一会用到的库是numpy,这样的例程也很适合进行“cython化”。

有没有什么建议可以让我继续进行?

编辑:

我应该指出,我们之前已经实现过类似的功能,并且我们知道处理所花的时间相比于输入输出会很显著。还有一点是,我们将用来读取文件的库(GDAL)支持并发读取……

2 个回答

-2

我真心建议你现在不要太担心优化的问题(可以看看“过早优化”这个概念)。

除非你要进行很多操作(从你的描述来看似乎不是这样),否则很有可能输入输出(I/O)等待的时间会比处理的时间要长得多,甚至是“长得多”。这就意味着,I/O可能是你的瓶颈。

在这种情况下,用多个线程来处理是没什么用的。你提到的把I/O和处理分开,最多也只能让你节省 n*proc_time 的时间,其中 n 是你处理的次数,proc_time 是每次操作的处理时间(不包括I/O时间)。如果 proc_time 远远小于I/O时间,你也不会得到太大的好处。

我建议你先按顺序实现这个功能,看看I/O和CPU的使用情况,然后再考虑优化,只有在确实有必要的时候再去做。你也可以尝试一次性读取更多的数据块(也就是缓存)。

0

使用协程来处理数据流吗?这个模板可以帮助你入门,同时尽量减少内存的使用。你可以在这个基础上添加一个队列和虚拟的“假线程”管理器,这样就可以处理多个文件了。

#!/usr/bin/env python3

import time
from functools import wraps, partial, reduce

def coroutine(func_gen):
    @wraps(func_gen)
    def starter(*args, **kwargs):
        cr = func_gen(*args, **kwargs)
        _ = next(cr)
        return cr
    return starter


@coroutine
def read_chunk(file_object, chunksize, target):
    """
    read enless stream with a .read method
    """
    while True:
        buf = file_object.read(chunksize)
        if not buf:
            time.sleep(1.0)
            continue
        target.send(buf)

@coroutine
def process_chunk(target):
    def example_process(thing):
        k = range(100000000) # waste time and memory
        _ = [None for _ in k]
        value = str(type(thing))
        print("%s -> %s" % (thing, value))
        return thing

    while True:
        chunk = (yield)
        data  = example_process(chunk)
        target.send(data)

@coroutine
def write_chunk(file_object):
    while True:
        writable = (yield)
        file_object.write(writable)
        file_object.flush()


def main(src, dst):
    r = open(src, 'rb')
    w = open(dst, 'wb')

    g = reduce(lambda a, b: b(a),
               [w, write_chunk, process_chunk,
                partial(read_chunk, r, 16)]
              )
    while True:
        _ = next(g)

main("./stackoverflow.py", "retval.py")

撰写回答