如何在列上使用dask groupby分隔文件

2024-04-23 06:20:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一大组csv文件(file_1.csvfile_2.csv),用时间段分隔,无法放入内存中。每个文件的格式如下所述。在


| instrument | time | code     | val           |
|------------|------|----------|---------------|
| 10         | t1   | c1_at_t1 | v_of_c1_at_t1 |
| 10         | t1   | c2_at_t1 | v_of_c2_at_t1 |
| 10         | t2   | c1_at_t2 | v_of_c1_at_t2 |
| 10         | t2   | c3_at_t2 | v_of_c3_at_t2 |
| 11         | t1   | c4_at_t1 | v_of_c4_at_t1 |
| 11         | t1   | c5_at_t1 | v_of_c5_at_t1 |
| 12         | t2   | c6_at_t2 | v_of_c6_at_t2 |
| 13         | t3   | c9_at_t3 | v_of_c9_at_t3 |

每个文件都是关于格式一致的仪器日志。有一组仪器可以在给定的时间戳(time)发出不同的代码(code)。给定仪器的给定code的值保存在val列中

我想使用instrument列(例如:file_1.csv)拆分每个文件,然后将为仪器提取的文件(例如:10)连接到所有文件(file_1.csvfile_2.csv

我正在考虑对instrument列使用daskgroupby操作。有没有其他更好的方法来代替使用groupby或者更好的方法来提取文件?在

上面的操作就是我写的代码

^{2}$

一旦我有了f'{v}_{f[:-4]}.parquet'格式的文件,我就可以使用从所有文件中提取的pandas对它们进行压缩(file_1.csvfile_2.csv

仪器10的最终文件应该如下所示,其中t7t9处的观测值与其他文件中仪器10的观测值串联在一起

time | code     | val           |
-----|----------|---------------|
t1   | c1_at_t1 | v_of_c1_at_t1 |
t1   | c2_at_t1 | v_of_c2_at_t1 |
t2   | c1_at_t2 | v_of_c1_at_t2 |
t2   | c3_at_t2 | v_of_c3_at_t2 |
t7   | c4_at_t7 | v_of_c4_at_t7 |
t9   | c5_at_t9 | v_of_c5_at_t9 |

Tags: 文件ofcsvcode仪器atfilet1
2条回答

如果每个文件都能放入内存,您可以尝试以下操作:

import dask.dataframe as dd
import pandas as pd
import numpy as np
import os

生成虚拟文件

^{pr2}$

定义函数

对于路径fldr_out/instrument=i/fileN.csv中的每个乐器,以下函数保存到parquet

def fun(x, fn, fldr_out):
    inst = x.instrument.unique()[0]
    filename = os.path.basename(fn)
    fn_out = f"{fldr_out}/instrument={inst}/{filename}"
    fn_out = fn_out.replace(".csv", ".parquet")
    os.makedirs(os.path.dirname(fn_out), exist_ok=True)
    x.drop("instrument", axis=1)\
     .to_parquet(fn_out, index=False)

你可以用它来分组

for f in files:
    fn = f"{fldr_in}/{f}"
    df = pd.read_csv(fn)
    df.groupby("instrument").apply(lambda x: fun(x, fn, fldr_out))

使用dask执行分析

现在您可以使用dask来读取结果并执行分析

df = dd.read_parquet(fldr_out)

我不太清楚你需要达到什么目标,但我认为你不需要任何团队来解决你的问题。在我看来这是一个简单的过滤问题。在

你可以在你的仪器上附加新的文件和文件。在

另外,我没有要实验的示例文件,但我认为您也可以使用chunksize的pandas来读取大型csv文件。在

示例:

import pandas as pd
import glob
import os

# maybe play around to get better performance 
chunksize = 1000000

files = glob.glob('./file_*.csv')
for f in files:

     for chunk in pd.read_csv(f, chunksize=chunksize):
         u_inst = chunk['instrument'].unique()

         for inst in u_inst:
             # filter instrument data
            inst_df = chunk[chunk.instrument == inst]
            # filter columns
            inst_df = inst_df[['time', 'code', 'val']]
            # append to instrument file
            # only write header if not exist yet
            inst_file = f'./instrument_{inst}.csv'
            file_exist = os.path.isfile(inst_file)
            inst_df.to_csv(inst_file, mode='a', header=not file_exist)

相关问题 更多 >