我在google bucket中有一个文件夹,其中包含CSV,我正试图读入dask.dataframe
,以便对文件进行并行规范化。例如:其中一些数据帧可能缺少其他数据帧所拥有的列,因此我想将缺少的列插入到缺少它的每个数据帧中
当使用globstring(例如ddfs = ddf.read_csv(f"gs://bucket/{folder}/*.csv")
)时,我预期会收到pandas.errors.ParserErrors
,因为不仅缺少一些文件的头,一些文件的头行可能不会从第一行开始。在使用带有dask.dataframe
的globstring之前,我可以遍历目录并分析每个文件。下面是我在这种情况下使用的逻辑:
import pandas as pd
file_analysis = dict()
for filepath in files:
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
continue
headers = df.columns.values.tolist() # noqa
skiprows = skiprows + 1 if skiprows else 1
# store dictionary of pandas params that correspond to each file for `.read_csv()` calls
file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
但是,这会增加执行时间,特别是当某些目录有数千个文件时。即使这样,我也不确定如何将字典值传递给dask
的dataframe.read_csv
是否有一种方法可以将函数传递给dask.dataframe.read_csv
,该函数允许为函数提供的globstring的google bucket文件夹中的每个CSV文件提供动态skiprows
和动态columns
AFAIK这不可能通过
dd.read_csv
实现,但是您可以通过使用.from_delayed
来构造dask.dataframe
,其中每个延迟都是一个包装器,围绕着一个函数来规范化csv文件并返回一个数据帧注意
from_delayed
需要一致的列名和数据类型,因此这应该在函数内部处理相关问题 更多 >
编程相关推荐