分布式数据任务中数据连接器的统一
tentaclio的Python项目详细描述
Tentaclio
简化的python库:
- 处理来自不同协议的流,如
file:
,ftp:
,sftp:
,s3:
,… - 正在打开数据库连接。
- 管理分布式系统中的凭据。
设计中的主要考虑事项:
- 易于使用:所有流都通过
tentaclio.open
打开,所有数据库连接都通过tentaclio.db
打开。 - url是基本的资源定位器和db连接字符串。
- 受保护资源的自动逻辑身份验证。
- 可扩展:您可以为其他方案添加自己的处理程序。
- 熊猫互动。
快速示例。
读写流。
importtentacliocontents="? ?"withtentaclio.open("ftp://localhost:2021/upload/file.txt",mode="w")aswriter:writer.write(contents)# Using boto3 authentication under the hood.bucket="s3://my-bucket/octopus/hello.txt"withtentaclio.open(bucket)asreader:print(reader.read())
复制流
importtentacliowithtentaclio.open("/home/constantine/data.csv")asreader,tentaclio.open("sftp://constantine:tentacl3@sftp.octoenergy.com/uploads/data.csv",mode="w")aswriter:)aswriter:writer.write(reader.read())
列出资源
importtentaclioforentryintentaclio.listdir("s3:://mybucket/path/to/dir"):print("Entry",entry)
经过身份验证的资源。
importosimporttentaclioprint("env ftp credentials",os.getenv("OCTOIO__CONN__OCTOENERGY_FTP"))# This prints `sftp://constantine:tentacl3@sftp.octoenergy.com/` # Credentials get automatically injected. withtentaclio.open("sftp://sftp.octoenergy.com/uploads/data.csv")asreader:print(reader.read())
数据库连接。
importosimporttentaclioprint("env TENTACLIO__CONN__DB",os.getenv("TENTACLIO__CONN__DB"))# This prints `postgresql://octopus:tentacle@localhost:5444/example` # hostname is a wildcard, the credentials get injected. withtentaclio.db("postgresql://hostname/example")aspg:results=pg.query("select * from my_table")
熊猫互动。
importpandasaspd# ?? importtentaclio# ? df=pd.DataFrame([[1,2,3],[10,20,30]],columns=["col_1","col_2","col_3"])bucket="s3://my-bucket/data/pandas.csv"withtentaclio.open(bucket,mode="w")aswriter:# supports more pandas readers df.to_csv(writer,index=False)withtentaclio.open(bucket)asreader:new_df=pd.read_csv(reader)
安装
您可以使用pip获得tentaclio
pip install tentaclio
或pipenv
pipenv install tentaclio
正在开发。
克隆此repo并安装pipenv:
在Makefile
中,您将找到一些有用的目标,用于绒线、测试等,例如:
make test
如何使用
这就是如何使用tentaclio
来满足您的日常数据摄取和存储需求。
流
为了打开流以加载或存储数据,通用函数是:
importtentacliowithtentaclio.open("/path/to/my/file")asreader:contents=reader.read()withtentaclio.open("s3://bucket/file",mode='w')aswriter:writer.write(contents)
允许的模式有r
、w
、rb
和wb
。可以使用t
而不是b
来指示文本流,但这是默认值。
支持的URL协议有:
/local/file
file:///local/file
s3://bucket/file
ftp://path/to/file
sftp://path/to/file
http://host.com/path/to/resource
https://host.com/path/to/resource
postgresql://host/database::table
将允许您从csv格式写入具有相同列名的数据库(请注意,该表位于::
:warning:)之后。
您可以为任何URL添加凭据,以便访问受保护的资源。
您可以将这些读写器与pandas函数一起使用,如:
importpandasaspdimporttentacliowithtentaclio.open("/path/to/my/file")asreader:df=pd.read_csv(reader)[...]withtentaclio.open("s3::/path/to/my/file",mode='w')aswriter:df.to_parquet(writer)
Readers
、Writers
及其可关闭版本可用于任何需要类似文件的对象的地方;pandas或pickle就是此类函数的示例。
对资源进行类似文件系统的操作
列出资源
有些url方案允许以pythonnic方式列出资源:
importtentaclioforentryintentaclio.listdir("s3:://mybucket/path/to/dir"):print("Entry",entry)
鉴于listdir
可能更方便,我们还提供scandir
,它返回DirEntrys和walk
的列表。所有函数都尽可能地遵循其标准库定义。
数据库访问
为了打开数据库连接,您可以使用tentaclio.db
,并可以立即访问postgres、sqlite、athena和mssql。
importtentaclio[...]query="select 1";withtentaclio.db(POSTGRES_TEST_URL)asclient:result=client.query(query)[...]
支持的数据库方案是:
postgresql://
sqlite://
awsathena+rest://
mssql://
自动凭证注入
使用前缀为
TENTACLIO__CONN__
(即TENTACLIO__CONN__DATA_FTP=sfpt://real_user:132ldsf@ftp.octoenergy.com
)的环境变量配置凭据。打开流:
withtentaclio.open("sftp://ftp.octoenergy.com/file.csv")asreader:reader.read()
凭据将被注入到url中。
- 打开数据库客户端:
importtentacliowithtentaclio.db("postgresql://hostname/my_data_base")asclient:client.query("select 1")
注意,要验证的url中的hostname
是一个通配符,它将匹配任何主机名。如果{{CD39>}的证书存在,那么^ {< CD37>}将被注入到^ {CD38>}。
URL的不同组件设置不同:
- 方案和路径将从url设置,如果缺少则为空。
- 用户名、密码和主机名将从存储的凭据中设置。
凭证文件
您还可以设置一个凭据文件,其外观如下:
secrets:
db_1: postgresql://user1:pass1@myhost.com/database_1
db_2: postgresql://user2:pass2@otherhost.com/database_2
ftp_server: ftp://fuser:fpass@ftp.myhost.com
让它进入通过设置环境变量TENTACLIO__SECRETS_FILE
可以实现tentaclio。每个url的实际名称用于跟踪,对功能没有影响。
协议结构子类型快速说明。
为了从数据相关函数的实现中(或者在系统的任何部分中)抽象出具体的依赖关系,我们使用类型protocols。这比使用子类或more complex approches允许更灵活的依赖注入。这个想法很大程度上受到了在go中如何做这件事的启发。在我们的tech blog中了解有关此原则的更多信息。