python的珍宝数据驱动程序

pytd的Python项目详细描述


Pytd

Build StatusBuild statusPyPI version

pytd为珍宝数据的REST APIsPresto query enginePlazma primary storage提供了用户友好的界面。

无缝连接允许您的python代码高效地读/写大量来自/到珍宝数据的数据。最终,pytd会让您的日常数据分析工作更有效率。

安装

pip install pytd

用法

API keyendpoint分别设置为环境变量TD_API_KEYTD_API_SERVER,并创建一个客户机实例:

importpytdclient=pytd.Client(database='sample_datasets')# or, hard-code your API key, endpoint, and/or query engine:# >>> pytd.Client(apikey='1/XXX', endpoint='https://api.treasuredata.com/', database='sample_datasets', default_engine='presto')

宝藏数据查询

发出presto查询并检索结果:

client.query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1')# {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ['AAME', 9252], ..., ['ZUMZ', 2364]]}

如果是蜂巢:

client.query('select hivemall_version()',engine='hive')# {'columns': ['_c0'], 'data': [['0.6.0-SNAPSHOT-201901-r01']]} (as of Feb, 2019)

也可以显式初始化配置单元的pytd.Client

client_hive=pytd.Client(database='sample_datasets',default_engine='hive')client_hive.query('select hivemall_version()')

将数据写入珍宝数据

表示为pandas.DataFrame的数据可以按如下方式写入珍宝数据:

importpandasaspddf=pd.DataFrame(data={'col1':[1,2],'col2':[3,10]})client.load_table_from_dataframe(df,'takuti.foo',writer='bulk_import',if_exists='overwrite')

对于writer选项,pytd支持三种不同的方法来摄取数据以珍藏数据:

  1. bulk import apibulk_import(默认)
    • 将数据转换为csv文件并以批处理方式上载。
  2. presto插入查询insert_into
    • 通过presto查询引擎发出insert-into查询,插入DataFrame中的每一行。
    • 建议仅用于少量数据。
  3. td-sparkspark
    • 本地定制的spark实例直接将DataFrame写入珍宝数据的主存储系统。

启用Spark Writer

由于td spark允许通过PySpark对主存储系统进行特殊访问,请遵循以下说明:

  1. 联系support@treasuredata.com以激活对您的宝藏数据帐户的权限。
  2. 如果使用第三个选项,请使用[spark]选项安装pytd:
    pip install pytd[spark]
如果您想使用现有的TD火花JAR文件,使用“{ > } > }选项”创建“{”>“}”是有帮助的。

frompytd.writerimportSparkWriterwriter=SparkWriter(apikey='1/XXX',endpoint='https://api.treasuredata.com/',td_spark_path='/path/to/td-spark-assembly.jar')client.load_table_from_dataframe(df,'mydb.bar',writer=writer,if_exists='overwrite')

DB-API

pytd在prestodb/presto-python-client的帮助下实现Python Database API Specification v2.0

首先连接到API:

frompytd.dbapiimportconnectconn=connect(pytd.Client(database='sample_datasets'))# or, connect with Hive:# >>> conn = connect(pytd.Client(database='sample_datasets', default_engine='hive'))
由规范定义的<> ^ {CD14>}允许我们灵活地从自定义函数获取查询结果:

defquery(sql,connection):cur=connection.cursor()cur.execute(sql)rows=cur.fetchall()columns=[desc[0]fordescincur.description]return{'data':rows,'columns':columns}query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1',conn)

下面是一个基于生成器的迭代检索示例,就像pandas.DataFrame.iterrows

defiterrows(sql,connection):cur=connection.cursor()cur.execute(sql)index=0columns=NonewhileTrue:row=cur.fetchone()ifrowisNone:breakifcolumnsisNone:columns=[desc[0]fordescincur.description]yieldindex,dict(zip(columns,row))index+=1forindex,rowiniterrows('select symbol, count(1) as cnt from nasdaq group by 1 order by 1',conn):print(index,row)# 0 {'cnt': 590, 'symbol': 'AAIT'}# 1 {'cnt': 82, 'symbol': 'AAL'}# 2 {'cnt': 9252, 'symbol': 'AAME'}# 3 {'cnt': 253, 'symbol': 'AAOI'}# 4 {'cnt': 5980, 'symbol': 'AAON'}# ...

如何更换熊猫TD

pytd提供了pandas-td兼容的函数,可以更有效地提供相同的功能。如果您仍在使用pandas td,我们建议您切换到pytd,如下所示。

首先,从pypi安装包:

pip install pytd
# or, `pip install pytd[spark]` if you wish to use `to_td`

接下来,对import语句进行以下修改。

之前:

importpandas_tdastd
In[1]:%%load_extpandas_td.ipython

之后:

importpytd.pandas_tdastd
In[1]:%%load_extpytd.pandas_td.ipython

因此,所有pandas_td代码都应该与pytd一起正常运行。如果发现任何不兼容的行为,请报告here中的问题。

使用现有的TD-SARPK汇编.JAR文件< EH3>

如果您想使用现有的TD SCAPAR JAR文件,则使用^ {CD13>}选项创建^ {CD12>}会很有帮助。您可以将writer传递给connect()函数。

importpytdimportpytd.pandas_tdastdimportpandasaspdapikey='1/XXX'endpoint='https://api.treasuredata.com/'writer=pytd.writer.SparkWriter(apikey=apikey,endpoint=endpoint,td_spark_path='/path/to/td-spark-assembly.jar')con=td.connect(apikey=apikey,endpoint=endpoint,writer=writer)df=pd.DataFrame(data={'col1':[1,2],'col2':[3,10]})td.to_td(df,'mydb.buzz',con,if_exists='replace',index=False)

对于开发人员

我们使用blackisort作为格式化程序,使用flake8作为linter。我们的ci与他们核对格式。

请注意,black需要python 3.6+,而pytd支持3.5+,因此开发时必须使用python3.6+。

我们强烈建议您引入pre-commit,以确保提交遵循所需的格式。

您可以按如下方式安装预提交:

pip install pre-commit
pre-commit install

现在,black、isort和flake8将在每次提交更改时进行检查。您可以使用git commit --no-verify跳过这些检查。

如果要手动检查代码格式,可以按如下方式安装:

pip install black isort flake8

然后,您可以手动运行这些工具;

black pytd
flake8 pytd
isort

您可以运行formatter、linter,并使用nox作为以下命令进行测试G:

pip install nox # You should install at the first time
nox

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java窗口。位置和窗口。公开问题   java如何从存储在ArrayList<Node>中的动态生成的文本字段中获取文本?   java如何立即关闭InputStream?   如何重新启动Java程序以激活环境变量   java搜索字符串是否相差一个字符   java CFB模式输出与CTR输出相同;我做错什么了吗?   java如何在javaFX中将实例化对象添加到Stage   java如何在jtextarea上打印来自不同类的文本消息   java以编程方式确定IOException的原因?   限制Java NIO通道(文件或socket)中的可用内容   javajaxb与JDOM:是否可以使用JAXB更新xml文件   批处理文件到java测试   JavaFX:stage的作用是什么。可设置大小(false)是否会导致额外的页边距?   java有没有办法告诉IntelliJ按需堆叠参数?   java Seam会话范围的组件在下一个请求中消失   java Google Web Toolkit对开发复杂的java脚本有用吗?   安卓 studio java ArrayList正在检索最高值   java为什么递归地用随机数填充LinkedList时会出现StackOverflowException?