使用pyspark数据框将所有数据收集到对应行的列中
我有一个数据表,里面的列就像下面的截图所示。我想添加一个新的列“all_data”,这个列里会包含所有其他列的数据。

这是我尝试过的方法:
from pyspark.sql.functions import collect_list, udf
from pyspark.sql.types import ArrayType, StringType
def read_file_content(file_path):
content = spark.read.json(file_path).rdd.map(lambda x: x[0]).collect()
return content
read_file_content_udf = udf(read_file_content, ArrayType(StringType()))
file_with_all_data = daftrame.withColumn("all_data", read_file_content_udf("file_name_input"))
不过用这种方法我遇到了错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 4 times, most recent failure: Lost task 0.3 in stage 63.0 (TID 5275) (10.99.0.10 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/root/.ipykernel/2377/command-3710246798592077-2084292290", line 12, in read_and_collect_data
File "/databricks/python/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 284, in _modified_open
return io_open(file, *args, **kwargs)
FileNotFoundError: [Errno 2] No such file or directory: 'abfss://soruce@storage_abs.dfs.core.windows.net/bite/searc/2024/03/asdaf-adase2-47217e-31-0150bda34e47_20240308_09-19-35.json'
而且文件是存在的,我可以在另一个数据表中读取到它。
所以最终的数据表应该是所有列都在一起,并且新增的“all_data”列里,每一行都存放着来自各个文件的数据。
列名“file_name_input”里有文件的位置,基本上是像这样的内容:“abfss://soruce@storage_abs.dfs.core.windows.net/bite/searc/2024/03/asdaf-adase2-47217e-31-0150bda34e47_20240308_09-19-35.json”,而且在“file_name_input”这一列里还有其他196个文件的名称和位置。
请问是否可以逐个读取这些文件,并把数据分别存储到新增的“all_data”列中呢?
1 个回答
暂无回答