我试图使用一个变换的输出作为下一个变换的边输入。但是我看到侧面输入是空的。有人能帮我弄清楚这个问题吗
我的原始代码有点复杂,但这是一个示例
import logging
import apache_beam as beam
from apache_beam import pvalue
class transfrom(beam.DoFn):
def process(self,element):
yield pvalue.TaggedOutput("example",tuple(element.items()))
class Test(beam.DoFn):
def process(self,element):
with beam.Pipeline() as p:
read = (
p | 'read' >> beam.io.Read(beam.io.BigQuerySource(query="""{}""".format(query_input_table),use_standard_sql=True)))
trans = (read | 'transform' >> beam.Pardo(transform()).with_outputs())
((read | 'transform' >> beam.Pardo(Test(),pvalue.AsDict(trans))))
在这种情况下,
transfrom
DoFn不需要输出特定标记上的元素。您可以执行以下操作:并将元素作为侧输入传递:
现在,请确定:
transfrom
中的代码正确吗?您应该返回包含两个元素的元组,以便可以构建字典。这就是tuple(element.items())
返回的结果吗相关问题 更多 >
编程相关推荐