阿帕契熊蟒蛇变形系列
beam-nuggets的Python项目详细描述
关于
Apache beampython sdk的随机转换集合。很多是 简单的变换。最有用的是那些 从关系数据库读/写。
安装
- 使用pip
pip install beam-nuggets
- 来源
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
pip install .
支持的转换
IO
- relational_db.ReadFromDB 用于从关系数据库表中读取。
- relational_db.Write
用于写入关系数据库表。
上面的转换使用SqlAlchemy与数据库通信, 因此,它们可以读取和写入所有支持的关系数据库 由SqlAlchemy提供。 对postgresql、mysql和sqlite的转换are tested。 - kafkaio.KafkaProduce用于向卡夫卡主题写作。
- kafkaio.KafkaConsume用于消费卡夫卡主题。
- csvio.Read 用于读取csv文件。
其他
- SelectFromNestedDict 从嵌套字典形成的记录中选择子集。
- ParseJson
- AssignUniqueId
文档
见here。
用法
使用beam nugget将数据写入sqlite表 relational_db.Write转换。
# write_sqlite.py contentsimportapache_beamasbeamfromapache_beam.options.pipeline_optionsimportPipelineOptionsfrombeam_nuggets.ioimportrelational_dbrecords=[{'name':'Jan','num':1},{'name':'Feb','num':2}]source_config=relational_db.SourceConfiguration(drivername='sqlite',database='/tmp/months_db.sqlite',create_if_missing=True# create the database if not there )table_config=relational_db.TableConfiguration(name='months',create_if_missing=True,# automatically create the table if not thereprimary_key_columns=['num']# and use 'num' column as primary key)withbeam.Pipeline(options=PipelineOptions())asp:# Will use local runnermonths=p|"Reading month records">>beam.Create(records)months|'Writing to DB'>>relational_db.Write(source_config=source_config,table_config=table_config)
执行管道
python write_sqlite.py
检查内容
sqlite3 /tmp/months_db.sqlite 'select * from months'# output:# 1.0|Jan# 2.0|Feb
要将相同的数据写入postgresql表,只需创建一个合适的 relational_db.SourceConfiguration如下。
source_config=relational_db.SourceConfiguration(drivername='postgresql+pg8000',host='localhost',port=5432,username='postgres',password='password',database='calendar',create_if_missing=True# create the database if not there )
单击here
更多示例,包括在google云平台中编写postgresql
使用DataFlowRunner。
演示如何使用束核relational_db.ReadFromDB的示例
转换为从PostgreSQL数据库表中读取。
from__future__importprint_functionimportapache_beamasbeamfromapache_beam.options.pipeline_optionsimportPipelineOptionsfrombeam_nuggets.ioimportrelational_dbwithbeam.Pipeline(options=PipelineOptions())asp:source_config=relational_db.SourceConfiguration(drivername='postgresql+pg8000',host='localhost',port=5432,username='postgres',password='password',database='calendar',)records=p|"Reading records from db">>relational_db.ReadFromDB(source_config=source_config,table_name='months',query='select num, name from months'# optional. When omitted, all table records are returned. )records|'Writing to stdout'>>beam.Map(print)
有关更多示例,请参见here。
开发
- 安装
git clone git@github.com:mohaseeb/beam-nuggets.git cd beam-nuggets exportBEAM_NUGGETS_ROOT=`pwd` pip install -e .[dev]
- 在专用开发分支上进行更改
- 运行测试
cd$BEAM_NUGGETS_ROOT python -m unittest discover -v
- 生成文档
cd$BEAM_NUGGETS_ROOT docs/generate_docs.sh
- 创建一个针对master的pr。
- 合并已接受的PR并更新本地主机后,上载新的 建立到Pypi。
cd$BEAM_NUGGETS_ROOT scripts/build_test_deploy.sh
积压工作
- 版本化文档?
- 总结使用源/汇与pardo(和groupby)进行io的研究
- 更多掘金:writetocsv
- 调查sdf pardo的就绪性,以及是否可以用于relational数据库。readfromdb
- 集成测试
- 在IO转换上处理DB转换失败
- 更多掘金:ElasticSearch,Mongo
- 写入关系数据库,记录
出资人
许可证
麻省理工学院