等等,快()在Apache Beam Python SDK版本上

2024-06-16 12:06:59 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Python上使用apachebeam,想问一下pythonsdk上apachebeamjava Wait.on()的等价物是什么?在

目前我对下面的代码片段有问题

    if len(output_pcoll) > 1:
        merged = (tuple(output_pcoll) |
                  'MergePCollections1' >> beam.Flatten())
    else:
        merged = output_pcoll[0]

    outlier_side_input = self.construct_outlier_side_input(merged)

    (merged |
     "RemoveOutlier" >>
     beam.ParDo(utils.Remove_Outliers(),
                beam.pvalue.AsDict(outlier_side_input)) |
     "WriteToCSV" >>
     beam.io.WriteToText('../../ML-DATA/{0}.{1}'.format(self.BUCKET,
                         self.OUTPUT), num_shards=1))

apachebeam似乎不会等到self.construct_outlier_side_input上的代码执行完毕,并在下一个管道中执行“RemoveOutlier”时导致空端输入。在Java版本中,您可以使用Wait.On()等待{}完成执行,但是我在pythonsdk中找不到等价的方法。在

--编辑-- 我要达到的目标和这个环节几乎一样, https://rmannibucau.metawerx.net/post/apache-beam-initialization-destruction-task


Tags: selfinputoutputonmergedconstructsidebeam
1条回答
网友
1楼 · 发布于 2024-06-16 12:06:59

您可以使用梁的附加输出功能来完成此操作。在

下面是一个示例代码片段

results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
           .with_outputs('above_cutoff_lengths', 'marked strings',
                         main='below_cutoff_strings'))
below = results.below_cutoff_strings
above = results.above_cutoff_lengths
marked = results['marked strings']  # indexing works as well

运行上述代码片段后,您将获得多个PCollections,如below、above和marked。然后,可以使用侧面输入进一步过滤或连接结果

希望有帮助。在

更新

基于这些评论,我想提到apachebeam有能力在ValueState和{}的帮助下进行状态处理。如果需求是通读一个PCollection,然后根据是否存在先验值做出决定,那么可以通过BagState来处理这些需求,如图所示以下:在

^{pr2}$

相关问题 更多 >