有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用sparksql和Spark流

尝试理解SparkSql与Spark结构化流媒体的关系
Spark会话从kafka主题读取事件,将数据聚合到按不同列名分组的计数,并将其打印到控制台。
原始输入数据的结构如下:

+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
|.  sourceTypes|                Guid|  platform|datacenter|pagesId|     eventTimestamp|              Id1234|  Id567890|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
| Notififcation|....................|   ANDROID|       dev|     aa|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................|   ANDROID|       dev|     ab|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:46|0ee089c1-d5da-3b3...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|57c18964-40c9-311...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:52|d9fc4cfa-0934-3e9...|{"id":...|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+---------+

需要对sourceTypesplatformdatacenterpageId进行计数

使用以下代码聚合数据:

Dataset<Row> query = sourceDataset
        .withWatermark("eventTimestamp", watermarkInterval)
        .select(
            col("eventTimestamp"),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .groupBy(
            window(col("eventTimestamp"), windowInterval),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .agg(
            max(col("eventTimestamp"))
        );

这里watermarkInterval=45secondswindowInterval=15seconds&triggerInterval=15seconds

正在使用新的聚合数据集:

aggregatedDataset
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("console")
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .start();

有几个问题:

  1. 输出数据未打印每个groupBy的计数,如平台、页面ID等。

  2. 如何以json格式打印输出?我尝试在控制台上输出数据时使用select(to_json(struct("*")).as("value")),但不起作用


共 (1) 个答案

  1. # 1 楼答案

    您可以使用以下代码片段解决问题:

    .outputMode("complete")