我正在通过apachebeam编写一个简单的python管道来聚集用户投票。在
在输入中,我有如下逗号分隔的行:
pollA,answerB
pollA,answerC
pollB,answerA
pollB,answerB
pollC,answerE
pollA,answerB
接下来,我使用ParDo函数将每一行转换成这样的对象:
输出:
^{pr2}$功能:
class Split(beam.DoFn):
def process(self, element):
pollId, answerId = element.split(",")
return [{
'pollId': pollId,
'answerId': answerId,
'votes': 1
}]
现在,假设我得到了3个答案b,我想按答案id对它们进行分组,并对它们进行计数,从而得出如下结果:
{
pollId: pollA,
answerId: answerB,
votes: 3
}
我是python和apache beam的新手,希望能得到一些帮助:)
一个答案是认识到你的每一个记录都可以描述为:
pollId + answerId
1 // The vote
如果有一个PCollection是这种形式的键/值对,那么可以对该集合执行一个
CombinePerKey(sum)
,该集合将使用相同的键对其值进行相加,从而生成一个新的PCollection,该集合由新的键/值对组成,其中它们的值是具有相同pollId
的所有记录的总和和answerId
。在例如,请参阅CombinePerKeyPython文档以了解此函数的用法。在
相关问题 更多 >
编程相关推荐