一个用于druid的python连接器。

pydruid的Python项目详细描述


皮德鲁伊

pydruid公开了一个简单的api来创建、执行和分析Druid查询。pydruid可以将查询结果解析为Pandasdataframe对象,用于后续的数据分析——这提供了DruidSciPy堆栈(用于科学计算)和scikit-learn(用于机器学习)之间的紧密集成。pydruid可以将查询结果导出到tsv或json中,以便使用您喜欢的工具(例如r、julia、matlab、excel)进行进一步处理。它提供同步和异步客户端。

另外,pydruid实现了Python DB API 2.0、aSQLAlchemy dialect,并且a提供了与druid交互的命令行接口。

要安装:

pipinstallpydruid# or, if you intend to use asynchronous clientpipinstallpydruid[async]# or, if you intend to export query results into pandaspipinstallpydruid[pandas]# or, if you intend to do bothpipinstallpydruid[async,pandas]# or, if you want to use the SQLAlchemy enginepipinstallpydruid[sqlalchemy]# or, if you want to use the CLIpipinstallpydruid[cli]

文档:https://pythonhosted.org/pydruid/

示例

下面的示例演示如何执行和分析三种查询类型的结果:timeseries、topn和groupby。我们将使用这些查询来询问关于twitter公共数据集的简单问题。

时间序列

2014年索契奥运会期间,每天的平均微博长度是多少?

frompydruid.clientimport*frompylabimportpltquery=PyDruid(druid_url_goes_here,'druid/v2')ts=query.timeseries(datasource='twitterstream',granularity='day',intervals='2014-02-02/p4w',aggregations={'length':doublesum('tweet_length'),'count':doublesum('count')},post_aggregations={'avg_tweet_length':(Field('length')/Field('count'))},filter=Dimension('first_hashtag')=='sochi2014')df=query.export_pandas()df['timestamp']=df['timestamp'].map(lambdax:x.split('T')[0])df.plot(x='timestamp',y='avg_tweet_length',ylim=(80,140),rot=20,title='Sochi 2014')plt.ylabel('avg tweet length (chars)')plt.show()

alt text

顶部

在2014年奥斯卡颁奖典礼上,谁是最受关注的十大人物(@user_name)?

