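Java Flink SQL: "Result field does not match requested type" error on LocalDateTime
When I run the GROUP BY query below and convert the result to my own class, I get a type mismatch error. I have tried casting the column to TIMESTAMP, and I have tried changing the type of the LocalDateTime field in the POJO. Most example code converts the result to Row; I could not find any example that uses a custom class.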
SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name
It throws this error:
Exception in thread "main" org.apache.flink.table.api.TableException: Result field 'zaman' does not match requested type. Requested: GenericType<java.time.LocalDateTime>; Actual: LocalDateTime
Code:
tableEnvironment.registerDataStream("STOCK", messageStream, "name, price, rowtime.rowtime");
Table result = tableEnvironment.sqlQuery(
"SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name");
result.printSchema();
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(kp.getProducerProperties().getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
"STOCKGROUP", new SimpleStringSchema());
myProducer.setWriteTimestampToKafka(true);
DataStream<Tuple2<Boolean, StockGroup>> stream = tableEnvironment.toRetractStream(result, StockGroup.class);
stream.map(x -> x.f1.toString()).addSink(myProducer);
StockGroup POJO class:
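import java.time.LocalDateTime;

public class StockGroup {
    // Field names match the column aliases in the SELECT list.
    public String name;
    public Double minPrice;
    public Double maxPrice;
    public Double avarage;
    public Long sayi;
    public LocalDateTime zaman;
}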
Output of printSchema():
root
|-- name: STRING
|-- minPrice: DOUBLE
|-- maxPrice: DOUBLE
|-- avarage: DOUBLE
|-- sayi: BIGINT NOT NULL
|-- zaman: TIMESTAMP(3) *ROWTIME*
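
The error message reads as if the `zaman` field of the POJO is extracted as a generic type while the query result reports a dedicated LocalDateTime type. One route the question already mentions is to convert the result to Row and build the custom class by hand. Below is a minimal sketch of that mapping step, not a confirmed fix. Assumptions: the fields arrive in the order of the printed schema, and the timestamp may arrive either as `java.time.LocalDateTime` or as `java.sql.Timestamp` depending on the planner and Flink version. So that the sketch runs without Flink on the classpath, a plain `Object[]` stands in for Flink's `Row`; `StockGroupMapper`, `fromFields` and `toLocalDateTime` are hypothetical names.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class StockGroupMapper {

    /** Same fields as the StockGroup POJO in the question. */
    public static class StockGroup {
        public String name;
        public Double minPrice;
        public Double maxPrice;
        public Double avarage;
        public Long sayi;
        public LocalDateTime zaman;

        @Override
        public String toString() {
            return name + "," + minPrice + "," + maxPrice + ","
                    + avarage + "," + sayi + "," + zaman;
        }
    }

    /** Accepts LocalDateTime or java.sql.Timestamp (assumption: one of the two arrives). */
    public static LocalDateTime toLocalDateTime(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof LocalDateTime) {
            return (LocalDateTime) value;
        }
        if (value instanceof Timestamp) {
            return ((Timestamp) value).toLocalDateTime();
        }
        throw new IllegalArgumentException(
                "Unsupported timestamp type: " + value.getClass().getName());
    }

    /** Builds a StockGroup from field values in schema order (stand-in for a Flink Row). */
    public static StockGroup fromFields(Object[] fields) {
        if (fields.length != 6) {
            throw new IllegalArgumentException("Expected 6 fields, got " + fields.length);
        }
        StockGroup g = new StockGroup();
        g.name = (String) fields[0];
        g.minPrice = (Double) fields[1];
        g.maxPrice = (Double) fields[2];
        g.avarage = (Double) fields[3];
        g.sayi = (Long) fields[4];
        g.zaman = toLocalDateTime(fields[5]);
        return g;
    }

    public static void main(String[] args) {
        // Sample values only; they are not taken from the question.
        Object[] fields = {"ACME", 1.0, 3.0, 2.0, 2L,
                Timestamp.valueOf("2020-01-01 00:00:05")};
        System.out.println(fromFields(fields));
    }
}
```

In the job itself, the idea would be to call `tableEnvironment.toRetractStream(result, Row.class)` and, in the `map` step, pass `row.getField(0)` through `row.getField(5)` to a mapper like this one. That wiring is untested here and may need adjusting for the Flink version in use.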