Kafka Avro binary consumer with Postgres configuration

Detailed description of the Python project kafka-avro-binary-consumer


First, you need to start the Confluent Kafka server. See this guide to learn how: https://docs.confluent.io/3.0.0/control-center/docs/quickstart.html

The purpose of this project is:
- consume binary Avro messages and split them into different topics

To install this project, you need to:
- install all packages listed in the packages folder
- set up a Postgres server and execute the create_config_tables.sql and insert_to_config_tables.sql files
- put binary_avro_consumer.py and conf.cnf on the server and run the Python file with the command python3.6 binary_avro_consumer.py (arguments)

Details about the console execution parameters:

The CREATE TABLE statements are stored in the create_config_tables.sql file, and the INSERT statements for the config tables are stored in the insert_to_config_tables.sql file. You should execute the CREATE statements and insert your settings into these tables. The tables have the following structure (a rough sketch of equivalent CREATE statements follows the skeletons below):

  config_key         |  config_value
 --------------------+----------------


  topic_name  | field_name
--------------+-------------
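
The actual statements live in create_config_tables.sql and insert_to_config_tables.sql, which are not reproduced here; the snippet below is only a minimal sketch of what they might look like, executed through psycopg2. The table names consumer_config and topic_fields are assumptions for illustration, not names taken from the project.

    import psycopg2

    # Hypothetical table names; the real DDL is defined in create_config_tables.sql.
    DDL = """
    CREATE TABLE consumer_config (
        config_key   text NOT NULL,
        config_value text NOT NULL
    );
    CREATE TABLE topic_fields (
        topic_name text NOT NULL,
        field_name text NOT NULL
    );
    """

    conn = psycopg2.connect(dbname="kafka_config", user="postgres",
                            password="secret", host="localhost")
    with conn, conn.cursor() as cur:
        cur.execute(DDL)
    conn.close()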

Below is an example of how they are populated:

The config_key is the name of a setting; here is an explanation of what each key means:

     bootstrap_server_from - the bootstrap server we read messages from; it can appear multiple times in the DB because Kafka is a cluster and may have multiple bootstrap servers.
     bootstrap_server_from_port - the port of those bootstrap servers; usually all bootstrap servers use the same port.
     schema_registry - the Schema Registry URL; it should start with http:// or https://
     schema_registry_port - the Schema Registry port
     topic_read - the topic we read messages from, i.e. a topic on the `bootstrap_server_from` server.
     group_id - usually the default name `example_avro`; this parameter is required for consuming
     bootstrap_server_to - the server we write the messages to, after reading and modifying them from `bootstrap_server_from`
     bootstrap_server_to_port - the port of `bootstrap_server_to`
     from_beginning - start consuming from the beginning: 1 - true, 0 - false
     count_messages_consume - the number of messages consumed per iteration

             config_key         |  config_value
    ----------------------------+----------------
     bootstrap_server_from      | localhost
     bootstrap_server_from_port | 9092
     schema_registry            | http://0.0.0.0
     schema_registry_port       | 8081
     topic_read                 | avro-test
     group_id                   | example_avro
     bootstrap_server_to        | localhost
     bootstrap_server_to_port   | 9092
     from_beginning             | 1
     count_messages_consume     | 100

 topic_name  | field_name
--------------+-------------
 first_topic  | uid
 first_topic  | somefield
 second_topic | options hel
 second_topic | options mel
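
With the tables populated like this, the consumer can load its settings and the topic-to-field mapping from Postgres at startup. The following is only a minimal sketch under the same assumed table names as in the DDL sketch above (consumer_config, topic_fields); the real project takes its connection details from conf.cnf.

    import psycopg2

    def load_config(dbname, user, password, host):
        """Load key/value settings and the topic -> fields mapping (sketch)."""
        conn = psycopg2.connect(dbname=dbname, user=user,
                                password=password, host=host)
        with conn, conn.cursor() as cur:
            cur.execute("SELECT config_key, config_value FROM consumer_config")
            settings = {}
            for key, value in cur.fetchall():
                # bootstrap_server_from can appear several times (Kafka cluster),
                # so repeated keys are collected into lists.
                settings.setdefault(key, []).append(value)

            cur.execute("SELECT topic_name, field_name FROM topic_fields")
            topic_fields = {}
            for topic, field in cur.fetchall():
                topic_fields.setdefault(topic, []).append(field)
        conn.close()
        return settings, topic_fields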


For example, suppose you have an Avro schema like this:

  "namespace" : "my.com.ns",
  "name": "myrecord",
  "type" :  "record",
  "fields" : [
     {"name": "uid", "type": "int"},
     {"name": "somefield", "type": "string"},
     {"name": "options", "type": {
        "type": "array",
        "items": {
            "type": "record",
            "name": "lvl2_record",
            "fields": [
                {"name": "hel", "type": "string"},
                {"name": "mel", "type": "string"}
                 }
               ]
            }
        }
     }

  ]
}

You need to extract the following values from this schema:

uid, somefield, options->hel, options->mel, and you need to store these values in first_topic and second_topic. For example, we store uid and somefield in first_topic, and options->hel and options->mel in second_topic. The notation options->hel, options->mel means that the field hel is a child of options, and the same for mel.

So we write to the DB: first_topic uid,somefield, which means "store uid and somefield in first_topic"; the same for second_topic. A minimal sketch of this splitting step is shown below.
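
This splitting step can be pictured like the sketch below. It is not the project's actual implementation: it assumes each decoded Avro message arrives as a Python dict and that a mapping like the one built by the load_config sketch above says which fields belong to which output topic, with nested fields written as 'parent child' pairs (e.g. 'options hel') as in the topic_fields example.

    def split_record(record, topic_fields):
        """Split one decoded Avro record into per-topic payloads (sketch).

        record       -- the decoded message, e.g.
                        {"uid": 1, "somefield": "x", "options": [{"hel": "a", "mel": "b"}]}
        topic_fields -- {"first_topic": ["uid", "somefield"],
                         "second_topic": ["options hel", "options mel"]}
        """
        out = {}
        for topic, fields in topic_fields.items():
            payload = {}
            for field in fields:
                parts = field.split()        # "options hel" -> ["options", "hel"]
                if len(parts) == 1:          # top-level field
                    payload[parts[0]] = record.get(parts[0])
                else:                        # child field inside the array of records
                    parent, child = parts
                    payload[child] = [item.get(child)
                                      for item in record.get(parent, [])]
            out[topic] = payload
        return out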

How do you find out where the Avro schema is actually stored in the Schema Registry? Here is the answer:

Suppose you created a topic named test and registered its schema with the Schema Registry. To see what the schema is and to track whether it has changed, execute this command in a terminal (the Schema Registry server should be running; instead of http://localhost:8081/subjects you should put your own Schema Registry URL): curl -X GET http://localhost:8081/subjects

Output of curl: ["Kafka-value","Kafka-key","test-value"]

You can see that creating your test topic also created a 'test-value' subject, so the schema you need is at
http://localhost:8081/subjects/test-value/versions/latest
In some cases your schema can be located under the `Kafka-value` path, in which case your URL will be: http://localhost:8081/subjects/Kafka-value/versions/latest
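
The same lookup can also be done from Python. Below is only a minimal sketch using the requests library against the Schema Registry REST API; the subject name test-value matches the example above.

    import json
    import requests

    REGISTRY = "http://localhost:8081"

    # List all subjects, equivalent to: curl -X GET http://localhost:8081/subjects
    subjects = requests.get(f"{REGISTRY}/subjects").json()
    print(subjects)                        # e.g. ["Kafka-value", "Kafka-key", "test-value"]

    # Fetch the latest registered schema for the test-value subject.
    latest = requests.get(f"{REGISTRY}/subjects/test-value/versions/latest").json()
    schema = json.loads(latest["schema"])  # the Avro schema itself, as a dict
    print(latest["version"], schema["name"])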

Put this URL in the conf.cnf file. More about conf.cnf:

conf.cnf is the config for the Python script:

    On the FIRST line is the schema registry URL.
    On the SECOND line is the DB name.
    On the THIRD line is the username.
    On the FOURTH line is the password.
    On the SIXTH line is the host.
    On the SEVENTH line is the number of Avro messages for the AvroProducer to produce.
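
As a rough illustration, a positional parser for this file could look like the sketch below; it simply maps line numbers to the settings described above (the unspecified fifth line is skipped), and the real scripts may read the file differently.

    def read_conf(path="conf.cnf"):
        """Parse conf.cnf by line position, as described above (sketch)."""
        with open(path) as f:
            lines = [line.rstrip("\n") for line in f]

        return {
            "schema_registry_url": lines[0],       # first line
            "db_name": lines[1],                   # second line
            "db_user": lines[2],                   # third line
            "db_password": lines[3],               # fourth line
            "db_host": lines[5],                   # sixth line
            "messages_to_produce": int(lines[6]),  # seventh line
        }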

After all of this, you need to start the script called pushpop_complex_avro.py with a command like python3.6 pushpop_complex_avro.py (optional params -d, -i, -e for debug, info and error logging respectively). Once the script is running it waits for messages, so you need to produce messages to the topic it reads from: python3.6 avro_producer.py. You will see that the messages are split and routed to the first and second topics.
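
If you want to produce test messages yourself instead of using avro_producer.py, the snippet below is a minimal sketch with the confluent-kafka Python client; the topic name avro-test and the field values are only examples matching the schema and config shown above.

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    # The example schema from above, with uid/somefield and the nested options array.
    value_schema = avro.loads("""
    {
      "namespace": "my.com.ns", "name": "myrecord", "type": "record",
      "fields": [
        {"name": "uid", "type": "int"},
        {"name": "somefield", "type": "string"},
        {"name": "options", "type": {"type": "array", "items": {
            "type": "record", "name": "lvl2_record",
            "fields": [{"name": "hel", "type": "string"},
                       {"name": "mel", "type": "string"}]}}}
      ]
    }
    """)

    producer = AvroProducer(
        {"bootstrap.servers": "localhost:9092",
         "schema.registry.url": "http://localhost:8081"},
        default_value_schema=value_schema)

    producer.produce(topic="avro-test",
                     value={"uid": 1, "somefield": "hello",
                            "options": [{"hel": "a", "mel": "b"}]})
    producer.flush()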
