Can someone explain the output of a 60-second fixed window in an Apache Beam streaming pipeline?
I have built a data pipeline that reads real-time data from Pub/Sub, aggregates it, and writes the results to BigQuery. The pipeline runs fine and the aggregated results do arrive in the BigQuery table. However, I don't understand how the results are being computed.
Here is my Apache Beam code:
import json
import os
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform

table_spec1 = bigquery.TableReference(
    projectId=<PROJECT_ID>,
    datasetId='training',
    tableId='dflow_agg')

SCHEMA = {
    "fields": [
        {"name": 'Name', "type": "STRING"},
        {"name": 'avg_sal', "type": "FLOAT64"},
        {"name": 'window_start', "type": "STRING", "mode": "NULLABLE"},
        {"name": 'window_end', "type": "STRING", "mode": "NULLABLE"}
    ]
}

pipeline_options = PipelineOptions(streaming=True)

class ProcessWords(beam.DoFn):
    def process(self, ele):
        yield eval(ele)

def run():
    with beam.Pipeline(options=pipeline_options) as p:
        out = (
            p
            | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(
                subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
            | "Decode and parse Json" >> beam.Map(lambda element: element.decode("utf-8"))
            | "Formatting" >> beam.ParDo(ProcessWords())  # .with_output_types(CommonLog)
            | "Create beam Row" >> beam.Map(
                lambda x: beam.Row(Name=str(x[0]), Stream=str(x[1]), Salary=int(x[2])))
            | "window" >> beam.WindowInto(beam.window.FixedWindows(30))
            | SqlTransform(
                """
                SELECT
                    Name,
                    AVG(Salary) AS avg_sal
                FROM PCOLLECTION
                GROUP BY Name
                """)
            | "Assemble Dictionary" >> beam.Map(
                lambda row, window=beam.DoFn.WindowParam: {
                    "Name": row.Name,
                    "avg_sal": row.avg_sal,
                    "window_start": window.start.to_rfc3339(),
                    "window_end": window.end.to_rfc3339()
                })
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table=table_spec1,
                dataset='training',
                schema=SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
            # | beam.MapTuple(lambda N, S, M: {"Name": N, "Stream": S, "Marks": M})
            # | beam.Map(print)
        )
        # p.run()

if __name__ == '__main__':
    run()
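One point worth knowing when reading the windowed output: Beam's FixedWindows are aligned to the Unix epoch, not to the arrival of the first element, so the window an element lands in is determined purely by its event timestamp. A minimal pure-Python sketch of that assignment rule (the timestamps below are made up for illustration, not real pipeline data):

```python
# Sketch of Beam's FixedWindows assignment rule: windows are epoch-aligned
# and end-exclusive, i.e. an event at timestamp ts falls into
# [ts - ts % size, ts - ts % size + size).

def fixed_window(ts_seconds, size=60):
    """Return the [start, end) window that a given event timestamp falls into."""
    start = ts_seconds - (ts_seconds % size)
    return (start, start + size)

# Two events only 10 seconds apart can still land in different windows
# if a window boundary falls between them:
print(fixed_window(55))  # -> (0, 60)
print(fixed_window(65))  # -> (60, 120)
```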
I also have a Python script that publishes a message to a Pub/Sub topic every 5 seconds. Here is my publisher code:
import time
import random
from google.cloud import pubsub_v1

project_id = "<PROJECT_ID>"
topic_name = "Test"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

i = 0
while i < 101:
    name = ['A', 'B', 'C']
    sal = [10, 12, 11, 23, 77, 54, 23, 21, 4, 9, 5, 22, 19]
    stream = ["Stream1", "Stream2", "Stream3"]
    data = '{}'.format((random.choice(name), random.choice(stream), random.choice(sal)))
    # Data must be a bytestring
    data = data.encode("utf-8")
    future = publisher.publish(topic_path, data)
    print(data)
    future.result()
    time.sleep(5)
    i += 1
I printed the messages as they were published to the topic; here are the first 20:
b"('B', 'Stream2', 23)"
b"('A', 'Stream3', 77)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream2', 10)"
b"('B', 'Stream3', 10)"
b"('B', 'Stream2', 19)"
b"('C', 'Stream1', 11)"
b"('C', 'Stream2', 22)"
b"('A', 'Stream2', 12)"
b"('B', 'Stream1', 11)"
b"('A', 'Stream2', 23)"
b"('C', 'Stream3', 23)"
b"('A', 'Stream2', 4)"
b"('C', 'Stream2', 22)"
b"('B', 'Stream2', 4)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream3', 11)"
b"('C', 'Stream2', 10)"
b"('C', 'Stream2', 22)"
b"('A', 'Stream2', 19)"
Here are the results in BigQuery, ordered by window start time:
select * from training.dflow_agg order by window_start
Now I would like to understand how the averages are calculated.
Case 1: if the window size is 60 seconds, the first window would contain the following elements:
b"('B', 'Stream2', 23)"
b"('A', 'Stream3', 77)"
b"('C', 'Stream2', 10)"
b"('B', 'Stream2', 10)"
b"('B', 'Stream3', 10)"
b"('B', 'Stream2', 19)"
So A's average is 77, which is correct: there is only one A, and the average of a single element is the element itself.
B's average should be (23 + 10 + 10 + 19) / 4 = 15.5, since there are 4 B elements in this window.
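As a sanity check, the per-name averages over those six elements can be computed directly in plain Python. This mirrors the `AVG(Salary) ... GROUP BY Name` in the pipeline's SQL, under the assumption that all six elements fall into the same window:

```python
from collections import defaultdict

# The six elements assumed to be in the first window (from the list above)
elements = [('B', 'Stream2', 23), ('A', 'Stream3', 77), ('C', 'Stream2', 10),
            ('B', 'Stream2', 10), ('B', 'Stream3', 10), ('B', 'Stream2', 19)]

# Group salaries by name, then average each group
salaries = defaultdict(list)
for name, _stream, salary in elements:
    salaries[name].append(salary)

averages = {name: sum(vals) / len(vals) for name, vals in salaries.items()}
print(averages)  # {'B': 15.5, 'A': 77.0, 'C': 10.0}
```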
Why doesn't C share the same window start and end as A and B, given that C was also published within 30 seconds of the window time (see the Pub/Sub output above)?
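To illustrate why two messages published only seconds apart can end up with different window_start values, here is a small sketch assuming the publisher's 5-second cadence and the pipeline's 30-second fixed windows. The publish times are assumptions for illustration; in the real pipeline, Pub/Sub's publish timestamps (which are not round numbers) play this role:

```python
# With messages published every 5 seconds and 30-second epoch-aligned
# windows, consecutive messages can straddle a window boundary.
# Publish times below are assumed, not measured.

publish_times = [5 * i for i in range(8)]  # 0, 5, 10, ..., 35 seconds

# Each message lands in the window [t - t % 30, t - t % 30 + 30)
windows = [(t - t % 30, t - t % 30 + 30) for t in publish_times]
for t, w in zip(publish_times, windows):
    print(t, w)
# Messages at t = 0..25 fall in (0, 30); t = 30 and t = 35 fall in (30, 60).
```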
Can someone explain this output?
Thanks, everyone.
That's everything I have tried so far.