我如何利用dask的dataframe.read_csv和google storage globstring,同时为每个文件使用不同的skiprows值?

2024-06-17 18:09:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我在google bucket中有一个文件夹,其中包含CSV,我正试图读入dask.dataframe,以便对文件进行并行规范化。例如:其中一些数据帧可能缺少其他数据帧所拥有的列,因此我想将缺少的列插入到缺少它的每个数据帧中

我的问题

当使用globstring(例如ddfs = ddf.read_csv(f"gs://bucket/{folder}/*.csv"))时,我预期会收到pandas.errors.ParserErrors,因为不仅缺少一些文件的头,一些文件的头行可能不会从第一行开始。在使用带有dask.dataframe的globstring之前,我可以遍历目录并分析每个文件。下面是我在这种情况下使用的逻辑:

import pandas as pd
file_analysis = dict()
for filepath in files:
    skiprows = None
    while True:
        try:
            df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
            break
        except pd.errors.ParserError as e:
            try:
                start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                skiprows = int(start_row_index) - 1
            except IndexError:
                print("Could not locate start_row_index in pandas ParserError message")
                continue
    headers = df.columns.values.tolist()  # noqa
    skiprows = skiprows + 1 if skiprows else 1
    # store dictionary of pandas params that correspond to each file for `.read_csv()` calls
    file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))

但是,这会增加执行时间,特别是当某些目录有数千个文件时。即使这样,我也不确定如何将字典值传递给daskdataframe.read_csv

我的问题

是否有一种方法可以将函数传递给dask.dataframe.read_csv,该函数允许为函数提供的globstring的google bucket文件夹中的每个CSV文件提供动态skiprows和动态columns


Tags: 文件csv数据indataframepandasreadbucket
1条回答
网友
1楼 · 发布于 2024-06-17 18:09:02

AFAIK这不可能通过dd.read_csv实现,但是您可以通过使用.from_delayed来构造dask.dataframe,其中每个延迟都是一个包装器,围绕着一个函数来规范化csv文件并返回一个数据帧

注意from_delayed需要一致的列名和数据类型,因此这应该在函数内部处理

相关问题 更多 >