GeoLib输入数据
geolibs-glutemulo的Python项目详细描述
臀肌
ha geo社会演示数据摄取器
用法
阅读deexamples files。
我们使用环境变量。完整列表见Environ vars file example, 和例子。
使用producer将数据上传到kafka
参见下面的python示例。必须使用列mame:value生成dict
使用摄取器消费者
使用gluto
docker并填充环境变量。
使用GLUTEMULO_BACKEND
和它的特定变量(数据库、主机等)选择后端。
您可以选择2个后端:postgres
或carto
完整列表见Environ vars file example。
然后设置:
GLUTEMULO_INGESTOR_DATASET
要上载数据的表GLUTEMULO_INGESTOR_DATASET_COLUMNS
逗号分隔的列名列表
现在,在后端创建表或设置GLUTEMULO_INGESTOR_DATASET_DDL
和GLUTEMULO_INGESTOR_DATASET_AUTOCREATE=False
然后为kafka配置摄取器。 首先阅读python-kafka doc 然后使用以下变量:
GLUTEMULO_INGESTOR_TOPIC
要使用的主题GLUTEMULO_INGESTOR_BOOTSTRAP_SERVERS
要连接的服务器列表GLUTEMULO_INGESTOR_GROUP_ID
组ID.GLUTEMULO_INGESTOR_AUTO_OFFSET_RESET
最新的或最早的。GLUTEMULO_INGESTOR_MAX_POLL_RECORDS
一批消息 中返回的最大记录数
GLUTEMULO_INGESTOR_FETCH_MIN_BYTES
服务器应为获取请求返回的最小数据量,否则请等待获取最大值等待毫秒,以便累积更多数据。默认值:1
对于docker,我们包含一个example docker-compose file。 请记住,您可以使用相同的组id进行缩放
docker-compose scale gluto=3
运行烧瓶演示
$ FLASK_ENV=development flask run * Environment: development * Debug mode: on * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit) * Restarting with stat * Debugger is active! * Debugger PIN: 194-409-049
测试
$ http -j POST localhost:5000/v1/ uno=1dos=2` HTTP/1.0 201 CREATED Content-Length: 13 Content-Type: text/html;charset=utf-8 Date: Thu, 02 May 201914:56:07 GMT Server: Werkzeug/0.15.2 Python/3.7.2 DATA Received
生产者/消费者
卡夫卡+json
异步生产者:
fromglutemulo.kafka.producerimportJsonKafkaproductor=JsonKafka(bootstrap_servers="localhost:9092")future=productor.produce('simple-topic',dict(dos='BB'))
成批消费:
fromglutemulo.kafka.consumerimportJsonKafkaconsumer=JsonKafka('simple-topic',bootstrap_servers="localhost:9092")formsginconsumer.consume():formsginmessages:print(msg)
卡夫卡+阿夫罗
同步制作人:
SCHEMA={"type":"record","name":"simpledata","doc":"This is a sample Avro schema to get you started.","fields":[{"name":"name","type":"string"},{"name":"number1","type":"int"},],}SCHEMA_ID=1
fromglutemulo.kafka.producerimportAvroKafkaasProducerproductor=Producer(SCHEMA,SCHEMA_ID,bootstrap_servers="localhost:9092")future=productor.produce('simple-topic-avro',dict(name='Un nombre',number1=10))
消费者:
fromglutemulo.kafka.consumerimportAvroKafkaasConsumerconsumer=Consumer('simple-topic-avro',SCHEMA,SCHEMA_ID,bootstrap_servers="localhost:9092")formessagesinconsumer.consume():formsginmessages:print(msg)
用于测试
可以使用kafka附带的kafka控制台使用者脚本设置kafka使用者。
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.240:9092 --topic pylog --from-beginning
this is an awsome log
用卡夫卡测试
您可以使用名为kafkacat的应用程序。
安装应用程序后,我们将以使用者模式(这是默认模式)运行它。
kafkacat -b 192.168.240.41:9092 -t one-test
这应该还没有显示任何东西,因为我们还没有发送任何东西到我们的主题…
要发送内容,我们可以将任何文本文件复制到当前目录并发送到我们的卡夫卡主题。在另一个窗口中,运行以下命令。
$ cat README | kafkacat -b 192.168.240.41 -t one-test
您应该在第一个窗口中看到kafkacat仍在consumer模式下运行的输出。