python的珍宝数据驱动程序

pytd的Python项目详细描述


Pytd

Build StatusBuild statusPyPI version

pytd为珍宝数据的REST APIsPresto query enginePlazma primary storage提供了用户友好的界面。

无缝连接允许您的python代码高效地读/写大量来自/到珍宝数据的数据。最终,pytd会让您的日常数据分析工作更有效率。

安装

pip install pytd

用法

API keyendpoint分别设置为环境变量TD_API_KEYTD_API_SERVER,并创建一个客户机实例:

importpytdclient=pytd.Client(database='sample_datasets')# or, hard-code your API key, endpoint, and/or query engine:# >>> pytd.Client(apikey='1/XXX', endpoint='https://api.treasuredata.com/', database='sample_datasets', default_engine='presto')

宝藏数据查询

发出presto查询并检索结果:

client.query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1')# {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ['AAME', 9252], ..., ['ZUMZ', 2364]]}

如果是蜂巢:

client.query('select hivemall_version()',engine='hive')# {'columns': ['_c0'], 'data': [['0.6.0-SNAPSHOT-201901-r01']]} (as of Feb, 2019)

也可以显式初始化配置单元的pytd.Client

client_hive=pytd.Client(database='sample_datasets',default_engine='hive')client_hive.query('select hivemall_version()')

将数据写入珍宝数据

表示为pandas.DataFrame的数据可以按如下方式写入珍宝数据:

importpandasaspddf=pd.DataFrame(data={'col1':[1,2],'col2':[3,10]})client.load_table_from_dataframe(df,'takuti.foo',writer='bulk_import',if_exists='overwrite')

对于writer选项,pytd支持三种不同的方法来摄取数据以珍藏数据:

  1. bulk import apibulk_import(默认)
    • 将数据转换为csv文件并以批处理方式上载。
  2. presto插入查询insert_into
    • 通过presto查询引擎发出insert-into查询,插入DataFrame中的每一行。
    • 建议仅用于少量数据。
  3. td-sparkspark
    • 本地定制的spark实例直接将DataFrame写入珍宝数据的主存储系统。

启用Spark Writer

由于td spark允许通过PySpark对主存储系统进行特殊访问,请遵循以下说明:

  1. 联系support@treasuredata.com以激活对您的宝藏数据帐户的权限。
  2. 如果使用第三个选项,请使用[spark]选项安装pytd:
    pip install pytd[spark]
如果您想使用现有的TD火花JAR文件,使用“{ > } > }选项”创建“{”>“}”是有帮助的。

frompytd.writerimportSparkWriterwriter=SparkWriter(apikey='1/XXX',endpoint='https://api.treasuredata.com/',td_spark_path='/path/to/td-spark-assembly.jar')client.load_table_from_dataframe(df,'mydb.bar',writer=writer,if_exists='overwrite')

DB-API

pytd在prestodb/presto-python-client的帮助下实现Python Database API Specification v2.0

首先连接到API:

frompytd.dbapiimportconnectconn=connect(pytd.Client(database='sample_datasets'))# or, connect with Hive:# >>> conn = connect(pytd.Client(database='sample_datasets', default_engine='hive'))
由规范定义的<> ^ {CD14>}允许我们灵活地从自定义函数获取查询结果:

defquery(sql,connection):cur=connection.cursor()cur.execute(sql)rows=cur.fetchall()columns=[desc[0]fordescincur.description]return{'data':rows,'columns':columns}query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1',conn)

下面是一个基于生成器的迭代检索示例,就像pandas.DataFrame.iterrows

defiterrows(sql,connection):cur=connection.cursor()cur.execute(sql)index=0columns=NonewhileTrue:row=cur.fetchone()ifrowisNone:breakifcolumnsisNone:columns=[desc[0]fordescincur.description]yieldindex,dict(zip(columns,row))index+=1forindex,rowiniterrows('select symbol, count(1) as cnt from nasdaq group by 1 order by 1',conn):print(index,row)# 0 {'cnt': 590, 'symbol': 'AAIT'}# 1 {'cnt': 82, 'symbol': 'AAL'}# 2 {'cnt': 9252, 'symbol': 'AAME'}# 3 {'cnt': 253, 'symbol': 'AAOI'}# 4 {'cnt': 5980, 'symbol': 'AAON'}# ...

如何更换熊猫TD

pytd提供了pandas-td兼容的函数,可以更有效地提供相同的功能。如果您仍在使用pandas td,我们建议您切换到pytd,如下所示。

首先,从pypi安装包:

pip install pytd
# or, `pip install pytd[spark]` if you wish to use `to_td`

接下来,对import语句进行以下修改。

之前:

importpandas_tdastd
In[1]:%%load_extpandas_td.ipython

之后:

importpytd.pandas_tdastd
In[1]:%%load_extpytd.pandas_td.ipython

因此,所有pandas_td代码都应该与pytd一起正常运行。如果发现任何不兼容的行为,请报告here中的问题。

使用现有的TD-SARPK汇编.JAR文件< EH3>

如果您想使用现有的TD SCAPAR JAR文件,则使用^ {CD13>}选项创建^ {CD12>}会很有帮助。您可以将writer传递给connect()函数。

importpytdimportpytd.pandas_tdastdimportpandasaspdapikey='1/XXX'endpoint='https://api.treasuredata.com/'writer=pytd.writer.SparkWriter(apikey=apikey,endpoint=endpoint,td_spark_path='/path/to/td-spark-assembly.jar')con=td.connect(apikey=apikey,endpoint=endpoint,writer=writer)df=pd.DataFrame(data={'col1':[1,2],'col2':[3,10]})td.to_td(df,'mydb.buzz',con,if_exists='replace',index=False)

对于开发人员

我们使用blackisort作为格式化程序,使用flake8作为linter。我们的ci与他们核对格式。

请注意,black需要python 3.6+,而pytd支持3.5+,因此开发时必须使用python3.6+。

我们强烈建议您引入pre-commit,以确保提交遵循所需的格式。

您可以按如下方式安装预提交:

pip install pre-commit
pre-commit install

现在,black、isort和flake8将在每次提交更改时进行检查。您可以使用git commit --no-verify跳过这些检查。

如果要手动检查代码格式,可以按如下方式安装:

pip install black isort flake8

然后,您可以手动运行这些工具;

black pytd
flake8 pytd
isort

您可以运行formatter、linter,并使用nox作为以下命令进行测试G:

pip install nox # You should install at the first time
nox

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java JavaFX TableView更新单元格,不更新对象值   在扫描器中使用分隔符的java   java OkHttp 4.9.2,连接无法重用,导致端口耗尽   eclipse中的c JNI:运行Java代码   java是否在出厂的所有硬件设备中都有/mnt/sdcard/Android/data文件夹(或等效文件夹)?   Java,在eclipse中访问资源文件夹中的图像   java为什么Bluemix dashDB操作抛出SqlSyntaxErrorException,SQLCODE=1667?   JavaHtmlUnitWebClient。getPage不处理javascript   Google API认证的java问题   java如何将JSON数组反序列化为Apache beam PCollection<javaObject>   ServerSocket停止接收命令,java/安卓   来自Java类的安卓 Toast消息   java如何自动重新加载应用程序引擎开发服务器?   java是否可以尝试/捕获一些东西来检查是否抛出了异常?   java如何做到这一点当我按下load game时,它不仅会加载信息,还会将您带到游戏中?   Java选项Xmx代表什么?   Java映射,它在插入时打印值   设置“ulimit c unlimited”后,java无法生成系统核心转储