GroupByKey的光束类型

2024-04-29 11:16:35 发布

您现在位置:Python中文网/ 问答频道 /正文

使用GroupByKey解决问题,我相信可以追溯到一个类型问题。我已经看了一段时间,并跟踪了一些堆栈跟踪,但我不清楚为什么下面的代码是错误的

@beam.typehints.with_output_types(beam.typehints.Tuple[long, float])
class MultiMap(beam.DoFn):
   def process(self, element):
      items = element.split(',')
      print items
      r =  (long(items[0]), float(items[10]))
      print r
      return r


   pipeline = beam.Pipeline()
   pcoll = pipeline | 'start' >> beam.Create(['14172425165068797305,3,0,3,0.07,0.36,1,4,4,3705.00765154,0.235002550513','2746375035268210383,3,0,3,0.07,0.36,2,5,5,3789.1391067,0.263046368899','16101396351712676789,3,0,3,0.07,0.37,1,4,3,3639.26112282,0.213087040939'])
   multi = pcoll | "Multimap" >> beam.ParDo(MultiMap()).with_output_types(beam.typehints.Tuple[long, float])

使用DirectRunner时,我得到以下异常。在

^{pr2}$

需要弄清楚为什么无法将ParDo的输出传递到GroupByKey。在


Tags: outputpipelinewithitemselementfloatlongtypes
1条回答
网友
1楼 · 发布于 2024-04-29 11:16:35

好吧,我找到了一个解决办法,虽然我不清楚为什么需要改变。我所做的只是将process方法中的'return'改为'yield',它就起作用了。似乎返回时没有创建完整的pcollection。在删除示例中的类型提示时,也可以使用yield或return works。但是,使用类型提示只能产生效果。在

这是相当令人惊讶的行为,而且很难调试。beam docs on ParDo似乎使用了use return并产生了可互换的结果,而没有解释何时使用它们。在

这是一个bug还是丢失了文档?在

相关问题 更多 >