Handling column values that wrap onto new lines in a pipe-delimited file
We have a 50 GB text file containing pipe (|) delimited data in the following format:
col1|col2|col3|col4|col5
val1|val2|val3|val4|val5
The problem is that when we try to read this file into a PySpark DataFrame, each column value wraps onto a new line after a fixed width. If every column value were enclosed in quotes, I could tell Spark to use multiline mode and treat the quoted content as a single block even when it contains newlines. But how do I handle it in this case? Do I need to preprocess the file before loading it into Spark?
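For reference, this is roughly the kind of read that works when the fields are quoted (just a sketch of the standard Spark CSV reader options; the file path and the header setting here are placeholders, not our actual setup):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Works only because multi-line values are enclosed in double quotes
df = (spark.read
      .option("sep", "|")            # pipe-delimited
      .option("header", True)
      .option("multiLine", True)     # allow newlines inside quoted fields
      .option("quote", '"')
      .option("escape", '"')
      .csv("/path/to/pipedelimited.csv"))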
Hoping to find a solution for reading this file.
1 Answer
You can use dask for this preprocessing step. The code below reads a 50 GB file in 500 MB chunks and writes the result out in 5 parts. All operations are evaluated lazily, just as in Spark. Let me know how it works out. You may need to strip the header row from the data first and then supply the header to your Spark DataFrame yourself.
Install dask with:
pip install dask[complete]
import dask.bag as db
from dask.distributed import Client


def preprocess_line(line):
    # Wrap every pipe-delimited field in double quotes
    processed_line = '|'.join([f'"{field}"' for field in line.split('|')])
    return processed_line


if __name__ == '__main__':
    input_file = "../data/pipedelimited.csv"
    client = Client(n_workers=8, threads_per_worker=4)

    b = db.read_text(input_file, blocksize="500MB")  # blocksize=None streams instead; see the dask read_text docs for the other options
    line_count = b.count()
    line_count_computed = line_count.compute()
    print(f"count of lines in whole file = {line_count_computed}")

    delayed_partitions = b.to_delayed()

    first_flag = True
    first_line = None
    second_line = None
    processed_lines = []

    for delayed_partition in delayed_partitions:
        partition = delayed_partition.compute()
        lines = iter(partition)
        print(f"first line = {lines}")  # debug: prints the iterator object itself
        try:
            while True:
                if first_flag:
                    first_line = next(lines)
                    first_flag = False
                    continue
                else:
                    # Each record is assumed to wrap onto exactly two physical lines,
                    # so glue consecutive pairs back together into one logical line
                    second_line = next(lines)
                    final_line = first_line.strip() + second_line.strip()
                    processed_line = preprocess_line(final_line)
                    processed_lines.append(processed_line)
                    # print(processed_line)
                    first_flag = True
        except StopIteration:
            print("Reached the end of the list.")

    # Write the corrected, quoted lines back out: one text file per partition (5 parts)
    processed_bag = db.from_sequence(processed_lines, npartitions=5)
    output_path = "../dask_output/processed_corrected.csv"
    processed_bag.to_textfiles(output_path)
The output looks like this:
count of lines in whole file = 2592
first line = <list_iterator object at 0x724570f54ac0>
Reached the end of the list.
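Once the part files are written, you can read the quoted output back into Spark. A minimal sketch, assuming you stripped the header row during preprocessing; the column names below are placeholders for your real header:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder column names; substitute the header you removed from the raw file
columns = ["col1", "col2", "col3", "col4", "col5"]

df = (spark.read
      .option("sep", "|")
      .option("quote", '"')
      .option("escape", '"')
      .option("header", False)   # header was stripped before preprocessing
      .csv("../dask_output/processed_corrected.csv")   # Spark reads every part file under this path
      .toDF(*columns))

df.show(5, truncate=False)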