kafka-avro-binary-consumer with a Postgres configuration
Detailed description of the kafka-avro-binary-consumer Python project
First, we need to start the Confluent Kafka server. See this guide for how to do that: https://docs.confluent.io/3.0.0/control-center/docs/quickstart.html
The purpose of this project is:
- consume binary Avro messages and split them into different topics
To install this project, you need to:
- install all the packages listed in the package folder
- set up a Postgres server and execute the create_config_tables.sql and insert_to_config_tables.sql files
- put binary_avro_consumer.py and conf.cnf on the server and run the script with the command: python3.6 binary_avro_consumer.py (parameters)
Details about the console execution parameters:
The CREATE TABLE statements are stored in the create_config_tables.sql file, and the INSERT statements for the config tables are stored in the insert_to_config_tables.sql file. You should execute the CREATE statements and then insert your settings into these tables. The tables have the following structure:
config_key | config_value
--------------------+----------------
topic_name | field_name
--------------+-------------
--The config key is the key of a setting; their meanings are explained below--
bootstrap_server_from - a bootstrap server to read messages from; this key can appear multiple times in the DB, because Kafka is a cluster with multiple bootstrap servers.
bootstrap_server_from_port - the port of those bootstrap servers; usually all bootstrap servers use the same port.
schema_registry - the Schema Registry URL; it should start with http:// or https://
schema_registry_port - the Schema Registry port
topic_read - the topic to read messages from; this topic lives on the `bootstrap_server_from` server.
group_id - usually the default name `example_avro`; this parameter is required for consuming
bootstrap_server_to - the server to which we write the messages that were read and modified from `bootstrap_server_from`
bootstrap_server_to_port - the port of `bootstrap_server_to`
from_beginning - start consuming from the beginning: 1 - true, 0 - false
count_messages_consume - the number of messages consumed per iteration
Here is an example of how the tables are populated:
config_key | config_value
----------------------------+----------------
bootstrap_server_from | localhost
bootstrap_server_from_port | 9092
schema_registry | http://0.0.0.0
schema_registry_port | 8081
topic_read | avro-test
group_id | example_avro
bootstrap_server_to | localhost
bootstrap_server_to_port | 9092
from_beginning | 1
count_messages_consume | 100
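The config rows above can be folded into a Python dict before the consumer is configured. This is a minimal sketch; the function name and the in-memory row format are illustrative, not part of the project. Keys such as bootstrap_server_from, which (as noted above) may appear multiple times, are collected into lists:

```python
def load_config(rows):
    """Fold (config_key, config_value) rows into a dict.

    Keys that appear more than once (e.g. bootstrap_server_from,
    one row per broker in the cluster) are collected into a list.
    """
    config = {}
    for key, value in rows:
        if key in config:
            existing = config[key]
            if isinstance(existing, list):
                existing.append(value)
            else:
                config[key] = [existing, value]
        else:
            config[key] = value
    return config

# Rows as they might come back from the config_key/config_value table.
config_rows = [
    ("bootstrap_server_from", "localhost"),
    ("bootstrap_server_from_port", "9092"),
    ("schema_registry", "http://0.0.0.0"),
    ("schema_registry_port", "8081"),
    ("topic_read", "avro-test"),
    ("group_id", "example_avro"),
    ("bootstrap_server_to", "localhost"),
    ("bootstrap_server_to_port", "9092"),
    ("from_beginning", "1"),
    ("count_messages_consume", "100"),
]
config = load_config(config_rows)
```

With a single broker the value stays a plain string; with several bootstrap_server_from rows it becomes a list, mirroring how the table is meant to be used.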
topic_name | field_name
--------------+-------------
first_topic | uid
first_topic | somefield
second_topic | options hel
second_topic | options mel
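The topic_name/field_name rows above form a topic-to-fields routing map. A minimal sketch of grouping them (the function name is illustrative, not from the project):

```python
def load_topic_fields(rows):
    """Group (topic_name, field_name) rows into topic -> [field specs]."""
    mapping = {}
    for topic, field in rows:
        mapping.setdefault(topic, []).append(field)
    return mapping

# Rows as they might come back from the topic_name/field_name table.
mapping_rows = [
    ("first_topic", "uid"),
    ("first_topic", "somefield"),
    ("second_topic", "options hel"),
    ("second_topic", "options mel"),
]
topic_fields = load_topic_fields(mapping_rows)
# topic_fields == {"first_topic": ["uid", "somefield"],
#                  "second_topic": ["options hel", "options mel"]}
```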
For example, suppose you have the following Avro schema:
{
  "namespace": "my.com.ns",
  "name": "myrecord",
  "type": "record",
  "fields": [
    {"name": "uid", "type": "int"},
    {"name": "somefield", "type": "string"},
    {"name": "options", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "lvl2_record",
        "fields": [
          {"name": "hel", "type": "string"},
          {"name": "mel", "type": "string"}
        ]
      }
    }}
  ]
}
You need to extract the following values from this schema:
uid, somefield, options->hel, and options->mel, and store them in first_topic and second_topic. For example, we store uid and somefield in first_topic,
and options->hel and options->mel in second_topic. The notation options->hel means that the field hel is a child of options; the same applies to mel.
So we write to the DB: first_topic uid,somefield, which means: please store uid and somefield in first_topic; the same goes for second_topic.
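Putting the mapping and the schema together, splitting one decoded Avro record into per-topic payloads might look like the sketch below. The function name and the "parent child" spec parsing are illustrative assumptions about how a spec like "options hel" could be interpreted, not the project's actual implementation:

```python
def split_record(record, topic_fields):
    """Route fields of one decoded Avro record to output topics.

    A spec like "options hel" is read as: for every element of the
    `options` array, take its child field `hel`.
    """
    out = {}
    for topic, specs in topic_fields.items():
        payload = {}
        for spec in specs:
            parts = spec.split()
            if len(parts) == 1:
                # Top-level field, e.g. "uid"
                payload[spec] = record[spec]
            else:
                # Nested spec, e.g. "options hel" -> options->hel
                parent, child = parts
                payload[spec] = [item[child] for item in record[parent]]
        out[topic] = payload
    return out

# A record as it might look after Avro decoding with the schema above.
record = {
    "uid": 1,
    "somefield": "abc",
    "options": [{"hel": "h1", "mel": "m1"}, {"hel": "h2", "mel": "m2"}],
}
routing = {
    "first_topic": ["uid", "somefield"],
    "second_topic": ["options hel", "options mel"],
}
routed = split_record(record, routing)
# routed["first_topic"]  -> {"uid": 1, "somefield": "abc"}
# routed["second_topic"] -> {"options hel": ["h1", "h2"],
#                            "options mel": ["m1", "m2"]}
```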
How do you find out where an Avro schema is actually stored in the Schema Registry? Here is the answer:
Suppose you created a topic named test and registered its schema with the Schema Registry. To find out what the schema is, and to track whether it has changed, execute this command in a terminal. The Schema Registry server must be up and running (and instead of http://localhost:8081/subjects you should put your own Schema Registry URL): curl -X GET http://localhost:8081/subjects
Output of curl: ["Kafka-value","Kafka-key","test-value"]
You can see that creating your test topic also created a 'test-value' subject, so the schema you need is at
http://localhost:8081/subjects/test-value/versions/latest
In some cases your schema can be located under the `Kafka-value` path, so your URL would be: http://localhost:8081/subjects/Kafka-value/versions/latest
Put this URL into the conf.cnf file. More about conf.cnf:
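The "latest version" URL for a subject follows a fixed pattern, so it can be built from the config values shown earlier. A minimal sketch (the function name is illustrative):

```python
def latest_schema_url(registry, port, subject):
    """Build the Schema Registry URL for the latest version of a subject's schema."""
    return "{}:{}/subjects/{}/versions/latest".format(registry, port, subject)

# For a topic named "test", the value schema lives under the "test-value" subject.
url = latest_schema_url("http://localhost", 8081, "test-value")
# url == "http://localhost:8081/subjects/test-value/versions/latest"
```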
conf.cnf is the config for the Python script:
The FIRST line is the Schema Registry URL.
The SECOND line is the DB name.
The THIRD line is the username.
The FOURTH line is the password.
The SIXTH line is the host.
The SEVENTH line is the number of Avro messages to produce with AvroProducer.
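Since conf.cnf is positional, a parser only needs to read lines by index. This is a sketch based purely on the line layout described above; the key names are illustrative, and the fifth line is not documented here, so it is kept raw:

```python
def parse_conf(text):
    """Parse conf.cnf by line position, as documented above.

    The fifth line is not described in this README, so it is
    kept unparsed under the key "line5".
    """
    lines = text.splitlines()
    return {
        "schema_registry_url": lines[0],
        "db_name": lines[1],
        "username": lines[2],
        "password": lines[3],
        "line5": lines[4],
        "host": lines[5],
        "messages_to_produce": int(lines[6]),
    }

# A sample file body; "?" stands in for the undocumented fifth line.
sample = "\n".join([
    "http://localhost:8081/subjects/test-value/versions/latest",
    "mydb",
    "user",
    "secret",
    "?",
    "localhost",
    "100",
])
conf = parse_conf(sample)
```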
After all of this, you need to start the script named pushpop_complex_avro.py with a command like: python3.6 pushpop_complex_avro.py (optional params -d, -i, -e for debug, info, and error, respectively). Once the script is running it waits for messages, so you need to produce messages to the topic being read: python3.6 avro_producer.py. You will see that the messages are split and routed to the first and second topics.