GeoLib输入数据

geolibs-glutemulo的Python项目详细描述


臀肌

ha geo社会演示数据摄取器

用法

阅读deexamples files

我们使用环境变量。完整列表见Environ vars file example, 和例子。

使用producer将数据上传到kafka

参见下面的python示例。必须使用列mame:value生成dict

使用摄取器消费者

使用glutodocker并填充环境变量。

使用GLUTEMULO_BACKEND和它的特定变量(数据库、主机等)选择后端。 您可以选择2个后端:postgrescarto 完整列表见Environ vars file example

然后设置:

  1. GLUTEMULO_INGESTOR_DATASET
    要上载数据的表
  2. GLUTEMULO_INGESTOR_DATASET_COLUMNS
    逗号分隔的列名列表

现在,在后端创建表或设置GLUTEMULO_INGESTOR_DATASET_DDLGLUTEMULO_INGESTOR_DATASET_AUTOCREATE=False

然后为kafka配置摄取器。 首先阅读python-kafka doc 然后使用以下变量:

  1. GLUTEMULO_INGESTOR_TOPIC
    要使用的主题
  2. GLUTEMULO_INGESTOR_BOOTSTRAP_SERVERS
    要连接的服务器列表
  3. GLUTEMULO_INGESTOR_GROUP_ID
    组ID.
  4. GLUTEMULO_INGESTOR_AUTO_OFFSET_RESET
    最新的或最早的。
  5. GLUTEMULO_INGESTOR_MAX_POLL_RECORDS
    一批消息
  6. 中返回的最大记录数
  7. GLUTEMULO_INGESTOR_FETCH_MIN_BYTES
    服务器应为获取请求返回的最小数据量,否则请等待获取最大值等待毫秒,以便累积更多数据。默认值:1

对于docker,我们包含一个example docker-compose file。 请记住,您可以使用相同的组id进行缩放

docker-compose scale gluto=3

运行烧瓶演示

$ FLASK_ENV=development flask run
 * Environment: development
 * Debug mode: on
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 194-409-049

测试

$ http -j POST localhost:5000/v1/ uno=1dos=2`
HTTP/1.0 201 CREATED
Content-Length: 13
Content-Type: text/html;charset=utf-8
Date: Thu, 02 May 201914:56:07 GMT
Server: Werkzeug/0.15.2 Python/3.7.2

DATA Received

生产者/消费者

卡夫卡+json

异步生产者:

fromglutemulo.kafka.producerimportJsonKafkaproductor=JsonKafka(bootstrap_servers="localhost:9092")future=productor.produce('simple-topic',dict(dos='BB'))

成批消费:

fromglutemulo.kafka.consumerimportJsonKafkaconsumer=JsonKafka('simple-topic',bootstrap_servers="localhost:9092")formsginconsumer.consume():formsginmessages:print(msg)

卡夫卡+阿夫罗

同步制作人:

SCHEMA={"type":"record","name":"simpledata","doc":"This is a sample Avro schema to get you started.","fields":[{"name":"name","type":"string"},{"name":"number1","type":"int"},],}SCHEMA_ID=1
fromglutemulo.kafka.producerimportAvroKafkaasProducerproductor=Producer(SCHEMA,SCHEMA_ID,bootstrap_servers="localhost:9092")future=productor.produce('simple-topic-avro',dict(name='Un nombre',number1=10))

消费者:

fromglutemulo.kafka.consumerimportAvroKafkaasConsumerconsumer=Consumer('simple-topic-avro',SCHEMA,SCHEMA_ID,bootstrap_servers="localhost:9092")formessagesinconsumer.consume():formsginmessages:print(msg)

用于测试

可以使用kafka附带的kafka控制台使用者脚本设置kafka使用者。

$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.240:9092 --topic pylog --from-beginning

this is an awsome log

用卡夫卡测试

您可以使用名为kafkacat的应用程序。

安装应用程序后,我们将以使用者模式(这是默认模式)运行它。

kafkacat -b 192.168.240.41:9092 -t one-test

这应该还没有显示任何东西,因为我们还没有发送任何东西到我们的主题…

要发送内容,我们可以将任何文本文件复制到当前目录并发送到我们的卡夫卡主题。在另一个窗口中,运行以下命令。

$ cat README | kafkacat -b 192.168.240.41 -t one-test

您应该在第一个窗口中看到kafkacat仍在consumer模式下运行的输出。

链接

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
在MATLAB中生成java格式的矩阵   java混合图片,颜色为白色   java Apache Tomcat 7域问题代理设置   java无法从资产中读取csv文件   java为SpringFramework 5中的所有控制器和方法指定一个模式   java为我的应用程序打开html帮助页面   java中的条件语句。属性文件   数组Java彩票类   javascript允许Rhino使用当前项目中的Java类   java无法将ModelMap添加到会话   Java执行命令行程序   java这个电子邮件程序是如何工作的?   java自定义listview,视图问题   java将变量从JavaFx传递到JavaScript&编写代码来触发按钮   java从csv文件中删除记录   爪哇移动蝙蝠。重新油漆   socketjava断开连接   已解析页面的java HTML