I am using a CSV dataset as input, read via readStream as follows:
inDF = spark \
    .readStream \
    .option("sep", ",") \
    .option("maxFilesPerTrigger", 1) \
    .schema(rawStreamSchema) \
    .csv(rawEventsDir)
with the schema rawStreamSchema.
I need to do some aggregations, as follows:
byMeasureDF = inDF \
    .withWatermark('timeStamp', '600 seconds') \
    .groupBy(window(inDF.timeStamp, windowSize, windowStart),
             inDF.machine, inDF.module,
             inDF.component, inDF.measure) \
    .agg(min(inDF.value).alias('minValue'),
         max(inDF.value).alias('maxValue'),
         avg(inDF.value).alias('avgValue'),
         stddev(inDF.value).alias('stdevValue'))
This works, and in fact the output schema is correct:
byMeasureDF schema =
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- machine: string (nullable = true)
|-- module: string (nullable = true)
|-- component: string (nullable = true)
|-- measure: string (nullable = true)
|-- minValue: double (nullable = true)
|-- maxValue: double (nullable = true)
|-- avgValue: double (nullable = true)
|-- stdevValue: double (nullable = true)
However, when I run the following query:
q_byMeasure = byMeasureDF \
    .writeStream \
    .format('csv') \
    .option('delimiter', ',') \
    .option('header', 'true') \
    .outputMode('append') \
    .queryName('byMeasure') \
    .start(path=confStreamMiningByMeasureDir,
           checkpointLocation=chkStreamMiningByMeasureDir)
I get the following error:
Traceback (most recent call last):
File "/home/roberto/BOTT-G80216/Programs/Version_00.01.00/Python/2_fromRawToConformed.py", line 87, in <module>
, checkpointLocation = chkStreamMiningByMeasureDir)
File "/home/roberto/spark/python/pyspark/sql/streaming.py", line 844, in start
return self._sq(self._jwrite.start(path))
File "/home/roberto/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/roberto/spark/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
Note that if I write the same DataFrame to the console, it works fine. It looks like there is a bug in this Spark version. Does anyone know a possible workaround? Many thanks in advance.