解析数据fram以添加新列并更新列pysp

2024-04-26 18:00:38 发布

您现在位置:Python中文网/ 问答频道 /正文

下面的代码创建了一个数据帧,如下所示:

ratings = spark.createDataFrame(
    sc.textFile("myfile.json").map(lambda l: json.loads(l)),
)



ratings.registerTempTable("mytable")

final_df = sqlContext.sql("select * from mytable");

The data frame look something like this

我正在将created_atuser_id存储到一个列表中:

user_id_list = final_df.select('user_id').rdd.flatMap(lambda x: x).collect()
created_at_list = final_df.select('created_at').rdd.flatMap(lambda x: x).collect()

并通过一个列表进行解析以调用另一个函数:

for i in range(len(user_id_list)):
    status=get_status(user_id_list[I],created_at_list[I])

我想在数据框中创建一个名为status的新列,并更新相应的user_id_listcreated_at_list value的值

我知道我需要使用这个功能-但不知道如何继续

final_df.withColumn('status', 'give the condition here') 

Tags: 数据lambdaidjsondf列表statusmytable
1条回答
网友
1楼 · 发布于 2024-04-26 18:00:38

不要创建列表。只需给dataframe一个UDF函数

import pyspark.sql.functions as F
status_udf = F.udf(lambda x: get_status(x[0], x[1]))
df = df.select(df.columns + [status_udf(F.col('user_id_list'), \
               F.col('created_at_list value')).alias('status')])

相关问题 更多 >