Kafka Store提供了一种从Kafka存档数据的简单方法

kafka_store的Python项目详细描述


##卡夫卡商店[构建状态](https://travis-ci.org/smyte/kafka_store.svg?branch=master)](https://travis ci.org/smyte/kafka-store)


kafka-store为kafka主题的长期存档提供了一种安全的方法。


Kafka Store确保Kafka主题中的每一条消息都备份到Google云存储**只备份一次**,使用**可预测的文件名**,并以**容错**的方式。
*以**低内存要求**将大的**压缩**avro编码的文件保存到您的服务器。
*可以选择将文件记录到**mysql表**,偏移量范围为更快的查找。

secor

这个工具与之前发布的名为[secor]的工具非常相似(https://github.com/pinterest/secor)。我们开始使用secor,但是我们编写替换的动机主要是由于**可预测的文件名**保证,以及在尝试使用比我们的用例所需复杂得多的工具时出现的许多生产问题。

通过使用kafka新的时间戳特性,我们可以确保每个消息总是在**同一个文件**中登陆。由于我们的文件是用初始消息的偏移量命名的,因此来自s3的流被简化了,因为下一个转储的文件名是可预测的("final_offset+1")。



*我们只针对长期存档。不支持输出分区、转换等。
*没有统计接口。我们建议基于卡夫卡延迟进行报警。

要求

*必须在您的卡夫卡经纪人上启用时间戳。这要求启用较新版本的Kafka和最低协议0.10.0.0。
*支持时间戳的"librdkafka"版本。如果您使用压缩,可能需要检查我们的[未合并的修补程序](https://github.com/edenhill/librdkafka/pull/858)。
*我们还不支持压缩主题。


--代理列表<;broker>;--topic<;topic>;--group<;group>;[选项]
kafka store(-h--help)


$kafka store reader
用法:
kafka store reader local<;path>;[选项]
kafka store reader(-h--help)
````

| kafkacat-p-b localhost-t sample-p 5

10、11、12、13、14、15、16、17、18、19
信息:卡夫卡存储。缓冲区:保存样本/000005/00000000000000000000>;/tmp/tmpirblvjzx
信息:卡夫卡存储。缓冲区:关闭样本/000005/00000000000000>;/tmp/tmpirblvjzx records=2 0.3kb
信息:kafka_store.handler:committed sample/000005/00000000000000000000
信息:kafka_store.buffer:saving sample/000005/00000000000000000002>;/tmp/tmpkz9mro1t

"key":空,"offset":1,"timestamp":1478570875023,"value":"world"}
下一个文件尚未准备好。等待:home/josh/kafka data/sample/000005/00000000000000000002
{"filename":"00000000000000000002","key":null,"offset":2,"timestamp":1478570890054,"value":"!}
下一个文件尚未准备好。等待:home/josh/kafka data/sample/000005/00000000000000000003

````

**注意**:初始运行需要"偏移重置",但不建议使用在那之后继续生产。

`不能立即通过。第一个文件在"world"之后关闭,因为从"hello"到"!"经过了20秒!,但由于没有更多消息,最终文件不会立即关闭。我们不能保证卡夫卡不会在前一条消息后15秒发送时间戳为的消息([时间很难](http://infiniteundo.com/post/2532699628/false程序员相信时间))。

当前设置需要等待八个小时才能达到超级安全,但这将确保最终提交流量不再增加的主题。

但我们非常清楚,它不能满足所有(甚至大部分)潜在的用例。不幸的是,作为一家初创公司,我们没有时间来完成这些工作,但我们很高兴审查拉取请求,并与社区合作,以获得所需的功能。

*使用配置文件,而不是通过命令行接受所有选项。这将是大多数其他任务的先决条件。
*完全支持谷歌云认证。目前,我们正在gce内部运行,因此默认身份验证*只起作用*
*支持s3、azure和其他长期存储系统。
*使用同一实例上的多个主题。目前我们只支持一个主题。



===
history
=



>0.1.4(2016-11-08)
----


*清理使用示例

<0.1.3(2016-11-08)
----


*开源版本!

0.1.0(2016-11-04)
----


*在PYPI上首次发布。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
从Java中的方法返回列表时遇到问题   java如何忽略json字段,并使用Jackson ObjectMapper获取其值以进行映射   spring通过更新其各自java对象的值来映射两个xsd文件   java从HttpClient获取500错误,在浏览器中工作   java使用物理键输入(耳机中的按钮)在安卓中执行一些操作   如何在int数组(java)中追加int?   java Spring RequestParam的默认值等于方法调用   java将JsonLayout添加到log4j2 json配置   Ubuntu上的maven Tomcat6 libs和/usr/share/java   java单元测试Android活动   java获取URL证书的屏幕截图   java如何为自定义类加载器加载的类提供工具?   FB墙上的java错误图片,来自安卓应用程序的反馈帖子(安卓 FB sdk)   从Intellij IDEA内部运行Tomcat时的java差异?   java TDD与不可能的例外   安卓 Java日期表示差异