有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

无法反序列化'java'的实例。节点中的lang.String`。js服务器到卡夫卡连接

我使用curl向我们的Kafka Connect服务提交一条JSON请求消息,其中包含关于连接器的信息,它正在成功工作

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"database.server.name\": \"dbserver1\", \"database.whitelist\": \"inventory\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.inventory\" } }'

现在我使用node。js服务器将数据发送到kafka connect服务器

  var body = {
  "name": "abc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
};

  var options = {
      method: 'PUT',
      uri: 'http://localhost/connectors/abc/config',
      headers: {
          'User-Agent': 'Request-Promise'
      },
      json: true ,
      body: body
  };

  rp(options)
      .then(function (data) {
          return res.status(200).json({ 'data': data});
      })
      .catch(function (err) {
        console.log(err);
          return res.status(500).json({ error: err});
      });

然而,代码抛出了一个错误:说

{ StatusCodeError: 500 - {"error_code":500,"message":"Cannot deserialize instance of `java.lang.String` out of START_OBJECT token\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 42] (through reference chain: java.util.LinkedHashMap[\"config\"])"}

API描述来自https://docs.confluent.io/current/connect/references/restapi.html

enter image description here


共 (1) 个答案

  1. # 1 楼答案

    如果我正确阅读了confluent文档,那么您确实混淆了两个不同的API端点

    在代码中,使用端点/connectors/abc/config,根据文档,它将单个配置对象作为顶层,如下所示:

    {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory"
    }
    

    但是您的JSON对象看起来像是针对/connectors端点的

    更改端点或JSON对象以匹配您选择的端点可能会解决此问题