Streamparse连续调用下一个元组

2024-03-29 14:48:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试使用Streamparse在Python中编写一个简单的Storm拓扑。一切都在为我工作,除了我写的简单的卡夫卡喷口-它似乎只是不断地叫“下一个元组”。我的螺栓相当慢,所以系统似乎很快在内存中爆炸。在

启动拓扑时,我试图设置topology.max.pulot。挂起为1,以防止它向拓扑添加过多的消息。在

lein run -m streamparse.commands.run/-main topologies/.clj -t 100 --option 'topology.max.spout.pending=1' --option 'topology.workers=1' --option 'topology.acker.executors=1' 

然而,结果仍然是这样,尽管螺栓速度慢得多:

^{pr2}$

我简单的卡夫卡嘴:

class MetadataSpout(Spout):

    def initialize(self, stormconf, context):
        self.log('----CONFIG: %s----' % stormconf)
        k = KafkaClient(os.getenv('KAFKA'))
        self.consumer = SimpleConsumer(k, 'vacuum', 'metadata')

    def next_tuple(self):
        self.log('----NEXT TUPLE----')
        messages = self.consumer.get_messages(count=os.getenv('BATCH_COUNT', 20))
        self.emit([json.dumps([m.message.value for m in messages])])

我的bolts只有默认的配置,但是要完成process()方法需要很长时间。我不知道他们怎么会是问题,但我可以张贴,如果他们是相关的。在


Tags: runselflogconsumerosdefmaxgetenv
1条回答
网友
1楼 · 发布于 2024-03-29 14:48:37

Solved, thanks to the great Streamparse team

““topology.max.pulot.pending只有当你的嘴可靠时才有效果。您需要指定要发出的可选tup_id参数,以便为每个元组提供一个唯一的id。完成此操作后,一切都会正常。”

在为发出的元组指定UUID之后,这个问题就解决了。在

相关问题 更多 >