如何将PTransform的输出作为侧输入传递?

2024-04-20 13:15:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图使用一个变换的输出作为下一个变换的边输入。但是我看到侧面输入是空的。有人能帮我弄清楚这个问题吗

我的原始代码有点复杂,但这是一个示例

import logging
import apache_beam as beam
from apache_beam import pvalue

class transfrom(beam.DoFn):
    def process(self,element):
        yield pvalue.TaggedOutput("example",tuple(element.items()))

class Test(beam.DoFn):
    def process(self,element):
        


with beam.Pipeline() as p:
    read = (
            p | 'read' >> beam.io.Read(beam.io.BigQuerySource(query="""{}""".format(query_input_table),use_standard_sql=True)))

    trans = (read | 'transform' >> beam.Pardo(transform()).with_outputs())

    ((read | 'transform' >> beam.Pardo(Test(),pvalue.AsDict(trans))))

Tags: testimportselfreadapachedefaswith
1条回答
网友
1楼 · 发布于 2024-04-20 13:15:00

在这种情况下,transfromDoFn不需要输出特定标记上的元素。您可以执行以下操作:

class transfrom(beam.DoFn):
    def process(self,element):
        yield tuple(element.items())

并将元素作为侧输入传递:

with beam.Pipeline() as p:
    read = (
            p | 'read' >> beam.io.Read(beam.io.BigQuerySource(query="""{}""".format(query_input_table),use_standard_sql=True)))

    trans = (read | 'transform' >> beam.Pardo(transform()))

    ((read | 'transform' >> beam.Pardo(Test(),pvalue.AsDict(trans))))

现在,请确定:transfrom中的代码正确吗?您应该返回包含两个元素的元组,以便可以构建字典。这就是tuple(element.items())返回的结果吗

相关问题 更多 >