下面的代码创建了一个数据帧,如下所示:
ratings = spark.createDataFrame(
sc.textFile("myfile.json").map(lambda l: json.loads(l)),
)
ratings.registerTempTable("mytable")
final_df = sqlContext.sql("select * from mytable");
The data frame look something like this
我正在将created_at
和user_id
存储到一个列表中:
user_id_list = final_df.select('user_id').rdd.flatMap(lambda x: x).collect()
created_at_list = final_df.select('created_at').rdd.flatMap(lambda x: x).collect()
并通过一个列表进行解析以调用另一个函数:
for i in range(len(user_id_list)):
status=get_status(user_id_list[I],created_at_list[I])
我想在数据框中创建一个名为status的新列,并更新相应的user_id_list
和created_at_list value
的值
我知道我需要使用这个功能-但不知道如何继续
final_df.withColumn('status', 'give the condition here')
不要创建列表。只需给dataframe一个UDF函数
相关问题 更多 >
编程相关推荐