有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

javascript如何从字符串中获取JSON元素(用JSON对象填充)。

我有一个JSON文件

{ "impressions":
        [
          {
            "impressionId": "7ad7a77fas346a7a2a1da6",
            "userId": "hafsa",
            "clientId": "400"
          },
          {
            "impressionId": "7ad7a77fas346a7a2a1da7",
            "userId": "asif",
            "clientId": "200"
          },
          {
            "impressionId": "7ad7a77fas346a7a2a1da8",
            "userId": "zadarov",
            "clientId": "300"
          },
          {
            "impressionId": "7ad7a77fas346a7a2a1da9",
            "userId": "julia",
            "clientId": "100"
          }
        ]
}

我正在研究卡夫卡,有一个NodeJS制作人和Java消费者。我必须在我的消费者中把每个印象作为一个单独的信息。我的nodeJS代码是:

console.log("Kafka is recieving JSON....");
var fs = require('fs');
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var producer = new Producer(client);


fs.readFile('data.json', 'utf8', function (err, data) {
    if (err) throw err;


    var jsonobj = JSON.parse(data);
    var countMessages = jsonobj.impressions.length;
    var impressionArr = [];
    impressionArr = jsonobj.impressions;
    payloads = [
        { topic: 'test', messages: impressionArr, partition: 0 }
    ];
    producer.on('ready', function(){
        producer.send(payloads, function(err, data){

            console.log(data);
        });
    });
    producer.on('error', function(err){
        console.log("Error: "+err)
    });


});

我的JAVA消费者是:

JavaInputDStream<String> messagesFrmSpecifiedOffset= KafkaUtils.createDirectStream(
                sc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                fromOffset,
                new Function<MessageAndMetadata<String, String>, String>() {
                    public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {

                        return msgAndMd.message();
                    }
                }

        );

在JAVA consumer中,我得到的印象是单独的消息,如下所示

JavaDStream<Record> rdd_impressionRecords = messagesFrmSpecifiedOffset.map(
                new Function<String, Record>() {
                    public Record call(String impressions) throws Exception {

                        System.out.println("impressions: " + impressions);

                        Record sd = new Record();
                        return sd;
                    }
                });

但我的问题是: 我以对象形式获得输出,如:

impressions: [object Object]
impressions: [object Object]
impressions: [object Object]
impressions: [object Object]

我知道我正在以字符串的形式发送JSON和接收,但任何人都可以通过三种方式帮助我完成解决方案: 1.我可以从Java类中的[object object]获取键、值吗。 2.是否可以在NodeJS中以字符串形式发送每个数组元素(使用JSON.stringify),而无需任何循环。就像我现在做的那样直接转移。 3.我可以重写直接返回JSON obect的createDirectStream方法吗


共 (5) 个答案

  1. # 2 楼答案

    使用JSON。在将消息发送到卡夫卡总线之前进行字符串化。例如,您的代码更改为messages:JSON。使(印象派的)字符串化

  2. # 3 楼答案

    客户端代码可以是:

    var _ = require('lodash');
    
    var payloads = _.map(JSON.parse(data).impressions, function(imp){
      return JSON.stringify(imp); 
    });
    producer.send(payloads, function(err, data){
    
    });
    

    Spark代码可以使用json简单库:

    import org.json.simple.*;
    JSONObject imp = (JSONObject) JSONValue.parse(impression);
    String impressionId = (String) imp.get("impressionId");
    
  3. # 4 楼答案

    kafka节点不支持向kafka生成对象的AFAIK。如果您尝试这样做,对象本身的格式将是[object object],实际上是字符串“[object object]”。这就是为什么不能从Java反序列化。 在节点代码中生成时尝试JSON.stringify(impressionArr),在Java端消费时尝试一些JSON库

  4. # 5 楼答案

    除了Zdeněk Tisoň回答:

    a)首先使用JSON。stringify()创建字符串并通过网络发送

    b)在用户端,将该字符串转换为您想要的任何字符串。如果希望它是Json,可以使用Json。parse()