并行读取文件并参数化类参数

2024-06-16 11:32:49 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我有一个类,想从磁盘并行读取几个文件,并参数化类参数。最正确的方法是什么(以及如何做)?在

  • 主线程应该等待load_data()操作结束,然后再进行其他操作。在

我考虑过线程,因为它只是I/O操作。在

非并行实现示例(1线程):

import pandas as pd


class DataManager(object):
    def __init__(self):
        self.a = None
        self.b = None
        self.c = None
        self.d = None
        self.e = None
        self.f = None

    def load_data(self):
        self.a = pd.read_csv('a.csv')
        self.b = pd.read_csv('b.csv')
        self.c = pd.read_csv('c.csv')
        self.d = pd.read_csv('d.csv')
        self.e = pd.read_csv('e.csv')
        self.f = pd.read_csv('f.csv')

if __name__ == '__main__':
    dm = DataManager()
    dm.load_data()
    # Main thread is waiting for load_data to finish.
    print("finished loading data")

Tags: 文件csvselfnonereaddata参数def
2条回答

在大多数情况下,I/O操作不受CPU的限制,因此使用多个进程是一种过度消耗。使用多个线程是很好的,但是pb.read_csv不仅读取文件,而且解析它,因为它可以是CPU限制的。我建议您在一开始使用asyncio从磁盘读取文件。下面是这样做的代码:

import asyncio
import aiofiles


async def read_file(file_name):
    async with aiofiles.open(file_name, mode='rb') as f:
        return await f.read()


def read_files_async(file_names: list) -> list:
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(
        asyncio.gather(*[read_file(file_name) for file_name in file_names]))


if __name__ == '__main__':
    contents = read_files_async([f'files/file_{i}.csv' for i in range(10)])
    print(contents)

函数read_files_async返回文件内容列表(字节缓冲区),您可以将其传递给pd.read_csv。在

我认为只对文件读取进行优化就足够了,但是你可以用多个进程并行解析文件内容(线程和异步不会提高解析过程的性能):

^{pr2}$

您应该根据您的机器规格设置NUMBER_OF_CORES

用Python3ThreadPoolExecutor可能的解决方案

    from concurrent.futures import ThreadPoolExecutor
    import queue
    import pandas as pd

    def load_data_worker(data_queue, file_name):
        data_queue.put(pd.read_csv(file_name))

    class DataManager(object):
        def __init__(self):
            self.data_queue = queue.Queue()
            self.data_arr = []

        def load_data(self):
            with ThreadPoolExecutor() as executor:
                executor.submit(load_data_woker, self.data_queue, 'a.csv')
                executor.submit(load_data_woker, self.data_queue, 'b.csv')
                # ... 
                executor.submit(load_data_woker, self.data_queue, 'f.csv')
           # dumping Queue of loaded data to array 
           self.data_arr = list(self.data_queue.queue)



    if __name__ == '__main__':
        dm = DataManager()
        dm.load_data()
        # Main thread is waiting for load_data to finish.
        print("finished loading data")

相关问题 更多 >