Handling column line-wrapping in a pipe-delimited file


We have a 50GB text file with pipe (|) delimited data in the following format:

col1|col2|col3|col4|col5

value1|value2|value3|value4|value5

The problem is that when we try to read this file into a pyspark dataframe, each column's value wraps to a new line after a fixed width. If every column value were wrapped in quotes, I could tell spark to use multiline mode and treat the quoted content as a single block even when it contains newlines. But how do I handle it in this case? Do I need to preprocess the file before loading it into Spark?
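For reference, the quoted case I have in mind would be read roughly like this (a sketch; the path is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pipe-read").getOrCreate()

# Only works when fields are quoted, so newlines stay inside the quotes.
df = (
    spark.read
    .option("sep", "|")         # pipe delimiter
    .option("quote", '"')       # fields wrapped in double quotes
    .option("multiLine", True)  # allow newlines inside quoted fields
    .option("header", True)
    .csv("/path/to/pipedelimited.csv")
)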

Hoping to find a solution for reading this file.

1 Answer


You can use dask for this preprocessing task. The code below works through the 50GB file in 500MB chunks and writes the result out as 5 parts. Dask builds the work lazily, much like spark, although this script does call compute() on each partition. Let me know how it goes. You may need to strip the header row from the data first and then supply the header yourself in your spark dataframe.

Install dask with:

pip install dask[complete]

import dask.bag as db
from dask.distributed import Client


def preprocess_line(line):
    # Wrap every field in double quotes, e.g. 'a|b|c' -> '"a"|"b"|"c"',
    # so spark's multiline mode can keep embedded newlines inside a field.
    processed_line = '|'.join([f'"{field}"' for field in line.split('|')])
    return processed_line


if __name__ == '__main__':

    input_file = "../data/pipedelimited.csv"

    client = Client(n_workers=8, threads_per_worker=4)
    b = db.read_text(input_file, blocksize="500MB")  # blocksize=None keeps each file as one partition; see the dask read_text docs for other options
    line_count = b.count()
    line_count_computed = line_count.compute()  # forces a full pass to count lines
    print(f"count of lines in whole file = {line_count_computed}")

    delayed_partitions = b.to_delayed()  # one delayed object per partition

    # Pairing state: each logical record is assumed to span exactly
    # two physical lines in the broken file.
    first_flag = True
    first_line = None
    second_line = None
    processed_lines = []


    for delayed_partition in delayed_partitions:
        partition = delayed_partition.compute()  # materialize this chunk on the driver
        lines = iter(partition)
        print(f"first line = {lines}")  # note: this prints the iterator object, not a line of text

        try:
            while True:
                if first_flag:
                    first_line = next(lines)
                    first_flag = False
                    continue
                else:
                    second_line = next(lines)
                    # strip both halves so the stray newline between them is removed
                    final_line = first_line.strip() + second_line.strip()
                    processed_line = preprocess_line(final_line)
                    processed_lines.append(processed_line)
                    first_flag = True

        except StopIteration:
            print("Reached the end of the list.")


    # Note: this gathers every processed line on the driver; fine for a
    # test run, but for the full 50GB file consider doing the pairing
    # inside bag.map_partitions instead.
    processed_bag = db.from_sequence(processed_lines, npartitions=5)

    output_path = "../dask_output/processed_corrected.csv"
    # With no '*' in the path, dask treats it as a directory and writes
    # one numbered .part file per partition into it.
    processed_bag.to_textfiles(output_path)

The output looks like this:

count of lines in whole file = 2592
first line = <list_iterator object at 0x724570f54ac0>
Reached the end of the list.
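
Once the preprocessing has produced fully quoted records, the output directory can be loaded back into spark with multiline mode. A minimal sketch, assuming the header row was stripped during preprocessing; the column names here are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("load-processed").getOrCreate()

# Spark reads every part file in the dask output directory.
df = (
    spark.read
    .option("sep", "|")
    .option("quote", '"')
    .option("multiLine", True)
    .option("header", False)  # header supplied manually below
    .csv("../dask_output/processed_corrected.csv")
    .toDF("col1", "col2", "col3", "col4", "col5")  # placeholder column names
)
df.show(5, truncate=False)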
