数据建模管理和转换数据
dingDong的Python项目详细描述
叮咚
为开发和维护复杂数据集成项目而创建的模型-关系数据库 或云API集成、数据清理或建模算法。
该项目目前支持从不同的rmdbs批量加载。我们确实计划扩大它以获得充分的支持 在休息和网上也一样。如果项目完全是用python开发的,我们将使用node进行rest集成。
叮咚开发出了一种在使用每个组件的不同数据存储类型之间作为粘合剂的最佳实践。 例如,没有join或union实现,因为通常在连接器上以非常高效的方式使用此功能。 我们专注于正确管理元数据,帮助创建快速、易于管理的数据工作流。
通过将现有连接器的本机功能与ding dong结合使用,我们可以使用 所有组件的优点
叮咚有两个主要模块:
-
ding-为工作流中列出的所有对象创建和管理总体元数据结构
> UL> - 创建新对象
- 使用反向传播机制修改现有对象
- 将数据更新到新对象中
- 存储旧结构
- (TOdo)–>;将所有工作流程更改作为完整的CI/CD方法的一部分
有关叮咚的更多信息,请访问http://www.biskille.com(营销)或叮咚文档
安装
从github下载或使用pip安装
pip install dingDong
样品
下载示例csv文件dataelementdescription.csv、demographics.csv、measuresofbirthanddeath.csv 位于 sampleHealthcare/csvdata文件夹中。 在这个示例中,我们使用 c:\ dingdong 作为所有源csv文件和dingdong日志的主文件夹。
完整代码示例 extractcsvtosqllite.py 位于 示例/samplehealthcare/ 文件夹中
示例演示如何将三个csv文件加载到sqllite中,创建一个简单的基于查询的 并将结果发送到新的csv文件中。
- 加载模块和基本配置 < > >
- config.conn_url->;将连接url设置为所有连接器
- 键->;常规连接名称或连接类型(SQL、Oracle、文件..)
- 值->;可以是字符串或字典 *字符串->;连接字符串URL(键定义的连接类型:SQL、Oracle、MySQL….) *字典->;必须具有"conn"(连接类型)和"url"(连接字符串)。 可以在dingdong.misc.enumsjson.econn找到可用的连接
- config.logs\u debug->;设置日志级别(logging.debug,logging.warning…)
- config.logs目录->;设置用于创建日志文件的日志目录
- 创建工作流可以使用json格式或python字典 在下面的示例中,我们将使用python字典示例工作流包含 < > >
- 将名为dataelementdescription的csv文件映射并加载到名为dateelements的sqllite表中
- 将名为demographics的csv文件映射并加载到名为demographics的sqllite表中
- 将名为measuresofbirthanddeath的csv文件映射并加载到名为birthdate的sqllite表中
- 将基于人口统计和出生日期的新查询创建到名为final的新表中
- 使用直接pl/sql查询更新最终表中的示例字段
- 将最终表数据提取到csv文件中。 我们使用varchar(200)作为默认的csv列数据类型。默认情况下,可以在dingdong.conn.basebatch下找到配置
- 初始类叮咚 < > >
- dicobj->;将词典作为工作流加载
- dirdata->;正在加载此文件夹中的json文件
- includefiles->;筛选要加载到dirdata文件夹中的文件
- notincldefiles->;忽略要加载到dirdata文件夹中的文件
- conndict->;等于config.conn_url,st connection url
- 进程->;并行处理数,仅用于加载数据(DONG模块)
- 叮 < > >
- 基于csv文件创建日期元素、人口统计和出生日期表
- 基于定义的查询创建最终表
- dong-将csv文件中的数据提取到sqlite表中。默认加载为truncate->;insert method 将查询中的数据提取到最终表中(truncate->;insert) < > >
- 如果对象结构已更改,则模式2(如示例中所示) *将创建历史记录表 *将创建新对象,并用历史表中的数据(相同的列名)填充该对象。
配置属性可以在DingDong文档中找到
import logging from dingDong import DingDong from dingDong import Config """ set log level: logging.INFO, logging.DEBUG, logging.ERROR """ Config.LOGS_DEBUG = logging.DEBUG Config.CONN_URL = { 'sampleSql': {'conn': 'sql',"url": "<Sql server connection string>;UID=USER;PWD=PWD;"}, 'file': "C:\\dingDong\\", 'sqlite': "C:\\dingDong\\sqlLiteDB.db"}
nodesToLoad = [ {"source": ["file", "DATAELEMENTDESCRIPTION.csv"], "target": ["sqlite", "dateElements_Desc"]}, {"source": ["file", "DEMOGRAPHICS.csv"], "target": ["sqlite", "demographics"]}, {"source": ["file", "MEASURESOFBIRTHANDDEATH.csv"], "target": ["sqlite", "birthDate"]}, {"query": ["sqlite", """ Select d.[State_FIPS_Code] AS A, d.[County_FIPS_Code] AS B, d.[County_FIPS_Code] AS G,d.[County_FIPS_Code], d.[CHSI_County_Name], d.[CHSI_State_Name],[Population_Size],[Total_Births],[Total_Deaths] From demographics d INNER JOIN birthDate b ON d.[County_FIPS_Code] = b.[County_FIPS_Code] AND d.[State_FIPS_Code] = b.[State_FIPS_Code]"""], "target": ["sqlite", "Finall", 2]}, {"myexec": ["sqlite", "Update dateElements_Desc Set [Data_Type] = 'dingDong';"]}, {"source": ["sqlite", "Finall"], "target": ["file", "finall.csv"]} ]
m = DingDong(dicObj=nodesToLoad, filePath=None, dirData=None, includeFiles=None, notIncludeFiles=None, connDict=None, processes=1)
m.ding()
m.dong()
完整示例代码:
from dingDong import DingDong from dingDong import Config Config.CONN_URL = { 'x1' : {'conn':'sql',"url":"DRIVER={SQL Server};SERVER=CPX-VLQ5GA42TW2\SQLEXPRESS;DATABASE=ContosoRetailDW;UID=bpmk;PWD=bpmk;"}, 'x2' : {'conn':'sql',"url":"DRIVER={SQL Server};SERVER=CPX-VLQ5GA42TW2\SQLEXPRESS;DATABASE=ContosoRetailDW;UID=bpmk;PWD=bpmk;"}, 'file' : "C:\\dingDong\\", 'sqlite': "C:\\dingDong\\sqlLiteDB.db"} Config.LOGS_DEBUG = logging.DEBUG Config.LOGS_DIR = "C:\\dingDong" nodesToLoad = [ { "source":["file","DATAELEMENTDESCRIPTION.csv"], "target":["sqlite","dateElements_Desc"]}, { "source":["file","DEMOGRAPHICS.csv"], "target":["sqlite","demographics"]}, { "source":["file","MEASURESOFBIRTHANDDEATH.csv"], "target":["sqlite","birthDate"]}, { "query":["sqlite",""" Select d.[State_FIPS_Code] AS A, d.[County_FIPS_Code] AS B, d.[County_FIPS_Code] AS G,d.[County_FIPS_Code], d.[CHSI_County_Name], d.[CHSI_State_Name],[Population_Size],[Total_Births],[Total_Deaths] From demographics d INNER JOIN birthDate b ON d.[County_FIPS_Code] = b.[County_FIPS_Code] AND d.[State_FIPS_Code] = b.[State_FIPS_Code]"""], "target":["sqlite","Final", 2]}, { "myexec":["sqlite","Update dateElements_Desc Set [Data_Type] = 'dingDong';"]}, { "source":["sqlite","Final"], "target":["file","final.csv"]} ] m = DingDong(dicObj=nodesToLoad, filePath=None, dirData=None, includeFiles=None, notIncludeFiles=None, connDict=None, processes=1) m.ding() m.dong()
路线图
我们希望创建一个能够设计、实施、维护和数据集成项目的平台,例如:
- 使用简单的json映射从任何api到任何api的任何restapi连接
- 任何使用json映射的关系数据库连接
- 任何非关系存储
- 任何中间件业务逻辑的主平台-从示例if到使用ml和dl算法的统计算法
- 启用实时和计划集成
我们将相应地扩展连接器和元数据管理器。
批量支持的连接器
<表> < COLGROUP > < COL/> < COL/> < COL/> < COL/> < COL/> <广告> 连接器类型 python模块 选中的版本 开发状态 注释 < /广告> <正文> SQL语句 pyodbc 4.0.23 已测试,产品 提取速度慢,数据量大 首选使用ceodbc SQL语句 ceodbc 2.0.1 已测试,产品 用于海量数据加载的sql server连接 从第三部分文件夹手动安装 访问 pyodbc 4.0.23 已测试,产品作者
叮咚是由Tal Shany创建的 ( tal @ biskille com ) 我们正在寻找贡献!!!!
许可证
GNU通用公共许可v3.0
请参见复制以查看全文。