使用GroupByKey解决问题,我相信可以追溯到一个类型问题。我已经看了一段时间,并跟踪了一些堆栈跟踪,但我不清楚为什么下面的代码是错误的
@beam.typehints.with_output_types(beam.typehints.Tuple[long, float])
class MultiMap(beam.DoFn):
def process(self, element):
items = element.split(',')
print items
r = (long(items[0]), float(items[10]))
print r
return r
pipeline = beam.Pipeline()
pcoll = pipeline | 'start' >> beam.Create(['14172425165068797305,3,0,3,0.07,0.36,1,4,4,3705.00765154,0.235002550513','2746375035268210383,3,0,3,0.07,0.36,2,5,5,3789.1391067,0.263046368899','16101396351712676789,3,0,3,0.07,0.37,1,4,3,3639.26112282,0.213087040939'])
multi = pcoll | "Multimap" >> beam.ParDo(MultiMap()).with_output_types(beam.typehints.Tuple[long, float])
使用DirectRunner时,我得到以下异常。在
^{pr2}$需要弄清楚为什么无法将ParDo的输出传递到GroupByKey。在
好吧,我找到了一个解决办法,虽然我不清楚为什么需要改变。我所做的只是将process方法中的'return'改为'yield',它就起作用了。似乎返回时没有创建完整的pcollection。在删除示例中的类型提示时,也可以使用yield或return works。但是,使用类型提示只能产生效果。在
这是相当令人惊讶的行为,而且很难调试。beam docs on ParDo似乎使用了use return并产生了可互换的结果,而没有解释何时使用它们。在
这是一个bug还是丢失了文档?在
相关问题 更多 >
编程相关推荐