java Apache Flink writeAsCsv()方法来编写对象元组
我遵循ApacheFlink教程来清理出租车事件流。生成的流将打印到控制台。现在我想把它写入csv文件
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
new TaxiRideSource(path, maxEventDelay, servingSpeedFactor));
DataStream<TaxiRide> filteredRides = rides
// filter out rides that do not start or stop in NYC
.filter(new RideCleansing.NYCFilter());
filteredRides.print();
我尝试了以下操作,但出现了错误:java.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.
DataStreamSink<TaxiRide> rides = filteredRides.writeAsCsv("/resources").setParallelism(1);
当我做DataSet<Tuple1<TaxiRide>> rides1 = filteredRides.writeAsCsv("/resources").setParallelism(1);
时,它会导致编译器错误
我应该怎么做才能将生成的已清理滑行设备对象流写入csv文件
# 1 楼答案
DataStream
和DataSet
属于不同的API,不能混合使用。因此,出现了编译错误错误消息“writeAsCsv()方法只能用于元组的数据流。”这意味着,必须将
DataStream<TaxiRide>
对象转换为DataStream
元组,才能将其写入CSV文件。 这可以通过一个简单的MapFunction
来实现:其中
TupleConverter
被定义为一旦有了},就可以将其写入CSV文件
DataStream
{