Running a streaming Dataflow pipeline with the Python SDK

Posted 2024-06-06 23:44:36


I have a Dataflow pipeline written in Python that I am trying to run on GCP. The job keeps terminating with:

Workflow failed. Causes: Unknown message code.

The main code of my Dataflow pipeline is:

import apache_beam as beam

schema = 'Member_ID:INTEGER,First_Name:STRING,Last_Name:STRING,Gender:STRING,Age:INTEGER,Height:STRING,weight:INTEGER,Hours_Sleep:INTEGER,Calories_Consumed:INTEGER,Calories_Burned:INTEGER,Evt_Date:DATE,Height_Inches:INTEGER,Min_Sleep_Hours:INTEGER,Max_Sleep_Hours:INTEGER,Enough_Sleep:BOOL'

# read, transform and load source data
p = beam.Pipeline(options=options)

# Read from PubSub into a PCollection.
events = (p | 'Read PubSub' >> beam.io.ReadFromPubSub(topic='projects/prefab-envoy-220213/topics/health_event')
            | 'Parse CSV' >> beam.ParDo(getCSVFields())
            | 'Convert Types' >> beam.ParDo(ConvDataTypes())
            | 'Convert Height' >> beam.ParDo(ConvHeight())
            | 'Join CDC Sleep' >> beam.ParDo(CDCSleepJoin(), cdcsleep)
            | 'Create Row' >> beam.ParDo(CreateRow()) 
            | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
                    'prefab-envoy-220213:nhcdata.nhcevents', schema=schema,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
)

results = p.run()
results.wait_until_finish()

If I remove this step:

            | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
                    'prefab-envoy-220213:nhcdata.nhcevents', schema=schema,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

then the pipeline starts normally.


1 answer
Anonymous user
Answer #1 · Posted 2024-06-06 23:44:36

beam.io.Write(beam.io.BigQuerySink()) is a native Dataflow sink, and it only works for batch jobs. For streaming pipelines, use beam.io.WriteToBigQuery instead. Usage:

beam.io.WriteToBigQuery(table='table_name', dataset='dataset', project='project_id')
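The question's table spec is 'prefab-envoy-220213:nhcdata.nhcevents'. The minimal sketch below shows how that spec maps onto the table/dataset/project arguments in the usage line above. Both helper names are hypothetical and not part of Beam.

WriteToBigQuery also accepts the combined 'project:dataset.table' string directly as table, so the split is only for illustration. Two further choices here are assumptions:

- apache_beam is imported lazily inside make_bq_write, so the spec helper can be read and tested without Beam installed.
- WRITE_APPEND replaces the question's WRITE_TRUNCATE, on the assumption that a streaming job should append rows rather than truncate the table.

```python
def split_table_spec(spec):
    """Split 'project:dataset.table' into WriteToBigQuery keyword arguments.

    Hypothetical helper; assumes exactly one ':' before the dataset and
    one '.' between dataset and table, as in the question's table name.
    """
    project, sep, rest = spec.partition(':')
    if not sep or not project:
        raise ValueError('expected project:dataset.table, got %r' % spec)
    dataset, sep, table = rest.partition('.')
    if not sep or not dataset or not table:
        raise ValueError('expected project:dataset.table, got %r' % spec)
    return {'project': project, 'dataset': dataset, 'table': table}


def make_bq_write(spec, schema):
    """Build a BigQuery write step usable in a streaming pipeline.

    Hypothetical wrapper; requires apache_beam at call time. WRITE_APPEND
    is an assumption (the question used WRITE_TRUNCATE).
    """
    import apache_beam as beam  # lazy import so split_table_spec works without Beam

    return beam.io.WriteToBigQuery(
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        **split_table_spec(spec)
    )
```

With this sketch, the last step of the question's pipeline would read:

| 'Write to BQ' >> make_bq_write('prefab-envoy-220213:nhcdata.nhcevents', schema)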

You can find a good example in this answer.
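The schema parameter of WriteToBigQuery accepts the same compact 'NAME:TYPE,...' string used in the question, so schema=schema can be reused as is. The parameter also accepts a dictionary of the form {'fields': [...]}, which helps when a column needs a mode other than the default NULLABLE.

A minimal sketch of a hypothetical helper that expands the string into that dictionary form. It assumes every column gets the same mode.

```python
def schema_string_to_dict(schema, mode='NULLABLE'):
    """Expand 'name:TYPE,name:TYPE' into {'fields': [...]} for WriteToBigQuery.

    Hypothetical helper; applies the same mode to every column.
    """
    fields = []
    for column in schema.split(','):
        # Each column definition looks like 'Member_ID:INTEGER'
        name, sep, field_type = column.strip().partition(':')
        if not sep or not name or not field_type:
            raise ValueError('bad column definition: %r' % column)
        fields.append({'name': name, 'type': field_type, 'mode': mode})
    return {'fields': fields}
```

Applied to the first three columns of the question's schema, it returns one dictionary per column, in order:

schema_string_to_dict('Member_ID:INTEGER,Evt_Date:DATE,Enough_Sleep:BOOL')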
