从Java producer到Mongodb的json主题数据
我用java生成数据并将其放入Kafka topic,然后我希望将这些数据放入MongoDB。 当我通过JAVA以JSON的形式发送数据时,由于这个错误,它不会存储到MongoDB中
[2020-08-15 18:42:19,164] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JSON reader was expecting a value but found 'siqdj'. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'siqdj'.
at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
at com.mongodb.kafka.connect.sink.converter.SinkConverter.convert(SinkConverter.java:44)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:229)
at java.util.ArrayList.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:228)
at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:169)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
at java.util.ArrayList.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
at java.util.HashMap.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2020-08-15 18:42:19,166] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'siqdj'.
at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
at com.mongodb.kafka.connect.sink.converter.SinkConverter.convert(SinkConverter.java:44)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:229)
at java.util.ArrayList.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:228)
at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:169)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
at java.util.ArrayList.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
at java.util.HashMap.forEach(Unknown Source)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
... 10 more
以下是我通过java程序在Kafka consumer中发送的数据
{"name":"This is a test","dept":"siqdj","studentId":1}
{"name":"This is another","dept":"siqdj","studentId":2}
每行代表一条记录
这是我的配置文件
独立连接。属性
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/plugins
MongoSinkConnector。属性
name=Kafka_ops
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=TestTopic4
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=student_kafka
collection=students
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# 1 楼答案
塔里克——我不是这方面的专家。但我在JDBC sink adapter和Oracle数据库上也做过类似的尝试
您发送到主题的数据格式在我看来不正确。因此,您可能会遇到错误。由于您使用的是JsonConverter,因此主题中的每一行都应采用以下格式,以便接收器适配器解析并写入数据存储。 当前,您的数据在有效负载中没有架构。因此出现了错误
请将下面的内容传递给主题,看看它是否会进入MongoDB