有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

迭代Spark数据集的行并在JavaAPI中应用操作

刚刚接触Spark(2.4.x)并使用JavaAPI(notScala!!!)

我有一个Dataset是从CSV文件中读取的。它有一个模式(命名列),如下所示:

id (integer)  |  name (string)  |  color (string)  |  price (double)  |  enabled (boolean)

示例行:

23 | "hotmeatballsoup" | "blue" | 3.95 | true

数据集中有许多(上万)行。我想使用适当的Java/Spark API编写一个表达式,它滚动每一行并对每一行应用以下两个操作:

  1. 如果价格为null,则默认为0.00;然后
  2. 如果颜色列值为“红色”,则在价格中添加2.55

因为我对Spark如此陌生,我甚至不知道从哪里开始!到目前为止,我最好的尝试肯定是错误的,但我想这至少是一个起点:

Dataset csvData = sparkSession.read()
    .format("csv")
    .load(fileToLoad.getAbsolutePath());

// ??? get rows somehow
Seq<Seq<String>> csvRows = csvData.getRows(???, ???);

// now how to loop through rows???
for (Seq<String> row : csvRows) {
    // how apply two operations specified above???
    if (row["price"] == null) {
        row["price"] = 0.00;
    }

    if (row["color"].equals("red")) {
        row["price"] = row["price"] + 2.55;
    }
}

有人能帮我朝正确的方向走吗


共 (1) 个答案

  1. # 1 楼答案

    您可以使用spark sql api来实现它。也可以使用来自DataFrameNaFunctions.fill()值替换空值。否则,您可以将Dataframe转换为Dataset,并在.map中执行这些步骤,但在这种情况下,sql api更好、更有效

    + -+       -+  -+  -+   -+
    | id|           name|color|price|enabled|
    + -+       -+  -+  -+   -+
    | 23|hotmeatballsoup| blue| 3.95|   true|
    | 24|            abc|  red|  1.0|   true|
    | 24|            abc|  red| null|   true|
    + -+       -+  -+  -+   -+
    

    在类声明之前导入sql函数:

    import static org.apache.spark.sql.functions.*;
    

    sql api:

    df.select(
            col("id"), col("name"), col("color"),
            when(col("color").equalTo("red").and(col("price").isNotNull()), col("price").plus(2.55))
            .when(col("color").equalTo("red").and(col("price").isNull()), 2.55)
            .otherwise(col("price")).as("price")
            ,col("enabled")
    ).show();
    

    或使用临时视图和sql查询:

    df.createOrReplaceTempView("df");
    spark.sql("select id,name,color, case when color = 'red' and price is not null then (price + 2.55) when color = 'red' and price is null then 2.55 else price end as price, enabled from df").show();
    

    输出:

    + -+       -+  -+  -+   -+
    | id|           name|color|price|enabled|
    + -+       -+  -+  -+   -+
    | 23|hotmeatballsoup| blue| 3.95|   true|
    | 24|            abc|  red| 3.55|   true|
    | 24|            abc|  red| 2.55|   true|
    + -+       -+  -+  -+   -+