我在Python上使用apachebeam,想问一下pythonsdk上apachebeamjava Wait.on()
的等价物是什么?在
目前我对下面的代码片段有问题
if len(output_pcoll) > 1:
merged = (tuple(output_pcoll) |
'MergePCollections1' >> beam.Flatten())
else:
merged = output_pcoll[0]
outlier_side_input = self.construct_outlier_side_input(merged)
(merged |
"RemoveOutlier" >>
beam.ParDo(utils.Remove_Outliers(),
beam.pvalue.AsDict(outlier_side_input)) |
"WriteToCSV" >>
beam.io.WriteToText('../../ML-DATA/{0}.{1}'.format(self.BUCKET,
self.OUTPUT), num_shards=1))
apachebeam似乎不会等到self.construct_outlier_side_input
上的代码执行完毕,并在下一个管道中执行“RemoveOutlier”时导致空端输入。在Java版本中,您可以使用Wait.On()
等待{
--编辑-- 我要达到的目标和这个环节几乎一样, https://rmannibucau.metawerx.net/post/apache-beam-initialization-destruction-task
您可以使用梁的附加输出功能来完成此操作。在
下面是一个示例代码片段
运行上述代码片段后,您将获得多个PCollections,如below、above和marked。然后,可以使用侧面输入进一步过滤或连接结果
希望有帮助。在
更新
基于这些评论,我想提到apachebeam有能力在}的帮助下进行状态处理。如果需求是通读一个PCollection,然后根据是否存在先验值做出决定,那么可以通过
^{pr2}$ValueState
和{BagState
来处理这些需求,如图所示以下:在相关问题 更多 >
编程相关推荐