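Java Apache Flink: using the writeAsCsv() method to write tuples of objects

I am following the Apache Flink tutorial on cleansing a stream of taxi ride events. The resulting stream is printed to the console. Now I would like to write it to a CSV file.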

        // configure event-time processing
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // get the taxi ride data stream
        DataStream<TaxiRide> rides = env.addSource(
                new TaxiRideSource(path, maxEventDelay, servingSpeedFactor));

        DataStream<TaxiRide> filteredRides = rides
                // filter out rides that do not start or stop in NYC
                .filter(new RideCleansing.NYCFilter());

        filteredRides.print();

I tried the following, but got an error: java.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.

DataStreamSink<TaxiRide> rides = filteredRides.writeAsCsv("/resources").setParallelism(1);

When I change it to DataSet<Tuple1<TaxiRide>> rides1 = filteredRides.writeAsCsv("/resources").setParallelism(1); it causes a compiler error.

What should I do to write the resulting stream of cleansed TaxiRide objects to a CSV file?


1 answer

  1. # Answer 1

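    DataStream and DataSet belong to different APIs and cannot be mixed. That is why you get the compiler error.

    The error message "The writeAsCsv() method can only be used on data streams of tuples." means that you have to convert the DataStream<TaxiRide> into a DataStream of tuples before it can be written as a CSV file. This can be done with a simple MapFunction: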

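    // Tuple9 requires exactly nine type arguments, one per field emitted by the converter
    DataStream<Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Short>> rideTuples = filteredRides
       .map(new TupleConverter());
    

    where TupleConverter is defined as

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple9;

    class TupleConverter implements MapFunction<TaxiRide, Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Short>> {
    
      @Override
      public Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Short> map(TaxiRide ride) {
         // copy each TaxiRide field into the tuple position of the matching type
         return Tuple9.of(ride.rideId, ride.isStart, ...);
      }
    }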
    

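    Once you have a DataStream of tuples (rideTuples above), you can write it to a CSV file by calling writeAsCsv() on it, just as the question attempted on the TaxiRide stream.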
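
To make the conversion step more concrete, here is a minimal plain-Java sketch with no Flink dependency. It only imitates the idea: flatten each object into an ordered list of fields (the job TupleConverter does), then join the fields into one comma-separated line per record, which is roughly what a CSV sink is expected to emit. The `Ride` class is a hypothetical two-field stand-in for TaxiRide, and `toFields` and `toCsvLine` are illustrative names, not part of the Flink API.

```java
import java.util.StringJoiner;

class RideCsvSketch {

    // Hypothetical stand-in for TaxiRide; only the two fields named in the answer are modeled.
    public static final class Ride {
        public final long rideId;
        public final boolean isStart;

        public Ride(long rideId, boolean isStart) {
            this.rideId = rideId;
            this.isStart = isStart;
        }
    }

    // Plays the role of TupleConverter: flattens one object into an ordered array of fields.
    public static Object[] toFields(Ride ride) {
        return new Object[] {ride.rideId, ride.isStart};
    }

    // Joins the fields of one record into a single comma-separated line.
    public static String toCsvLine(Object[] fields) {
        StringJoiner line = new StringJoiner(",");
        for (Object field : fields) {
            line.add(String.valueOf(field));
        }
        return line.toString();
    }

    public static void main(String[] args) {
        Ride ride = new Ride(42L, true);
        System.out.println(toCsvLine(toFields(ride))); // prints "42,true"
    }
}
```

In the real job the flattening happens inside the MapFunction and the line formatting is left to writeAsCsv(); the sketch just shows why the sink needs a fixed, ordered set of fields rather than an arbitrary object.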