Python: loading the chunks of each inner iterator from a list of iterators into a queue

Posted 2024-04-19 01:34:24


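I have a directory, 'data', containing CSV files. I load each CSV into a DataFrame as an iterator using the chunksize parameter (inner_it in the code below). That gives me a list of iterators (ll). I want to load all the chunks from every inner_it into a queue. How can I do this cleanly?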

Currently I have:

import os

import pandas as pd


def sample_gen(df):
    yield next(df)


def get_next(df, qq):
    try:
        while True:
            z = next(df)
            print(z.shape)
    except StopIteration:
        pass
    finally:
        qq.append(z)
        return qq


ll = iter([pd.read_csv(os.path.join(f'data/{x}'), chunksize=10**6) for x in os.listdir('data')])
qq = []


def load_queue(ll, qq):
    try:
        inner_it = next(ll)
        qq = get_next(inner_it, qq)
    except StopIteration:
        load_queue(ll, qq)
    finally:
        return qq, ll

I can't figure out how to get load_queue working.
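
The goal described above (every chunk of every inner iterator, in order, into one queue) can be sketched without recursion by flattening the outer iterator with `itertools.chain.from_iterable`. This is only a minimal sketch: the helper name `load_all_chunks` is hypothetical, and plain iterators of lists stand in for the pandas chunk readers. The readers returned by `pd.read_csv(..., chunksize=...)` are themselves iterators, so they should be usable in the same position.

```python
import itertools
import queue


def load_all_chunks(chunk_iterators, q):
    """Put every chunk from every inner iterator into q, in order."""
    for chunk in itertools.chain.from_iterable(chunk_iterators):
        q.put(chunk)
    return q


# Assumption: plain iterators of lists stand in for the pandas chunk readers.
ll = iter([iter([[1, 2], [3, 4]]), iter([[5, 6]])])
qq = load_all_chunks(ll, queue.Queue())

# Drain the queue to show what was loaded.
loaded = []
while not qq.empty():
    loaded.append(qq.get())
print(loaded)
```

Note that an unbounded queue filled this way holds every chunk in memory at once, which may defeat the purpose of reading in chunks when the files are large.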

Edit: I decided to flatten the list of iterators and use a generator instead. Here is my final solution:

import concurrent.futures
import os
import queue
import threading

import pandas as pd


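def producer(q, event):
    # One chunk reader per CSV file, flattened into a single stream of chunks.
    ll = (pd.read_csv(os.path.join('data', x), chunksize=10 ** 6) for x in os.listdir('data'))
    ll = (chunk for each_iterator in ll for chunk in each_iterator)

    try:
        for message in ll:
            q.put(message)  # blocks while the queue is full
    except Exception as ex:
        print(ex)
    finally:
        event.set()  # tell the consumer that no more chunks are coming
    print('producer got exit event')


def consumer(q, event):
    # Keep draining until the producer is done AND the queue is empty.
    while not event.is_set() or not q.empty():
        try:
            # Use a timeout so the loop can re-check the event instead of blocking forever.
            message = q.get(timeout=0.1)
        except queue.Empty:
            continue
        print(message.shape, 'C')
    print('consumer got exit event')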


if __name__ == '__main__':
    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
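
An alternative to coordinating shutdown through a shared `threading.Event` is to push a sentinel object through the queue itself, so the consumer stops exactly when it has seen the last chunk. The following is a hedged sketch rather than a drop-in replacement: `_DONE` is a hypothetical sentinel, the extra `chunks` and `results` parameters are introduced for illustration, and small lists stand in for the DataFrame chunks.

```python
import queue
import threading

_DONE = object()  # hypothetical sentinel marking the end of the stream


def producer(q, chunks):
    try:
        for chunk in chunks:
            q.put(chunk)  # blocks while the queue is full
    finally:
        q.put(_DONE)  # always signal completion, even after an error


def consumer(q, results):
    while True:
        item = q.get()
        if item is _DONE:
            break
        results.append(item)


# Assumption: a flattened generator of small lists stands in for the CSV chunks.
sources = [[[1, 2], [3]], [[4, 5, 6]]]
flat = (chunk for source in sources for chunk in source)

pipeline = queue.Queue(maxsize=2)
results = []
threads = [
    threading.Thread(target=producer, args=(pipeline, flat)),
    threading.Thread(target=consumer, args=(pipeline, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```

With a single producer and a single consumer, one sentinel is enough; with several consumers, each would need its own sentinel.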
