我通过Pub/sub接收数据(json文件)
所以我的事件时间默认是在主题中发布的时间。
我想强制事件时间并改变它。
我在数据中添加了日期时间字段。
我想根据json文件的新时间戳字段进行聚合和组合。在
Ps:字段名为“timestamp”,它是一个字符串。这就是为什么我在数据流中将它转换为日期时间,然后转换为时间戳
def get_timestamp(data):
my_date = (data['timestamp']) # date : 2010-09-18......string
times = datetime.fromisoformat(my_date) #type: datetime.datetime
return beam.window.TimestampedValue(data, datetime.timestamp(times))
稍后,我将在管道中调用函数,然后再执行窗口操作
我从pubsub接收数据:
^{pr2}$然后进行处理:
(lines |'timestamp' >> beam.Map(get_timestamp)
| 'print timestamp' >> beam.ParDo(PrintFn2())
| 'window' >> beam.WindowInto(
window.FixedWindows(10),
trigger=trigger.AfterWatermark(),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)
| 'CountGlobally' >> beam.CombineGlobally(
beam.combiners.CountCombineFn()
).without_defaults()
)
从PubSub读取时,为元素设置EvenTime的最佳方法是使用
Java withTimestampAttribute
Pythontimestamp_attribute
这将设置元素的时间戳并确保watermark signals具有良好的数据。在
如果这不是一个选项,您可以根据Adding Timestamps to a PCollection更改DoFn中元素的时间戳。但是,此方法不允许设置时间戳,然后设置当前水印。这就是为什么withTimestampAttribute方法是解决此模式的最佳方法。在
相关问题 更多 >
编程相关推荐