不接收信息与融合卡夫卡简单生产者/消费者的例子?

2024-05-23 23:09:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用confluent-kafka-0.9.2(主分支)python绑定运行kafka_2.11-0.10.1.1,它使用librdkafka-0.9.2。我的机器运行ubuntu-16.04x86_64。我在端口zookeeper-3.4.8-1上运行zookeeper-3.4.8-1。我运行confluentproducer示例如下:

$ cd confluent-kafka-python/examples
$ python producer.py localhost:9095 confluent-01
first message
2nd msg

以及消费者

^{pr2}$

所有东西都在我的机器上本地运行,它不运行任何防火墙。在

备注

  • 主题已在Zookeeper上成功创建
  • 代理已成功接收生产者消息:
  • 使用者设置如下conf{'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000, 'default.topic.config': {'auto.offset.reset': 'smallest'}, 'api.version.request': True }
  • 在一开始,producer/consumer可以很好地工作一段时间,直到我得到Receive failed: Disconnected的制作人。行使:

$ python producer.py  localhost:9095 confluent-02
asd
% Message delivered to confluent-02 [0]
1234123
890890
% Message delivered to confluent-02 [0]
%3|1485791262.420|FAIL|rdkafka#producer-1| [thrd:obscura.ax.example.com:9095/3]: obscura.ax.example.com:9095/3: Receive failed: Disconnected

问题:过了一段时间,我对消费者没有任何看法

问题:

  1. 我做错什么了?在
  2. 如何验证已在代理端接收到生产者消息? 在代理端正确地接收到生产者消息。在
  3. 如何调试用户端? 我将'debug': "cgrp, topic, fetch"添加到consumer conf中。在哪里可以读取日志?在

Tags: producerkafkapylocalhost消息代理topicconsumer
2条回答

我有两个建议:

1)尝试在consumer命令中添加从头到尾的选项

2)代理的默认端口是9092,因此请检查要使用的正确端口

希望这有帮助。在

我终于成功了。最初我运行了confluent-kafka tutorial,它:

  • 不捕获ctrl+cSIGINT信号
  • poll()时不超时

在消费者守则中。因此,我不得不在我的linux机器上ctrl+z然后kill %1。我相信这个终端并没有关闭一段时间保持打开状态的套接字(TIME_WAIT)。当我重新启动消费者时,它从旧插座中取出垃圾,卡住了。在

我添加了try: [...] except KeyboardInterrupt: consumer.close()来捕获{}并干净地关闭套接字。不再面对这个问题。在

我希望这对将来的人有帮助。在

相关问题 更多 >