Can someone explain the output of 60-second fixed windows in an Apache Beam streaming pipeline?

Asked 2025-04-12 01:37

I wrote a data pipeline that reads real-time data from Pub/Sub, aggregates it, and writes the results to BigQuery. The pipeline runs fine and writes the aggregated results into the BigQuery table. However, I don't understand how the results are being computed.

Here is my Apache Beam code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform

table_spec1 = bigquery.TableReference(
    projectId=<PROJECT_ID>,
    datasetId='training',
    tableId='dflow_agg')


SCHEMA = {
    "fields": [
        {"name": "Name", "type": "STRING"},
        {"name": "avg_sal", "type": "FLOAT64"},
        {"name": "window_start", "type": "STRING", "mode": "NULLABLE"},
        {"name": "window_end", "type": "STRING", "mode": "NULLABLE"},
    ]
}

pipeline_options = PipelineOptions(streaming=True)

class ProcessWords(beam.DoFn):
    def process(self, ele):
        # ast.literal_eval parses the tuple-repr payload safely;
        # eval() would execute arbitrary code from the message body.
        import ast
        yield ast.literal_eval(ele)

def run():
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(
                subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
            | "Decode" >> beam.Map(lambda element: element.decode("utf-8"))
            | "Parse tuple string" >> beam.ParDo(ProcessWords())
            | "Create beam Row" >> beam.Map(
                lambda x: beam.Row(Name=str(x[0]), Stream=str(x[1]), Salary=int(x[2])))
            | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
            | "Aggregate" >> SqlTransform(
                """
                SELECT
                    Name,
                    AVG(Salary) AS avg_sal
                FROM PCOLLECTION
                GROUP BY Name
                """)
            | "Assemble dictionary" >> beam.Map(
                lambda row, window=beam.DoFn.WindowParam: {
                    "Name": row.Name,
                    "avg_sal": row.avg_sal,
                    "window_start": window.start.to_rfc3339(),
                    "window_end": window.end.to_rfc3339(),
                })
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table=table_spec1,
                schema=SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == '__main__':
    run()
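For reference, Beam's `FixedWindows` assigns each element to a window whose start is a multiple of the window size counted from the Unix epoch. A minimal pure-Python sketch of that assignment rule (not using Beam itself):

```python
def fixed_window(event_time_s, size_s=30):
    """Return (start, end) of the fixed window containing event_time_s.

    Windows are aligned to multiples of the window size counted from
    the Unix epoch, not from when the pipeline (or publisher) started.
    """
    start = event_time_s - (event_time_s % size_s)
    return (start, start + size_s)

print(fixed_window(28))  # -> (0, 30)
print(fixed_window(32))  # -> (30, 60): only 4 seconds later, but a new window
```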

I also have a Python script that publishes a message to a Pub/Sub topic every 5 seconds. Here is my publisher code:

import time
import random
from google.cloud import pubsub_v1

project_id = "<PROJECT_ID>"
topic_name = "Test"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

names = ['A', 'B', 'C']
sals = [10, 12, 11, 23, 77, 54, 23, 21, 4, 9, 5, 22, 19]
streams = ["Stream1", "Stream2", "Stream3"]

for _ in range(101):
    data = '{}'.format((random.choice(names), random.choice(streams), random.choice(sals)))
    # Data must be a bytestring
    data = data.encode("utf-8")
    future = publisher.publish(topic_path, data)
    print(data)
    future.result()
    time.sleep(5)
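As a side note, the parsing step of the pipeline can be checked in isolation: the publisher sends the repr of a Python tuple, which `ast.literal_eval` recovers safely (unlike `eval`, it only accepts literals):

```python
import ast

# A payload exactly as the publisher produces it (after UTF-8 decoding).
payload = "('B', 'Stream2', 23)"

# literal_eval parses literal expressions only, so it recovers the tuple
# without the code-execution risk of eval().
name, stream, salary = ast.literal_eval(payload)
print(name, stream, salary)  # -> B Stream2 23
```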

I printed the messages as they were published to the topic; here are the first 20:

b"('B', 'Stream2', 23)"
b"('A', 'Stream3', 77)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream2', 10)"
b"('B', 'Stream3', 10)"
b"('B', 'Stream2', 19)"
b"('C', 'Stream1', 11)"
b"('C', 'Stream2', 22)"
b"('A', 'Stream2', 12)"
b"('B', 'Stream1', 11)"
b"('A', 'Stream2', 23)"
b"('C', 'Stream3', 23)"
b"('A', 'Stream2', 4)"
b"('C', 'Stream2', 22)"
b"('B', 'Stream2', 4)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream3', 11)"
b"('C', 'Stream2', 10)"
b"('C', 'Stream2', 22)"
b"('A', 'Stream2', 19)"

Here are the results in BigQuery, ordered by window start time:

select * from training.dflow_agg order by window_start

[Screenshot: BigQuery table data]

Now I'd like to understand how the averages are being computed.

Case 1: if the window size is 60 seconds, the first window should contain the following elements:

b"('B', 'Stream2', 23)"
b"('A', 'Stream3', 77)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream2', 10)"
b"('B', 'Stream3', 10)"
b"('B', 'Stream2', 19)"

So the average for A is 77, which is correct: there is only one A, and the average of a single element is the element itself.

The average for B should be (23 + 10 + 10 + 19) / 4 = 15.5, since there are four B entries in that window.

Why does C not share the same window start and end times as A and B? C was also published within the same 30 seconds of the window (see the Pub/Sub output above).
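My current guess, sketched with hypothetical timestamps (the real ones are assigned by Pub/Sub at publish time): fixed windows are aligned to multiples of the window size, not to the time of the first message, so messages published only a few seconds apart can land on opposite sides of a window boundary:

```python
from collections import defaultdict

WINDOW = 30  # seconds, matching FixedWindows(30) in the pipeline

# Illustrative publish times, 5 seconds apart as in the publisher script.
events = [
    (12, ('B', 23)), (17, ('A', 77)), (22, ('C', 10)),
    (27, ('B', 10)), (32, ('B', 10)), (37, ('B', 19)),
]

agg = defaultdict(list)
for ts, (name, sal) in events:
    win_start = ts - ts % WINDOW  # epoch-aligned window assignment
    agg[(win_start, name)].append(sal)

# The B published at t=27 and the B published at t=32 are only 5 seconds
# apart, yet they fall into different windows and are averaged separately.
for (win_start, name), sals in sorted(agg.items()):
    print(f"[{win_start}, {win_start + WINDOW}) {name}: avg={sum(sals) / len(sals)}")
```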

Can anyone explain this output?

Thanks, everyone.

That is everything I have tried.
