有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Kafka:在Java Producer发送消息之后,控制台使用者上没有看到任何消息

我是卡夫卡的新手。我在本地机器上创建了一个java producer,并在网络上的另一台机器(比如M2)上设置了一个Kafka代理(我可以ping、SSH,连接到这台机器)。在Eclipse控制台的生产者端,我得到“messagesent”。但当我检查机器M2上的控制台耗电元件时,我看不到这些消息

我的java producer代码是:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;


import java.util.HashMap;
import java.util.Map;

public class KafkaMessageProducer  {

    /**
     * @param args
     */
    public static void main(String[] args) {

        KafkaMessageProducer reportObj = new KafkaMessageProducer();
        reportObj.send();

    }

    public void send(){

        Map<String, Object> config = new HashMap<String, Object>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "135.113.133.60:9092");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
        int maxMessages = 5;
        int count = 0;
        while(count < maxMessages){
            producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #"+count++));
            System.out.println("Message send.."+count);
        }
        producer.close();
    }

}

你能告诉我哪里出了问题吗?我可以从控制台生成器在机器M2上本地发送消息。 注意:即使我将IP地址更改为Kafka代理的完整主机名,它仍然存在相同的问题

更新:我还认为制作人能够连接到Kafka代理并发送消息,但Kafka代理不会将这些消息传递给消费者。如果我将IP地址或端口更改为Zookeeper(它与Kafka代理在同一节点上运行),并查看Zookeeper的日志,它将获得生产者ping,然后拒绝会话

更新2:我创建了一个Producer jar,并在机器M2上运行了这个jar,它工作了。因此,制片人试图与卡夫卡经纪人建立联系的方式似乎有问题。还不确定是什么问题


共 (3) 个答案

  1. # 1 楼答案

    您可以尝试使用如下代码来读取kafka主题的元数据信息,以查看代理是否收到了消息。这有助于调试

    SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), 100000,
          64 * 1024, "your_group_id");
    List<String> topics = new ArrayList<>();
    topics.add(topic);
    TopicMetadataRequest req = new TopicMetadataRequest(topics);
    
    TopicMetadataResponse resp = simpleConsumer.send(req);
    if (resp.topicsMetadata().size() != 1) {
      throw new RuntimeException("Expected one metadata for topic "
          + topic + " found " + resp.topicsMetadata().size());
    }
    
    TopicMetadata topicMetaData = resp.topicsMetadata().get(0);
    
  2. # 2 楼答案

    就像调试的想法一样-试试producer.send(/* record */).get(); 也就是说,等待从send()方法返回的Future的结果。可能是制作人方面有一个例外,它在后台被忽略了

  3. # 3 楼答案

    我终于找到了答案,我在这里发布,以防其他人也有同样的问题。使用公布的Kafka代理设置。尝试远程连接时的主机名。这对我有用