卡夫卡的消费者

kafka_influxdb的Python项目详细描述


Build Status
Coverage Status
Code Climate
PyPi Version
Scrutinizer
用python编写的infloxdb的kafka消费者。
支持IntroxDB 0.9.x及更高版本。对于IntroxDB 0.8.x支持,请查看0.3.0 tag

用例

在高负载期间,kafka将用作度量数据的缓冲区。
此外,它还可用于将具有不可靠连接的离岸数据中心的指标发送到您的监控后端。

image5

快速启动

为了快速测试,在kafka和infloxdb旁边的容器中运行kafka infloxdb。一些示例消息在启动时自动生成(使用kafkacat)。

Python2:

make
docker exec -it kafkainfluxdb
python -m kafka_influxdb -c config_example.yaml -s

Python3:

make RUNTIME=py3
docker exec -it kafkainfluxdb
python -m kafka_influxdb -c config_example.yaml -s

皮比5.x

make RUNTIME=pypy
docker exec -it kafkainfluxdb
pypy3 -m kafka_influxdb -c config_example.yaml -s --kafka_reader=kafka_influxdb.reader.kafka_python

(注意,还提供了一个附加标志:--kafka_reader=kafka_influxdb.reader.kafka_python。这是因为pypy与汇合的kafka消费者不兼容,后者是librdkafka的c扩展。因此,我们在这里使用kafka_python库,它与pypy兼容,但速度稍慢一些。)

安装

pip install kafka_influxdb
kafka_influxdb -c config-example.yaml

性能

下图显示了从kafka读取的各种python版本和kafka消费者插件的消息数。
这是针对具有10个分区和5个消息代理的kafka主题进行的测试。
正如您所看到的,在python 3上使用字节码优化的-O标志和confluent-kafka读取器(默认设置)可以获得最佳性能。注意,编码和向NeXDB发送数据可能会降低此最大性能,尽管与LogSTASH相比仍应看到显著的性能提升。

Benchmark results

基准

对于快速基准测试,可以使用以下命令启动完整的kafkacat -> Kafka -> kafka_influxdb -> Influxdb设置:

make

这将立即开始从卡夫卡读取消息并将其写入infloxdb。要查看输出,可以使用infloxdb cli。

docker exec -it docker_influxdb_1 bash # Double check your container name
influx
use metrics
show measurements

支持的格式

您可以编写一个自定义编码器来支持任何输入和输出格式(甚至是像protobuf这样的奇特的东西)。请看encoder目录中的示例以开始。官方支持以下格式:

输入格式

mydatacenter.myhost.load.load.shortterm 0.45 1436357630
[{"values":[0.6],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"1","type":"percent","type_instance":"system"}]

输出格式

load_load_shortterm,datacenter=mydatacenter,host=myhost value="0.45" 1436357630

配置

查看config-example.yaml以了解如何创建配置文件。
您可以覆盖命令行中的设置。允许使用以下参数:
OptionDescription
^{tt7}$, ^{tt8}$Show help message and exit
^{tt9}$Hostname or IP of Kafka message broker (default: localhost)
^{tt10}$Port of Kafka message broker (default: 9092)
^{tt11}$Topic for metrics (default: my_topic)
^{tt12}$Kafka consumer group (default: my_group)
^{tt13}$Kafka client library to use (kafka_python or confluent) (default: kafka_influxdb.reader.confluent)
^{tt14}$InfluxDB hostname or IP (default: localhost)
^{tt15}$InfluxDB API port (default: 8086)
^{tt16}$InfluxDB username (default: root)
^{tt17}$InfluxDB password (default: root)
^{tt18}$InfluxDB database to write metrics into (default: metrics)
^{tt19}$Use SSL connection for InfluxDB (default: False)
^{tt20}$Verify the SSL certificate before connecting (default: False)
^{tt21}$Max number of seconds to establish a connection to InfluxDB (default: 5)
^{tt22}$Use UDP connection for InfluxDB (default: False)
^{tt23}$Retention policy for incoming metrics (default: autogen)
^{tt24}$Precision of incoming metrics. Can be one of ‘s’, ‘m’, ‘ms’, ‘u’ (default: s)
^{tt25}$Input encoder which converts an incoming message to dictionary (default: collectd_graphite_encoder)
^{tt26}$Maximum number of messages that will be collected before flushing to the backend (default: 1000)
^{tt27}$, ^{tt28}$Configfile path (default: None)
^{tt29}$, ^{tt30}$Show performance statistics (default: True)
^{tt31}$, ^{tt32}$Set verbosity level. Increase verbosity by adding a v: -v -vv -vvv (default: 0)
^{tt33}$Show version

与其他工具比较

对于logstash,有一个kafka输入插件和一个infloxdb输出插件。它支持IntroxDB 0.9+。通过这种设置,我们已经实现了大约5000消息/秒的消息吞吐量。查看docker/logstash/config.conf上的配置,您可以自己运行基准:

make RUNTIME=logstash
docker exec -it logstash
logstash -f config.conf

如果您知道这里可以提到的其他工具,请发送一个拉取请求。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java根据两个数组的值对数组进行排序   具有自签名证书和NTLM代理的java Maven SSL repo错误   java自定义字体按钮不工作AndroidStudio   java通过Spring MVC web应用程序向客户端发送文本文件   Java Spring Web服务SOAP身份验证   ANT property environment=“env”无法在JAVA中检索它,但如果作为ANT命令运行,则可以正常工作   java是为spring mvc rest api或spring boot api对应用服务器的每个新请求创建的服务、存储库和组件的新实例吗?   java私有静态最终字符串未完成其工作   PKCS12的安全Java密钥重新处理   java JPA继承表每类SQLSyntaxErrorException