迭代Spark数据集的行并在JavaAPI中应用操作
刚刚接触Spark(2.4.x)并使用JavaAPI(notScala!!!)
我有一个Dataset
是从CSV文件中读取的。它有一个模式(命名列),如下所示:
id (integer) | name (string) | color (string) | price (double) | enabled (boolean)
示例行:
23 | "hotmeatballsoup" | "blue" | 3.95 | true
数据集中有许多(上万)行。我想使用适当的Java/Spark API编写一个表达式,它滚动每一行并对每一行应用以下两个操作:
- 如果价格为
null
,则默认为0.00
;然后 - 如果颜色列值为“红色”,则在价格中添加
2.55
因为我对Spark如此陌生,我甚至不知道从哪里开始!到目前为止,我最好的尝试肯定是错误的,但我想这至少是一个起点:
Dataset csvData = sparkSession.read()
.format("csv")
.load(fileToLoad.getAbsolutePath());
// ??? get rows somehow
Seq<Seq<String>> csvRows = csvData.getRows(???, ???);
// now how to loop through rows???
for (Seq<String> row : csvRows) {
// how apply two operations specified above???
if (row["price"] == null) {
row["price"] = 0.00;
}
if (row["color"].equals("red")) {
row["price"] = row["price"] + 2.55;
}
}
有人能帮我朝正确的方向走吗
# 1 楼答案
您可以使用spark sql api来实现它。也可以使用来自
DataFrameNaFunctions
的.fill()
值替换空值。否则,您可以将Dataframe转换为Dataset,并在.map
中执行这些步骤,但在这种情况下,sql api更好、更有效在类声明之前导入sql函数:
sql api:
或使用临时视图和sql查询:
输出: