高效读取Python中的15M行CSV文件的方法

2024-04-25 18:08:03 发布

您现在位置:Python中文网/ 问答频道 /正文

对于我的应用程序,我需要读取多个文件,每个文件有15米的行,将它们存储在一个数据帧中,并将数据帧保存为HDFS5格式。在

我已经尝试过不同的方法,特别是熊猫.read_csv有chunksize和dtype规范,以及数据帧. 它们都需要90秒来处理一个文件,所以我想知道是否有一种方法可以有效地处理这些文件。在下面,我将展示我所做的一些测试代码。在

import pandas as pd
import dask.dataframe as dd
import numpy as np
import re 

# First approach
store = pd.HDFStore('files_DFs.h5')

chunk_size = 1e6

df_chunk = pd.read_csv(file,
                sep="\t",
                chunksize=chunk_size,
                usecols=['a', 'b'],
                converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                            "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
                skiprows=15
           )              
chunk_list = [] 


for chunk in df_chunk:
      chunk_list.append(chunk)


df = pd.concat(chunk_list, ignore_index=True)

store[dfname] = df
store.close()

# Second approach

df = dd.read_csv(
        file,
        sep="\t",
        usecols=['a', 'b'],
        converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                    "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
        skiprows=15
     )
store.put(dfname, df.compute())
store.close()

文件如下所示(空白由文本选项卡组成):

^{pr2}$

Tags: 文件csv数据storelambdaimportredf
2条回答

首先,让我们回答问题的标题

1-如何有效读取包含浮动的15米csv行

我建议您使用modin

生成样本数据:

import modin.pandas as mpd
import pandas as pd
import numpy as np

frame_data = np.random.randint(0, 10_000_000, size=(15_000_000, 2)) 
pd.DataFrame(frame_data*0.0001).to_csv('15mil.csv', header=False)
^{pr2}$

现在看基准测试:

%%timeit -r 3 -n 1 -t
global df1
df1 = pd.read_csv('15mil.csv', header=None)
    9.7 s ± 95.1 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
%%timeit -r 3 -n 1 -t
global df2
df2 = mpd.read_csv('15mil.csv', header=None)
    3.07 s ± 685 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
(df2.values == df1.values).all()
    True

所以我们可以看到,在我的设置中,莫丁的速度大约是我的3倍。在


现在来回答你的具体问题

2-清除包含非数字字符的csv文件,然后读取该文件

正如人们所指出的,你的瓶颈可能是转换器。你给那些羔羊打了三千万次电话。即使函数调用开销在这个规模上也变得非常重要。在

让我们来解决这个问题。在

生成脏数据集:

!sed 's/.\{4\}/&)/g' 15mil.csv > 15mil_dirty.csv

方法

首先,我尝试在converters参数中使用modin。然后,我尝试了一种调用regexp次数更少的不同方法:

首先,我将创建一个类似文件的对象,通过regexp过滤所有内容:

class FilterFile():
    def __init__(self, file):
        self.file = file
    def read(self, n):
        return re.sub(r"[^\d.,\n]", "", self.file.read(n))
    def write(self, *a): return self.file.write(*a) # needed to trick pandas
    def __iter__(self, *a): return self.file.__iter__(*a) # needed

然后我们将其作为read_csv中的第一个参数传递给pandas:

with open('15mil_dirty.csv') as file:
    df2 = pd.read_csv(FilterFile(file))

基准:

%%timeit -r 1 -n 1 -t
global df1
df1 = pd.read_csv('15mil_dirty.csv', header=None,
        converters={0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))}
           )
    2min 28s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1 -t
global df2
df2 = mpd.read_csv('15mil_dirty.csv', header=None,
        converters={0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))}
           )
    38.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1 -t
global df3
df3 = pd.read_csv(FilterFile(open('15mil_dirty.csv')), header=None,)
    1min ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

看来莫丁又赢了! 不幸的是,modin还没有实现从缓冲区读取,所以我设计了最终的方法。在

最终方法:

%%timeit -r 1 -n 1 -t
with open('15mil_dirty.csv') as f, open('/dev/shm/tmp_file', 'w') as tmp:
    tmp.write(f.read().translate({ord(i):None for i in '()'}))
df4 = mpd.read_csv('/dev/shm/tmp_file', header=None)
    5.68 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

这使用了translate,它比re.sub快得多,还使用了{},这是ubuntu(和其他linuxes)通常提供的内存文件系统。在那里写的任何文件都不会进入磁盘,所以速度很快。 最后,它使用modin读取文件,解决modin的缓冲区限制。 这种方法比你的方法快30倍左右,而且非常简单。在

好吧,我的发现与大熊猫没有太大关系,而是一些常见的陷阱。在

Your code: 
(genel_deneme) ➜  derp time python a.py
python a.py  38.62s user 0.69s system 100% cpu 39.008 total
  1. 预编译正则表达式
^{pr2}$
  1. 试着找到一个比直接使用更好的方法np.浮动32,因为它比我想的慢6-10倍。下面不是你想要的,但我只想在这里说明问题。在
replace np.float32 with float and run your code. 
My Result:  
(genel_deneme) ➜  derp time python a.py
python a.py  14.79s user 0.60s system 102% cpu 15.066 total

找到另一种方法来实现浮点数的结果。 关于这个问题的更多信息https://stackoverflow.com/a/6053175/37491

  1. 如果可以,将文件和工作划分为子流程。你已经开始研究大小恒定的独立块了。所以基本上你可以使用多进程或线程来划分文件并在不同的进程中处理作业。在

相关问题 更多 >