如何更改apachebeam中的事件时间?

2024-04-26 22:38:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我通过Pub/sub接收数据(json文件) 所以我的事件时间默认是在主题中发布的时间。 我想强制事件时间并改变它。 我在数据中添加了日期时间字段。
我想根据json文件的新时间戳字段进行聚合和组合。在

Ps:字段名为“timestamp”,它是一个字符串。这就是为什么我在数据流中将它转换为日期时间,然后转换为时间戳

def get_timestamp(data):
    my_date = (data['timestamp']) # date : 2010-09-18......string
    times = datetime.fromisoformat(my_date) #type: datetime.datetime
    return beam.window.TimestampedValue(data, datetime.timestamp(times))

稍后,我将在管道中调用函数,然后再执行窗口操作

我从pubsub接收数据:

^{pr2}$

然后进行处理:

 (lines |'timestamp' >> beam.Map(get_timestamp)
           | 'print timestamp' >> beam.ParDo(PrintFn2())
           | 'window' >> beam.WindowInto(
            window.FixedWindows(10),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | 'CountGlobally' >> beam.CombineGlobally(
                beam.combiners.CountCombineFn()
            ).without_defaults() 
    )

Tags: 文件jsondatagetdatetimedatemy时间
1条回答
网友
1楼 · 发布于 2024-04-26 22:38:47

从PubSub读取时,为元素设置EvenTime的最佳方法是使用

Java withTimestampAttribute

Pythontimestamp_attribute

这将设置元素的时间戳并确保watermark signals具有良好的数据。在

如果这不是一个选项,您可以根据Adding Timestamps to a PCollection更改DoFn中元素的时间戳。但是,此方法不允许设置时间戳,然后设置当前水印。这就是为什么withTimestampAttribute方法是解决此模式的最佳方法。在

相关问题 更多 >