卡夫卡的消费者
kafka_influxdb的Python项目详细描述
用python编写的infloxdb的kafka消费者。
支持IntroxDB 0.9.x及更高版本。对于IntroxDB 0.8.x支持,请查看0.3.0 tag。
用例
在高负载期间,kafka将用作度量数据的缓冲区。
此外,它还可用于将具有不可靠连接的离岸数据中心的指标发送到您的监控后端。
快速启动
为了快速测试,在kafka和infloxdb旁边的容器中运行kafka infloxdb。一些示例消息在启动时自动生成(使用kafkacat)。
Python2:
make docker exec -it kafkainfluxdb python -m kafka_influxdb -c config_example.yaml -s
Python3:
make RUNTIME=py3 docker exec -it kafkainfluxdb python -m kafka_influxdb -c config_example.yaml -s
皮比5.x
make RUNTIME=pypy docker exec -it kafkainfluxdb pypy3 -m kafka_influxdb -c config_example.yaml -s --kafka_reader=kafka_influxdb.reader.kafka_python
(注意,还提供了一个附加标志:--kafka_reader=kafka_influxdb.reader.kafka_python。这是因为pypy与汇合的kafka消费者不兼容,后者是librdkafka的c扩展。因此,我们在这里使用kafka_python库,它与pypy兼容,但速度稍慢一些。)
安装
pip install kafka_influxdb kafka_influxdb -c config-example.yaml
性能
下图显示了从kafka读取的各种python版本和kafka消费者插件的消息数。
这是针对具有10个分区和5个消息代理的kafka主题进行的测试。
正如您所看到的,在python 3上使用字节码优化的-O标志和confluent-kafka读取器(默认设置)可以获得最佳性能。注意,编码和向NeXDB发送数据可能会降低此最大性能,尽管与LogSTASH相比仍应看到显著的性能提升。
基准
对于快速基准测试,可以使用以下命令启动完整的kafkacat -> Kafka -> kafka_influxdb -> Influxdb设置:
make
这将立即开始从卡夫卡读取消息并将其写入infloxdb。要查看输出,可以使用infloxdb cli。
docker exec -it docker_influxdb_1 bash # Double check your container name influx use metrics show measurements
支持的格式
您可以编写一个自定义编码器来支持任何输入和输出格式(甚至是像protobuf这样的奇特的东西)。请看encoder目录中的示例以开始。官方支持以下格式:
输入格式
mydatacenter.myhost.load.load.shortterm 0.45 1436357630
[{"values":[0.6],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"1","type":"percent","type_instance":"system"}]
输出格式
load_load_shortterm,datacenter=mydatacenter,host=myhost value="0.45" 1436357630
配置
查看config-example.yaml以了解如何创建配置文件。
您可以覆盖命令行中的设置。允许使用以下参数:
Option | Description |
---|---|
^{tt7}$, ^{tt8}$ | Show help message and exit |
^{tt9}$ | Hostname or IP of Kafka message broker (default: localhost) |
^{tt10}$ | Port of Kafka message broker (default: 9092) |
^{tt11}$ | Topic for metrics (default: my_topic) |
^{tt12}$ | Kafka consumer group (default: my_group) |
^{tt13}$ | Kafka client library to use (kafka_python or confluent) (default: kafka_influxdb.reader.confluent) |
^{tt14}$ | InfluxDB hostname or IP (default: localhost) |
^{tt15}$ | InfluxDB API port (default: 8086) |
^{tt16}$ | InfluxDB username (default: root) |
^{tt17}$ | InfluxDB password (default: root) |
^{tt18}$ | InfluxDB database to write metrics into (default: metrics) |
^{tt19}$ | Use SSL connection for InfluxDB (default: False) |
^{tt20}$ | Verify the SSL certificate before connecting (default: False) |
^{tt21}$ | Max number of seconds to establish a connection to InfluxDB (default: 5) |
^{tt22}$ | Use UDP connection for InfluxDB (default: False) |
^{tt23}$ | Retention policy for incoming metrics (default: autogen) |
^{tt24}$ | Precision of incoming metrics. Can be one of ‘s’, ‘m’, ‘ms’, ‘u’ (default: s) |
^{tt25}$ | Input encoder which converts an incoming message to dictionary (default: collectd_graphite_encoder) |
^{tt26}$ | Maximum number of messages that will be collected before flushing to the backend (default: 1000) |
^{tt27}$, ^{tt28}$ | Configfile path (default: None) |
^{tt29}$, ^{tt30}$ | Show performance statistics (default: True) |
^{tt31}$, ^{tt32}$ | Set verbosity level. Increase verbosity by adding a v: -v -vv -vvv (default: 0) |
^{tt33}$ | Show version |
与其他工具比较
对于logstash,有一个kafka输入插件和一个infloxdb输出插件。它支持IntroxDB 0.9+。通过这种设置,我们已经实现了大约5000消息/秒的消息吞吐量。查看docker/logstash/config.conf上的配置,您可以自己运行基准:
make RUNTIME=logstash docker exec -it logstash logstash -f config.conf
如果您知道这里可以提到的其他工具,请发送一个拉取请求。