滑动windows Python Apache Beam复制d

2024-03-29 02:30:58 发布

您现在位置:Python中文网/ 问答频道 /正文

问题

每次系统从带有滑动窗口的pubsub接收消息时,它都会被复制


代码

 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))    
 | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
 | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

输出

如果我只从pub/sub发送一条消息,并尝试在滑动窗口完成后打印我的代码:

^{pr2}$

结果

('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000

如果我在'window' >> beam.WindowInto(window.SlidingWindows(30, 15))之前打印消息,我只得到一次


在“图形模式:

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=X===========|       :       :
  w2:               |==============|       :
  ...

消息X在滑动窗口开始时只发送了一次,它应该只接收一次,但正在接收两次

我尝试过使用两个累计模式值,也使用了trigger=AftyerWatermark,但无法解决问题。在

会出什么问题?


额外

对于FixedWindows,这是我的porpouse的正确代码:

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())

或者

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

Tags: 代码消息模式windowbeamaverageelemcalculating
1条回答
网友
1楼 · 发布于 2024-03-29 02:30:58

所有属于该窗口的元素都将被发射。如果一个元素属于多个窗口,它将在每个窗口中发出。在

累积模式只在您计划处理延迟数据/多触发触发时才起作用。在这种情况下,当触发器再次触发时,丢弃模式只提供窗口中的新元素,即仅发射自上次触发后到达同一窗口的元素,已发射的元素不会再次发射并被丢弃。在累加模式下,每次触发时都会发出整个窗口,其中包括上次已发出的旧元素和此后到达的新元素。在

如果我理解你的例子,你有滑动窗,它们有30秒的长度,每15秒启动一次。所以它们重叠了15秒:

  time:   t+00 -t+15 -t+30  t+45  t+60   >
             :      :      :       :       :
  w1:        |=============|       :       :
  w2:               |==============|       :
  w3:                      |===============|
  ...

因此,您的案例中的任何元素都至少属于两个窗口(第一个和最后一个窗口除外)。在

例如,在您的示例中,如果您的消息是在17:07:15和17:07:30之间发送的,则它将出现在两个窗口中。在

固定窗口不重叠,因此元素只能属于一个窗口:

^{pr2}$

有关windows的详细信息:https://beam.apache.org/documentation/programming-guide/#windowing

相关问题 更多 >