top=query.topn(datasource='twitterstream',granularity='all',intervals='2014-03-03/p1d',# utc time of 2014 oscarsaggregations={'count':doublesum('count')},dimension='user_mention_name',filter=(Dimension('user_lang')=='en')&(Dimension('first_hashtag')=='oscars')&(Dimension('user_time_zone')=='Pacific Time (US & Canada)')&~(Dimension('user_mention_name')=='No Mention'),metric='count',threshold=10)df=query.export_pandas()printdfcounttimestampuser_mention_name013032014-03-03T00:00:00.000ZTheEllenShow1442014-03-03T00:00:00.000ZTheAcademy2212014-03-03T00:00:00.000ZMTV3212014-03-03T00:00:00.000Zpeoplemag4172014-03-03T00:00:00.000ZTHR5162014-03-03T00:00:00.000ZItsQueenElsa6162014-03-03T00:00:00.000Zeonline7152014-03-03T00:00:00.000ZPerezHilton8142014-03-03T00:00:00.000Zrealjohngreen9122014-03-03T00:00:00.000ZKevinSpacey

分组方式

用户回复其他用户的社交网络是什么样子的?

fromigraphimport*fromcairoimport*frompandasimportconcatgroup=query.groupby(datasource='twitterstream',granularity='hour',intervals='2013-10-04/pt12h',dimensions=["user_name","reply_to_name"],filter=(~(Dimension("reply_to_name")=="Not A Reply"))&(Dimension("user_location")=="California"),aggregations={"count":doublesum("count")})df=query.export_pandas()# map names to categorical variables with a lookup tablenames=concat([df['user_name'],df['reply_to_name']]).unique()nameLookup=dict([pair[::-1]forpairinenumerate(names)])df['user_name_lookup']=df['user_name'].map(nameLookup.get)df['reply_to_name_lookup']=df['reply_to_name'].map(nameLookup.get)# create the graph with igraphg=Graph(len(names),directed=False)vertices=zip(df['user_name_lookup'],df['reply_to_name_lookup'])g.vs["name"]=namesg.add_edges(vertices)layout=g.layout_fruchterman_reingold()plot(g,"tweets.png",layout=layout,vertex_size=2,bbox=(400,400),margin=25,edge_width=1,vertex_color="blue")

alt text

异步客户端

pydruid.async_client.AsyncPyDruid实现异步客户端。为了实现这一点,它使用了异步 来自Tornado框架的http客户端。异步客户机适合与异步框架(如tornado)一起使用 在规模上提供更好的性能。它允许您同时服务多个请求,而不阻塞 德鲁伊正在执行你的查询。

示例

fromtornadoimportgenfrompydruid.async_clientimportAsyncPyDruidfrompydruid.utils.aggregatorsimportlongsumfrompydruid.utils.filtersimportDimensionclient=AsyncPyDruid(url_to_druid_broker,'druid/v2')@gen.coroutinedefyour_asynchronous_method_serving_top10_mentions_for_day(daytop_mentions=yieldclient.topn(datasource='twitterstream',granularity='all',intervals="%s/p1d"%(day,),aggregations={'count':doublesum('count')},dimension='user_mention_name',filter=(Dimension('user_lang')=='en')&(Dimension('first_hashtag')=='oscars')&(Dimension('user_time_zone')=='Pacific Time (US & Canada)')&~(Dimension('user_mention_name')=='No Mention'),metric='count',threshold=10)# asynchronously return results# can be simply ```return top_mentions``` in python 3.xraisegen.Return(top_mentions)

垫圈

Theta Sketch Post聚合器的构建与普通Post聚合器略有不同,因为它们有不同的运算符。 注意:您必须将druid-datasketches扩展加载到您的druid集群中才能使用它们。 有关详细信息,请参阅Druid datasketches文档。

frompydruid.clientimport*frompydruid.utilsimportaggregatorsfrompydruid.utilsimportfiltersfrompydruid.utilsimportpostaggregatorquery=PyDruid(url_to_druid_broker,'druid/v2')ts=query.groupby(datasource='test_datasource',granularity='all',intervals='2016-09-01/P1M',filter=(filters.Dimension('product').in_(['product_A','product_B'])),aggregations={'product_A_users':aggregators.filtered(filters.Dimension('product')=='product_A',aggregators.thetasketch('user_id')),'product_B_users':aggregators.filtered(filters.Dimension('product')=='product_B',aggregators.thetasketch('user_id'))},post_aggregations={'both_A_and_B':postaggregator.ThetaSketchEstimate(postaggregator.ThetaSketch('product_A_users')&postaggregator.ThetaSketch('product_B_users'))})

数据库API

frompydruid.dbimportconnectconn=connect(host='localhost',port=8082,path='/druid/v2/sql/',scheme='http')curs=conn.cursor()curs.execute("""    SELECT place,           CAST(REGEXP_EXTRACT(place, '(.*),', 1) AS FLOAT) AS lat,           CAST(REGEXP_EXTRACT(place, ',(.*)', 1) AS FLOAT) AS lon      FROM places     LIMIT 10""")forrowincurs:print(row)

炼金术

fromsqlalchemyimport*fromsqlalchemy.engineimportcreate_enginefromsqlalchemy.schemaimport*engine=create_engine('druid://localhost:8082/druid/v2/sql/')# uses HTTP by default :(# engine = create_engine('druid+http://localhost:8082/druid/v2/sql/')# engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')places=Table('places',MetaData(bind=engine),autoload=True)print(select([func.count('*')],from_obj=places).scalar())

列标题

在0.13.0版本中,druid sql增加了对在 可通过请求中的“header”字段请求的响应。这个 有助于确保定义了游标描述(这是一项要求 对于sqlalchemy查询语句),无论结果集是否包含 任何一排。历史上,这对于不包含 一行无法推断出所需的列名。

通过使用查询,可以通过sqlalchemy uri配置启用头 参数,即

engine=create_engine('druid://localhost:8082/druid/v2/sql?header=true')

注意当前的默认值是false,以确保向后兼容,但是应该 对于druid版本,设置为true;=0.13.0。

命令行

$ pydruid http://localhost:8082/druid/v2/sql/
> SELECT COUNT(*) AS cnt FROM places
  cnt
-----
12345
> SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES;
TABLE_NAME
----------
test_table
COLUMNS
SCHEMATA
TABLES
> BYE;
GoodBye!

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
使用jaxrpc的Java eclipse WebService客户端   java编程方式在对象上写入名称   java Spring批处理:重试后跳过   java Android错误:错误:任务执行失败:应用程序:transformClassesWithDexForDebug'   带有清单文件nullPointerException的java Android元数据   spring Java Quartz调度作业停止运行   JavaMockito:如何在不调用实际方法的情况下,模拟带有参数和无效返回类型的静态方法?   java Tomcat连接池问题无法在关闭的连接上调用方法   java如何交换列表中的项目?   java如何停止线程并通过Toast在线程中正确显示文本?   java为什么连续写入OutputStream时偏移量0不会导致重复字节?   java我无法生成头文件   不兼容的返回类型错误java   修改值后键值对的java Jolt转换规范   java有自动更新Javadoc的工具吗?   java线程如何在ints自身实例类中共享变量   java继承一个非gwt模块   java Hibernate xml配置   使用netty4异步调用的java链接HTTP请求响应