writeStream of an aggregated, windowed, watermarked DataFrame not working

Posted 2024-04-28 07:00:50

I'm using a CSV dataset as input, read with readStream as follows:

# Read the raw CSV events as a stream, one file per micro-batch
inDF = spark \
    .readStream \
    .option("sep", ",") \
    .option("maxFilesPerTrigger", 1) \
    .schema(rawStreamSchema) \
    .csv(rawEventsDir)

with the schema rawStreamSchema applied.
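The definition of rawStreamSchema isn't reproduced in the post; a minimal sketch consistent with the columns used in the aggregation below (the exact types are an assumption, though the output schema shown later implies a timestamp, four strings, and a double) would be:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Hypothetical reconstruction: one event-time column, four string
# dimensions, one numeric measurement value
rawStreamSchema = StructType([
    StructField('timeStamp', TimestampType(), True),
    StructField('machine', StringType(), True),
    StructField('module', StringType(), True),
    StructField('component', StringType(), True),
    StructField('measure', StringType(), True),
    StructField('value', DoubleType(), True),
])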

I need to compute some aggregations, as follows:

from pyspark.sql.functions import window, min, max, avg, stddev

# 600-second watermark on the event-time column, then min/max/avg/stddev per
# time window and machine/module/component/measure; note that window()'s third
# positional argument is the slide duration, which is what windowStart binds to
byMeasureDF = inDF \
        .withWatermark('timeStamp', '600 seconds') \
        .groupBy(window(inDF.timeStamp, windowSize, windowStart)
                 , inDF.machine, inDF.module
                 , inDF.component, inDF.measure) \
        .agg(min(inDF.value).alias('minValue')
             , max(inDF.value).alias('maxValue')
             , avg(inDF.value).alias('avgValue')
             , stddev(inDF.value).alias('stdevValue'))

This works, and indeed the output schema is correct:

byMeasureDF schema = 
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- machine: string (nullable = true)
 |-- module: string (nullable = true)
 |-- component: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- minValue: double (nullable = true)
 |-- maxValue: double (nullable = true)
 |-- avgValue: double (nullable = true)
 |-- stdevValue: double (nullable = true)

However, when I run the following query:

# Write the aggregated stream to CSV files, in append mode
q_byMeasure = byMeasureDF \
          .writeStream \
          .format('csv') \
          .option('delimiter', ',') \
          .option('header', 'true') \
          .outputMode('append') \
          .queryName('byMeasure') \
          .start(path = confStreamMiningByMeasureDir
                 , checkpointLocation = chkStreamMiningByMeasureDir)

I get the following error:

Traceback (most recent call last):
  File "/home/roberto/BOTT-G80216/Programs/Version_00.01.00/Python/2_fromRawToConformed.py", line 87, in <module>
    , checkpointLocation = chkStreamMiningByMeasureDir)
  File "/home/roberto/spark/python/pyspark/sql/streaming.py", line 844, in start
    return self._sq(self._jwrite.start(path))
  File "/home/roberto/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/roberto/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Note that if I write the same DataFrame to the console, it works fine.
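For comparison, the console run presumably looked something like the sketch below (an assumption; in particular, if it used outputMode('complete'), which streaming aggregations support but the file sink does not, that alone would explain why the console query started while the CSV one did not):

q_console = byMeasureDF \
          .writeStream \
          .format('console') \
          .outputMode('complete') \
          .queryName('byMeasureConsole') \
          .start()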

It looks like there's a bug in this Spark version. Does anyone know a possible workaround? Many thanks in advance.
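A workaround commonly suggested for this exact symptom (a sketch, not verified against this Spark version) is to build the aggregation on the watermarked DataFrame and refer to its columns by name, instead of reusing inDF's attributes: window(inDF.timeStamp, ...) can resolve against the pre-watermark plan, in which case the analyzer sees no watermark on the grouping column and rejects append mode.

from pyspark.sql.functions import col, window, min, max, avg, stddev

# Keep a handle on the watermarked DataFrame and reference its columns by
# name, so the grouping key carries the watermark metadata
watermarkedDF = inDF.withWatermark('timeStamp', '600 seconds')

byMeasureDF = watermarkedDF \
        .groupBy(window(col('timeStamp'), windowSize, windowStart)
                 , 'machine', 'module'
                 , 'component', 'measure') \
        .agg(min('value').alias('minValue')
             , max('value').alias('maxValue')
             , avg('value').alias('avgValue')
             , stddev('value').alias('stdevValue'))

Also note that in append mode a window is only emitted once the watermark passes its end, so output files appear only after enough event time (the window length plus the 600-second watermark delay) has elapsed.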

