将netcdf cf dsg文件回放到kafka主题上
ncreplayer的Python项目详细描述
ncreplayer
一个小型实用程序,用于加载符合cf dsg的netcdf文件并以batch
模式或stream
模式将其回放到kafka主题上。可以选择使用配置参数控制原始文件中的时间戳和时间增量。
数据的格式如avro模式文件schema.avsc
中所述。您可以选择将数据序列化为avro
、msgpack
或默认的json
。
为什么?
这个工具是为了让我的生活更容易。它不适合许多人使用,但对以下方面很有用:
- 以不同于文件中定义的时间和间隔回放netcdf文件中的数据。
- 为负载测试设置快速数据流。
- 将依赖静态文件的大型系统的部分转换为流处理。
- 接受合作者提供的标准化数据格式(netcdf/cf/dsg),并能够将其传输到更大的系统中。
数据格式
最简单的示例
{"uid":"1","time":"2019-04-01T00:00:00Z","lat":30.5,"lon":-76.5,}
完整示例
{"uid":"1","gid":null,"time":"2019-04-01T00:00:00Z","lat":30.5,"lon":-76.5,"z":null,"values":{"salinity":30.2,"temperature":46.5},"meta":""}
values
对象是可选的,是多类型avromap
。meta
是可选的、开放的。它打算携带元数据来描述values
。我建议使用nco-json
。如果监听这些消息的系统需要一些有关数据的上下文(如将数据流式传输到网站),则这非常有用。YMMV公司。
配置
这个程序使用^{
$ ncreplay Usage: ncreplay [OPTIONS] FILENAME COMMAND [ARGS]... Options: --brokers TEXT Kafka broker string (comman separated)[required] --topic TEXT Kafka topic to send the data to [required] --packing [json|avro|msgpack] The data packing algorithm to use --registry TEXT URL to a Schema Registry if avro packing is requested --uid TEXT Variable name, global attribute, or value to use for the uid values --gid TEXT Variable name, global attribute, or value to use for the gid values --meta / --no-meta Include the `nco-json` metadata in each message? --help Show this message and exit. Commands: batch Batch process a netCDF file in chunks, pausing every [chunk]... stream Streams each unique timestep in the netCDF file every [delta]...
批次
$ ncreplay /path/to/file.nc batch --help Usage: ncreplay batch [OPTIONS] Batch process a netCDF file in chunks, pausing every [chunk] records for[delta] seconds. Optionally change the [starting]time of the file and/or change each timedelta using the [factor] and [offset] parameters. Options: -s, --starting [%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S] -f, --factor FLOAT -o, --offset FLOAT -d, --delta FLOAT -c, --chunk INTEGER --help Show this message and exit.
流
$ ncreplay /path/to/file.nc stream --help Usage: ncreplay stream [OPTIONS] Streams each unqiue timestep in the netCDF file every [delta] seconds. Optionally you can control the [starting] point of the file and this will re-calculate all of the timestamps to match the original timedeltas. Options: -s, --starting [%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S] -d, --delta FLOAT --help Show this message and exit
开发/测试
目前还没有测试,但您可以使用此存储库中包含的文件来使用这些选项
首先建立卡夫卡生态系统
$ docker run -d --net=host \ -e ZK_PORT=50000\ -e BROKER_PORT=4001\ -e REGISTRY_PORT=4002\ -e REST_PORT=4003\ -e CONNECT_PORT=4004\ -e WEB_PORT=4005\ -e RUNTESTS=0\ -e DISABLE=elastic,hbase \ -e DISABLE_JMX=1\ -e RUNTESTS=0\ -e FORWARDLOGS=0\ -e SAMPLEDATA=0\ --name ncreplayer-testing \ landoop/fast-data-dev:1.0.1
然后设置一个侦听器
$ docker run -it --rm --net=host \ landoop/fast-data-dev:1.0.1 \ kafka-console-consumer \ --bootstrap-server localhost:4001 \ --topic axds-ncreplayer-data
现在批处理或流式处理文件:
# Batch $ ncreplay tests/data/gda_example.nc batch -d 10 -c 10# Stream $ ncreplay tests/data/gda_example.nc stream -d 10
要测试avro
打包,请设置一个将自动解压缩数据的侦听器:
$ docker run -it --rm --net=host \ landoop/fast-data-dev:1.0.1 \ kafka-avro-console-consumer \ --bootstrap-server localhost:4001 \ --property schema.registry.url=http://localhost:4002 \ --topic axds-ncreplayer-data
使用avro
包装
$ ncreplay --packing avro tests/data/gda_example.nc batch -d 10 -c 10