Kafka Avro binary consumer with Postgres config

Detailed description of the kafka-avro-binary-consumer Python project


First, start the Confluent Kafka server. This article explains how to do that: https://docs.confluent.io/3.0.0/control-center/docs/quickstart.html

The purpose of this project is:
- to consume binary Avro messages and split them into different topics

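To install this project, you need to:
- install all the packages found in the packages folder
- set up a Postgres server and execute the create_config_tables.sql and insert_to_config_tables.sql files
- put binary_avro_consumer.py and conf.cnf on the server and run the Python file with the command python3.6 binary_avro_consumer.py (params)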

Details about the console execution parameters:

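The CREATE TABLE statements are stored in the create_config_tables.sql file. The INSERT statements for the config tables are stored in the insert_to_config_tables.sql file. Execute the create statements, then insert your settings into these tables. The tables have this structure: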

  config_key         |  config_value
 --------------------+----------------


  topic_name  | field_name
--------------+-------------

Here is an example of how they are filled:

--The config key is the name of a setting. The meaning of each key is explained below--

     bootstrap_server_from - The bootstrap server that messages are read from. This key can appear several times in the DB, one row per server, because Kafka runs as a cluster.
     bootstrap_server_from_port - The port of those bootstrap servers; usually all bootstrap servers use the same port.
     schema_registry - The schema registry URL; it should start with http:// or https://
     schema_registry_port - The schema registry port.
     topic_read - The topic to read messages from; it lives on the `bootstrap_server_from` server.
     group_id - The consumer group id, required for consuming; the default name `example_avro` is usually used.
     bootstrap_server_to - The server that the messages read from `bootstrap_server_from` are written to after being modified.
     bootstrap_server_to_port - The port of `bootstrap_server_to`.
     from_beginning - Whether to start consuming from the beginning: 1 - true, 0 - false.
     count_messages_consume - The number of messages consumed per iteration.

             config_key         |  config_value
    ----------------------------+----------------
     bootstrap_server_from      | localhost
     bootstrap_server_from_port | 9092
     schema_registry            | http://0.0.0.0
     schema_registry_port       | 8081
     topic_read                 | avro-test
     group_id                   | example_avro
     bootstrap_server_to        | localhost
     bootstrap_server_to_port   | 9092
     from_beginning             | 1
     count_messages_consume     | 100
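
The key/value rows above could be collapsed into the settings a consumer needs roughly as follows. This is a minimal sketch, not the project's actual code: the function name `build_settings` and the shape of the returned dictionary are assumptions, and the rows are passed in as plain tuples so the example runs without a database.

```python
from collections import defaultdict


def build_settings(rows):
    """Collapse (config_key, config_value) rows into a settings dict.

    Hypothetical helper: repeated keys (for example several
    bootstrap_server_from rows for a cluster) are collected into lists.
    """
    grouped = defaultdict(list)
    for key, value in rows:
        grouped[key].append(value)

    def servers(host_key, port_key):
        # Assumption: all hosts of one cluster share the same port.
        port = grouped[port_key][0]
        return ",".join(f"{host}:{port}" for host in grouped[host_key])

    registry = f"{grouped['schema_registry'][0]}:{grouped['schema_registry_port'][0]}"
    return {
        "bootstrap_from": servers("bootstrap_server_from", "bootstrap_server_from_port"),
        "bootstrap_to": servers("bootstrap_server_to", "bootstrap_server_to_port"),
        "schema_registry_url": registry,
        "topic_read": grouped["topic_read"][0],
        "group_id": grouped["group_id"][0],
        "from_beginning": grouped["from_beginning"][0] == "1",
        "count_messages_consume": int(grouped["count_messages_consume"][0]),
    }


# The same rows as in the example table above.
rows = [
    ("bootstrap_server_from", "localhost"),
    ("bootstrap_server_from_port", "9092"),
    ("schema_registry", "http://0.0.0.0"),
    ("schema_registry_port", "8081"),
    ("topic_read", "avro-test"),
    ("group_id", "example_avro"),
    ("bootstrap_server_to", "localhost"),
    ("bootstrap_server_to_port", "9092"),
    ("from_beginning", "1"),
    ("count_messages_consume", "100"),
]
settings = build_settings(rows)
print(settings["bootstrap_from"])  # localhost:9092
```

With a second `bootstrap_server_from` row, the sketch would join both hosts into one comma-separated server list, which is the form Kafka clients usually accept.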

 topic_name  | field_name
--------------+-------------
 first_topic  | uid
 first_topic  | somefield
 second_topic | options hel
 second_topic | options mel


For example, suppose you have this Avro schema:

{
  "namespace": "my.com.ns",
  "name": "myrecord",
  "type": "record",
  "fields": [
     {"name": "uid", "type": "int"},
     {"name": "somefield", "type": "string"},
     {"name": "options", "type": {
        "type": "array",
        "items": {
            "type": "record",
            "name": "lvl2_record",
            "fields": [
                {"name": "hel", "type": "string"},
                {"name": "mel", "type": "string"}
            ]
        }
     }}
  ]
}

You need to extract these values from the schema:

uid, somefield, options->hel and options->mel, and store them in first_topic and second_topic. For example, uid and somefield go to first_topic, and options->hel and options->mel go to second_topic. The notation options->hel means that the field hel is a child of options; the same holds for mel.

So the DB gets the rows first_topic | uid and first_topic | somefield, which mean "store uid and somefield in first_topic"; the same applies to second_topic. In the table above a nested field is written with a space between parent and child, as in options hel.
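
The splitting described above can be sketched in plain Python. This is an illustration under stated assumptions, not the project's implementation: the helper names `group_fields`, `extract` and `split_record` are hypothetical, a field name such as `options hel` is assumed to be a space-separated path, and the record is assumed to be already decoded from Avro into a dictionary.

```python
from collections import defaultdict


def group_fields(rows):
    """Group (topic_name, field_name) rows into {topic: [field paths]}."""
    mapping = defaultdict(list)
    for topic, field in rows:
        # Assumption: "options hel" means the path ["options", "hel"].
        mapping[topic].append(field.split())
    return dict(mapping)


def extract(value, path):
    """Follow a field path through dicts; lists are handled element-wise."""
    if not path:
        return value
    if isinstance(value, list):
        return [extract(item, path) for item in value]
    return extract(value[path[0]], path[1:])


def split_record(record, mapping):
    """Build one output message per target topic from a decoded record."""
    return {
        topic: {"->".join(path): extract(record, path) for path in paths}
        for topic, paths in mapping.items()
    }


# Rows of the topic_name | field_name table shown earlier.
field_rows = [
    ("first_topic", "uid"),
    ("first_topic", "somefield"),
    ("second_topic", "options hel"),
    ("second_topic", "options mel"),
]
# A made-up record that matches the example schema.
record = {
    "uid": 1,
    "somefield": "a",
    "options": [{"hel": "h1", "mel": "m1"}, {"hel": "h2", "mel": "m2"}],
}
messages = split_record(record, group_fields(field_rows))
print(messages["first_topic"])   # {'uid': 1, 'somefield': 'a'}
print(messages["second_topic"])  # {'options->hel': ['h1', 'h2'], 'options->mel': ['m1', 'm2']}
```

Because `options` is an array of records, each nested field comes out as a list with one value per array element.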

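How do you find out where the Avro schema is actually stored in the schema registry? Here is the answer:

Suppose you created a topic named test and registered its schema in the schema registry. To see what the schema is, and to track whether it has changed, run the command below in a terminal. The schema registry server must be running, and you should put your own schema registry URL in place of http://localhost:8081/subjects:

    curl -X GET http://localhost:8081/subjects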

Output of curl: ["Kafka-value","Kafka-key","test-value"]

As you can see, the test topic also created a 'test-value' subject, so the schema you need is at
http://localhost:8081/subjects/test-value/versions/latest
In some cases your schema is located under the `Kafka-value` subject, in which case the URL is: http://localhost:8081/subjects/Kafka-value/versions/latest
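
The same lookup can be done from Python. The sketch below uses only the standard library; the function names `list_subjects` and `latest_schema_url` are hypothetical, and `list_subjects` needs a running schema registry, so the example call works on the subject list from the curl output shown earlier instead.

```python
import json
from urllib.request import urlopen


def list_subjects(registry_url):
    """GET /subjects from a running schema registry (needs network access)."""
    with urlopen(f"{registry_url.rstrip('/')}/subjects") as response:
        return json.load(response)


def latest_schema_url(registry_url, topic, subjects):
    """Return the URL of the latest value schema registered for `topic`."""
    subject = f"{topic}-value"
    if subject not in subjects:
        raise LookupError(f"subject {subject!r} not found in {subjects}")
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions/latest"


# Subject list copied from the curl output, so no server is required here.
subjects = ["Kafka-value", "Kafka-key", "test-value"]
print(latest_schema_url("http://localhost:8081", "test", subjects))
# http://localhost:8081/subjects/test-value/versions/latest
```

Against a live registry, `subjects = list_subjects("http://localhost:8081")` would replace the hard-coded list.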

Put this URL in the conf.cnf file. More about conf.cnf:

conf.cnf is the config file for the Python script:

    The first line is the schema registry URL.
    The second line is the DB name.
    The third line is the username.
    The fourth line is the password.
    The sixth line is the host.
    The seventh line is the number of Avro messages to produce with AvroProducer.
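
Since conf.cnf is positional, reading it comes down to mapping line numbers to names. The sketch below is one possible reader, not the script's actual code: `parse_conf` and the dictionary keys are hypothetical, the sample values are made up, and line 5 is left unnamed because the list above does not describe it.

```python
# 1-based line numbers as listed above. Line 5 is not documented there,
# so this sketch does not assign it a name.
FIELDS = {
    1: "schema_registry_url",
    2: "db_name",
    3: "username",
    4: "password",
    6: "host",
    7: "message_count",
}


def parse_conf(text):
    """Map the documented lines of a conf.cnf file to named settings."""
    lines = text.splitlines()
    conf = {}
    for number, name in FIELDS.items():
        if number <= len(lines):
            conf[name] = lines[number - 1].strip()
    if "message_count" in conf:
        conf["message_count"] = int(conf["message_count"])
    return conf


# Made-up sample content; every value here is a placeholder.
sample = "\n".join([
    "http://localhost:8081/subjects/test-value/versions/latest",
    "config_db",
    "config_user",
    "secret",
    "",  # line 5: meaning not documented
    "localhost",
    "10",
])
conf = parse_conf(sample)
print(conf["host"], conf["message_count"])  # localhost 10
```

For a real file, `parse_conf(open("conf.cnf").read())` would replace the sample string.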

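After all of this, start the script named pushpop_complex_avro.py with the command python3.6 pushpop_complex_avro.py (optional params -d, -i and -e, for debug, info and error respectively). Once the script is running it waits for messages, so you need to produce messages to the topic it reads from: python3.6 avro_producer.py. You will see that the messages are split and sent to the first and second topics.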
