Java Flink SQL: result field does not match requested type error on LocalDateTime

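When I run the GROUP BY query below, I get a type mismatch error. I have tried casting the field to TIMESTAMP and changing the type of the POJO's LocalDateTime field. Most sample code converts the result to Row; I could not find any example that uses a custom class.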

SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name

It throws this error:

Exception in thread "main" org.apache.flink.table.api.TableException: Result field 'zaman' does not match requested type. Requested: GenericType<java.time.LocalDateTime>; Actual: LocalDateTime

Code:

    tableEnvironment.registerDataStream("STOCK", messageStream, "name, price, rowtime.rowtime");

    Table result = tableEnvironment.sqlQuery(
            "SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name");

    result.printSchema();

    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(kp.getProducerProperties().getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
            "STOCKGROUP", new SimpleStringSchema());

    myProducer.setWriteTimestampToKafka(true);

    DataStream<Tuple2<Boolean, StockGroup>> stream = tableEnvironment.toRetractStream(result, StockGroup.class);

    stream.map(x -> x.f1.toString()).addSink(myProducer);

StockGroup POJO class

    public String name;
    public Double minPrice;
    public Double maxPrice;
    public Double avarage;
    public Long sayi;
    public LocalDateTime zaman;
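
Since most samples go through Row, one possible workaround is to request Row from toRetractStream and fill the POJO by hand. The sketch below shows only that mapping step in plain Java: `Object[]` stands in for the fields of a Flink Row so the code runs without Flink on the classpath, and `fromFields` and `toLocalDateTime` are hypothetical helpers, not Flink APIs. It assumes the time column may arrive as either `java.sql.Timestamp` or `LocalDateTime`; which one a given Flink version or planner emits is not confirmed here.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Sketch only. Declare this class public in its own file when using it with Flink,
// since Flink POJOs must be public and have a public no-argument constructor.
class StockGroup {
    public String name;
    public Double minPrice;
    public Double maxPrice;
    public Double avarage;   // spelling kept so it matches the SQL alias
    public Long sayi;
    public LocalDateTime zaman;

    public StockGroup() {}

    // Hypothetical helper (not a Flink API): builds a StockGroup from field values
    // given in the same order as the SELECT list. Object[] stands in for a Row.
    public static StockGroup fromFields(Object[] f) {
        StockGroup g = new StockGroup();
        g.name = (String) f[0];
        g.minPrice = (Double) f[1];
        g.maxPrice = (Double) f[2];
        g.avarage = (Double) f[3];
        g.sayi = (Long) f[4];
        g.zaman = toLocalDateTime(f[5]);
        return g;
    }

    // Assumption: the TIMESTAMP column arrives as LocalDateTime or java.sql.Timestamp.
    static LocalDateTime toLocalDateTime(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof LocalDateTime) {
            return (LocalDateTime) value;
        }
        if (value instanceof Timestamp) {
            return ((Timestamp) value).toLocalDateTime();
        }
        throw new IllegalArgumentException("Unsupported timestamp type: " + value.getClass());
    }

    @Override
    public String toString() {
        return name + "," + minPrice + "," + maxPrice + "," + avarage + "," + sayi + "," + zaman;
    }
}
```

With a mapping like this, the stream would be typed on Row instead of StockGroup and each row's fields would be passed to `fromFields` inside the existing `map` step before `toString()` is called.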

Printed schema

root
 |-- name: STRING
 |-- minPrice: DOUBLE
 |-- maxPrice: DOUBLE
 |-- avarage: DOUBLE
 |-- sayi: BIGINT NOT NULL
 |-- zaman: TIMESTAMP(3) *ROWTIME*
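
For context, the zaman column in this schema is the start of the 5-second tumbling window that each row's rowtime falls into. The following is a minimal plain-Java sketch of that computation and of the per-name MIN/MAX/AVG/COUNT, not Flink code. It assumes windows are aligned to the epoch and rendered in UTC; `TumbleSketch`, `windowStart` and `aggregate` are names invented for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.DoubleSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;

class TumbleSketch {
    static final long WINDOW_MILLIS = 5_000L; // INTERVAL '5' SECOND

    // Start of the tumbling window containing epochMillis (assumes epoch alignment, UTC).
    static LocalDateTime windowStart(long epochMillis) {
        long start = epochMillis - Math.floorMod(epochMillis, WINDOW_MILLIS);
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZoneOffset.UTC);
    }

    // Each input row is {name (String), price (Double), rowtimeMillis (Long)}.
    // The result is keyed by "name@windowStart"; the statistics object carries
    // min, max, average and count for the prices in that group.
    static Map<String, DoubleSummaryStatistics> aggregate(Object[][] rows) {
        Map<String, DoubleSummaryStatistics> out = new TreeMap<>();
        for (Object[] r : rows) {
            String key = r[0] + "@" + windowStart((Long) r[2]);
            out.computeIfAbsent(key, k -> new DoubleSummaryStatistics()).accept((Double) r[1]);
        }
        return out;
    }
}
```

Rows for the same name whose rowtime values lie in the same 5-second span end up under one key, which mirrors the GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name clause of the query.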
