将netcdf cf dsg文件回放到kafka主题上

ncreplayer的Python项目详细描述


ncreplayer

Build StatusLicense

一个小型实用程序,用于加载符合cf dsg的netcdf文件并以batch模式或stream模式将其回放到kafka主题上。可以选择使用配置参数控制原始文件中的时间戳和时间增量。

数据的格式如avro模式文件schema.avsc中所述。您可以选择将数据序列化为avromsgpack或默认的json

为什么?

这个工具是为了让我的生活更容易。它不适合许多人使用,但对以下方面很有用:

  • 以不同于文件中定义的时间和间隔回放netcdf文件中的数据。
  • 为负载测试设置快速数据流。
  • 将依赖静态文件的大型系统的部分转换为流处理。
  • 接受合作者提供的标准化数据格式(netcdf/cf/dsg),并能够将其传输到更大的系统中。

数据格式

最简单的示例

{"uid":"1","time":"2019-04-01T00:00:00Z","lat":30.5,"lon":-76.5,}

完整示例

{"uid":"1","gid":null,"time":"2019-04-01T00:00:00Z","lat":30.5,"lon":-76.5,"z":null,"values":{"salinity":30.2,"temperature":46.5},"meta":""}
  • values对象是可选的,是多类型avromap
  • meta是可选的、开放的。它打算携带元数据来描述values。我建议使用nco-json。如果监听这些消息的系统需要一些有关数据的上下文(如将数据流式传输到网站),则这非常有用。YMMV公司。

配置

这个程序使用^{}作为cli接口。我可能花了50%的时间让这个实用程序只是在玩cli界面。我不知道我想到的是令人惊叹的还是一堆垃圾。它起作用了。欢迎评论。

$ ncreplay
Usage: ncreplay [OPTIONS] FILENAME COMMAND [ARGS]...

Options:
  --brokers TEXT                 Kafka broker string (comman separated)[required]
  --topic TEXT                   Kafka topic to send the data to  [required]
  --packing [json|avro|msgpack]  The data packing algorithm to use
  --registry TEXT                URL to a Schema Registry if avro packing is
                                 requested
  --uid TEXT                     Variable name, global attribute, or value to
                                 use for the uid values
  --gid TEXT                     Variable name, global attribute, or value to
                                 use for the gid values
  --meta / --no-meta             Include the `nco-json` metadata in each
                                 message?
  --help                         Show this message and exit.

Commands:
  batch   Batch process a netCDF file in chunks, pausing every [chunk]...
  stream  Streams each unique timestep in the netCDF file every [delta]...

批次

$ ncreplay /path/to/file.nc batch --help
Usage: ncreplay batch [OPTIONS]

  Batch process a netCDF file in chunks, pausing every [chunk] records
  for[delta] seconds. Optionally change the [starting]time of the file
  and/or change each timedelta using the [factor] and [offset] parameters.

Options:
  -s, --starting [%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S]
  -f, --factor FLOAT
  -o, --offset FLOAT
  -d, --delta FLOAT
  -c, --chunk INTEGER
  --help                          Show this message and exit.

$ ncreplay /path/to/file.nc stream --help
Usage: ncreplay stream [OPTIONS]

  Streams each unqiue timestep in the netCDF file every [delta] seconds.
  Optionally you can control the [starting] point of the file and this will
  re-calculate all of the timestamps to match the original timedeltas.

Options:
  -s, --starting [%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S]
  -d, --delta FLOAT
  --help                          Show this message and exit

开发/测试

目前还没有测试,但您可以使用此存储库中包含的文件来使用这些选项

首先建立卡夫卡生态系统

$ docker run -d --net=host \
    -e ZK_PORT=50000\
    -e BROKER_PORT=4001\
    -e REGISTRY_PORT=4002\
    -e REST_PORT=4003\
    -e CONNECT_PORT=4004\
    -e WEB_PORT=4005\
    -e RUNTESTS=0\
    -e DISABLE=elastic,hbase \
    -e DISABLE_JMX=1\
    -e RUNTESTS=0\
    -e FORWARDLOGS=0\
    -e SAMPLEDATA=0\
    --name ncreplayer-testing \
  landoop/fast-data-dev:1.0.1

然后设置一个侦听器

$ docker run -it --rm --net=host \
  landoop/fast-data-dev:1.0.1  \
    kafka-console-consumer \
      --bootstrap-server localhost:4001 \
      --topic axds-ncreplayer-data

现在批处理或流式处理文件:

# Batch
$ ncreplay tests/data/gda_example.nc batch -d 10 -c 10# Stream
$ ncreplay tests/data/gda_example.nc stream -d 10

要测试avro打包,请设置一个将自动解压缩数据的侦听器:

$ docker run -it --rm --net=host \
  landoop/fast-data-dev:1.0.1  \
    kafka-avro-console-consumer \
      --bootstrap-server localhost:4001 \
      --property schema.registry.url=http://localhost:4002 \
      --topic axds-ncreplayer-data

使用avro包装

$ ncreplay --packing avro tests/data/gda_example.nc batch -d 10 -c 10

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java IntelliJ找不到依赖项选项卡   java向字符串数组string[]添加元素并在Junit中测试结果   如何在eclipse中获取活动java项目的名称   如何使用java在mysql中插入时间   java ArrayList更新了插入一行,但Jtable仍然没有刷新   如何在JavaSwing中命名坐标(点)   java Matcher/模式不打印   java错误地设置了arraylist   使用UsernamePasswordCredential提供程序的java列表Azure AD   java在HTTP请求中设置UTC时间   未加载事件:jquery完整日历Java集成   java Maven插件依赖项无法从内部repo解析依赖项   Maven更新重置Java版本   java如何向中添加图片。带有Apache POI XWPF的docx,但不指定其大小   Java最大函数递归