A Python connector for Druid.
pydruid
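pydruid exposes a simple API to create, execute, and analyze Druid queries. pydruid can parse query results into Pandas DataFrame objects for subsequent data analysis, which offers a tight integration between Druid, the SciPy stack (for scientific computing), and scikit-learn (for machine learning). pydruid can export query results to TSV or JSON for further processing with your favorite tool, e.g., R, Julia, Matlab, Excel. It provides both synchronous and asynchronous clients.
Additionally, pydruid implements the Python DB API 2.0 and a SQLAlchemy dialect, and provides a command line interface for interacting with Druid.
To install: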
    pip install pydruid
    # or, if you intend to use asynchronous client
    pip install pydruid[async]
    # or, if you intend to export query results into pandas
    pip install pydruid[pandas]
    # or, if you intend to do both
    pip install pydruid[async,pandas]
    # or, if you want to use the SQLAlchemy engine
    pip install pydruid[sqlalchemy]
    # or, if you want to use the CLI
    pip install pydruid[cli]
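Documentation: https://pythonhosted.org/pydruid/.
Examples
The following examples show how to execute and analyze the results of three query types: timeseries, topN, and groupby. We will use these queries to ask simple questions about Twitter's public data set.
timeseries
What was the average tweet length, per day, around the 2014 Sochi Olympics?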
    from pydruid.client import *
    from pylab import plt

    query = PyDruid(druid_url_goes_here, 'druid/v2')

    ts = query.timeseries(
        datasource='twitterstream',
        granularity='day',
        intervals='2014-02-02/p4w',
        aggregations={'length': doublesum('tweet_length'), 'count': doublesum('count')},
        post_aggregations={'avg_tweet_length': (Field('length') / Field('count'))},
        filter=Dimension('first_hashtag') == 'sochi2014'
    )
    df = query.export_pandas()
    df['timestamp'] = df['timestamp'].map(lambda x: x.split('T')[0])
    df.plot(x='timestamp', y='avg_tweet_length', ylim=(80, 140), rot=20,
            title='Sochi 2014')
    plt.ylabel('avg tweet length (chars)')
    plt.show()
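To make the keyword arguments above more concrete, here is a minimal sketch of the kind of native Druid JSON query they correspond to. The helper name `build_timeseries_query` is hypothetical and is not part of pydruid; the exact payload pydruid sends may differ in detail.

```python
import json


def build_timeseries_query(datasource, granularity, intervals):
    """Hypothetical helper: assemble a Druid native timeseries query as a dict."""
    return {
        "queryType": "timeseries",
        "dataSource": datasource,
        "granularity": granularity,
        "intervals": intervals,
        # Two doubleSum aggregators, named as in the example above.
        "aggregations": [
            {"type": "doubleSum", "name": "length", "fieldName": "tweet_length"},
            {"type": "doubleSum", "name": "count", "fieldName": "count"},
        ],
        # Arithmetic post-aggregation: length / count.
        "postAggregations": [
            {
                "type": "arithmetic",
                "name": "avg_tweet_length",
                "fn": "/",
                "fields": [
                    {"type": "fieldAccess", "fieldName": "length"},
                    {"type": "fieldAccess", "fieldName": "count"},
                ],
            }
        ],
        # Selector filter: first_hashtag == 'sochi2014'.
        "filter": {
            "type": "selector",
            "dimension": "first_hashtag",
            "value": "sochi2014",
        },
    }


query_body = build_timeseries_query("twitterstream", "day", "2014-02-02/p4w")
print(json.dumps(query_body, indent=2))
```

Building the query as a plain dict keeps the sketch independent of any client library; it only illustrates how aggregations, post-aggregations, and the filter fit together in one request.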
topN
Who were the top ten mentions (@user_name) during the 2014 Oscars?
    top = query.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2014-03-03/p1d',  # utc time of 2014 oscars
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars') &
               (Dimension('user_time_zone') == 'Pacific Time (US & Canada)') &
               ~(Dimension('user_mention_name') == 'No Mention'),
        metric='count',
        threshold=10
    )

    df = query.export_pandas()
    print(df)

       count                 timestamp user_mention_name
    0   1303  2014-03-03T00:00:00.000Z      TheEllenShow
    1     44  2014-03-03T00:00:00.000Z        TheAcademy
    2     21  2014-03-03T00:00:00.000Z               MTV
    3     21  2014-03-03T00:00:00.000Z         peoplemag
    4     17  2014-03-03T00:00:00.000Z               THR
    5     16  2014-03-03T00:00:00.000Z      ItsQueenElsa
    6     16  2014-03-03T00:00:00.000Z           eonline
    7     15  2014-03-03T00:00:00.000Z       PerezHilton
    8     14  2014-03-03T00:00:00.000Z     realjohngreen
    9     12  2014-03-03T00:00:00.000Z       KevinSpacey
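The filter expression above combines `==`, `&`, and `~`. In Python, `&` and `~` bind more tightly than `==`, which is why each comparison is wrapped in parentheses. The sketch below shows one way such operators could map onto Druid's `selector`, `and`, and `not` filter types. The `Dim` and `Filter` classes are hypothetical stand-ins written for illustration, not pydruid's implementation.

```python
class Filter:
    """Hypothetical stand-in for a filter expression; not pydruid's own class."""

    def __init__(self, spec):
        self.spec = spec

    def __and__(self, other):
        # a & b -> logical AND of both filters
        return Filter({"type": "and", "fields": [self.spec, other.spec]})

    def __invert__(self):
        # ~a -> logical NOT
        return Filter({"type": "not", "field": self.spec})


class Dim:
    """Hypothetical dimension reference that builds selector filters via ==."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, value):
        return Filter({"type": "selector", "dimension": self.name, "value": value})


# Parentheses are required: without them, & would be evaluated before ==.
combined = (Dim("user_lang") == "en") & ~(Dim("user_mention_name") == "No Mention")
print(combined.spec)
```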
groupby
What does the social network of users replying to other users look like?
    from igraph import *
    from cairo import *
    from pandas import concat

    group = query.groupby(
        datasource='twitterstream',
        granularity='hour',
        intervals='2013-10-04/pt12h',
        dimensions=["user_name", "reply_to_name"],
        filter=(~(Dimension("reply_to_name") == "Not A Reply")) &
               (Dimension("user_location") == "California"),
        aggregations={"count": doublesum("count")}
    )

    df = query.export_pandas()

    # map names to categorical variables with a lookup table
    names = concat([df['user_name'], df['reply_to_name']]).unique()
    nameLookup = dict([pair[::-1] for pair in enumerate(names)])
    df['user_name_lookup'] = df['user_name'].map(nameLookup.get)
    df['reply_to_name_lookup'] = df['reply_to_name'].map(nameLookup.get)

    # create the graph with igraph
    g = Graph(len(names), directed=False)
    vertices = zip(df['user_name_lookup'], df['reply_to_name_lookup'])
    g.vs["name"] = names
    g.add_edges(vertices)
    layout = g.layout_fruchterman_reingold()
    plot(g, "tweets.png", layout=layout, vertex_size=2, bbox=(400, 400),
         margin=25, edge_width=1, vertex_color="blue")
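The lookup-table step in the example above can be tried without Druid, pandas, or igraph. The sketch below uses plain lists with made-up sample names in place of the `user_name` and `reply_to_name` columns.

```python
# Sample data standing in for the 'user_name' and 'reply_to_name' columns.
user_names = ["alice", "bob", "alice"]
reply_to_names = ["bob", "carol", "carol"]

# Unique names in first-seen order (pandas' unique() also preserves order).
names = list(dict.fromkeys(user_names + reply_to_names))

# enumerate() yields (index, name); reversing each pair gives name -> index.
name_lookup = dict(pair[::-1] for pair in enumerate(names))

# Each reply becomes an edge between two integer vertex ids.
edges = list(zip(map(name_lookup.get, user_names),
                 map(name_lookup.get, reply_to_names)))

print(names)   # ['alice', 'bob', 'carol']
print(edges)   # [(0, 1), (1, 2), (0, 2)]
```

Integer vertex ids are what a graph library such as igraph expects when adding edges, which is why the names are mapped to indices first.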
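Asynchronous client
pydruid.async_client.AsyncPyDruid implements an asynchronous client. To do so, it uses the asynchronous HTTP client from the Tornado framework. The asynchronous client is suited to async frameworks such as Tornado and provides much better performance at scale. It lets you serve multiple requests at the same time without blocking while Druid executes your queries.
Example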
    from tornado import gen
    from pydruid.async_client import AsyncPyDruid
    from pydruid.utils.aggregators import doublesum
    from pydruid.utils.filters import Dimension

    client = AsyncPyDruid(url_to_druid_broker, 'druid/v2')

    @gen.coroutine
    def your_asynchronous_method_serving_top10_mentions_for_day(day):
        top_mentions = yield client.topn(
            datasource='twitterstream',
            granularity='all',
            intervals="%s/p1d" % (day, ),
            aggregations={'count': doublesum('count')},
            dimension='user_mention_name',
            filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars') &
                   (Dimension('user_time_zone') == 'Pacific Time (US & Canada)') &
                   ~(Dimension('user_mention_name') == 'No Mention'),
            metric='count',
            threshold=10)

        # asynchronously return results
        # can be simply `return top_mentions` in python 3.x
        raise gen.Return(top_mentions)
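The benefit of not blocking on each query can be illustrated without a Druid cluster. The sketch below uses the standard library's `asyncio` rather than Tornado, and `fake_topn` is a hypothetical stand-in that sleeps instead of issuing a real request.

```python
import asyncio


async def fake_topn(day):
    """Hypothetical stand-in for a Druid query; sleeps instead of doing I/O."""
    await asyncio.sleep(0.1)  # simulated network round trip
    return {"day": day, "rows": []}


async def main():
    days = ["2014-03-01", "2014-03-02", "2014-03-03"]
    # All three "queries" wait concurrently, so total time is about one
    # round trip rather than three.
    return await asyncio.gather(*(fake_topn(day) for day in days))


results = asyncio.run(main())
print([r["day"] for r in results])
```

`asyncio.gather` returns results in the order the awaitables were passed in, so each result can be matched to its day even though the waits overlap.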
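Theta sketches
Theta sketch post aggregators are built slightly differently from normal post aggregators, because they have different operators.
Note: you must have the druid-datasketches extension loaded into your Druid cluster in order to use them. See the Druid datasketches documentation for details.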
    from pydruid.client import *
    from pydruid.utils import aggregators
    from pydruid.utils import filters
    from pydruid.utils import postaggregator

    query = PyDruid(url_to_druid_broker, 'druid/v2')
    ts = query.groupby(
        datasource='test_datasource',
        granularity='all',
        intervals='2016-09-01/P1M',
        filter=(filters.Dimension('product').in_(['product_A', 'product_B'])),
        aggregations={
            'product_A_users': aggregators.filtered(
                filters.Dimension('product') == 'product_A',
                aggregators.thetasketch('user_id')
            ),
            'product_B_users': aggregators.filtered(
                filters.Dimension('product') == 'product_B',
                aggregators.thetasketch('user_id')
            )
        },
        post_aggregations={
            'both_A_and_B': postaggregator.ThetaSketchEstimate(
                postaggregator.ThetaSketch('product_A_users') &
                postaggregator.ThetaSketch('product_B_users')
            )
        }
    )
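Conceptually, the `both_A_and_B` post-aggregation above estimates the size of a set intersection: the number of distinct users seen for both products. The sketch below computes the exact counterpart with Python sets on made-up user ids. Theta sketches return an approximation of this number, so real results may differ slightly.

```python
# Hypothetical user ids per product, standing in for the 'user_id' column.
product_a_users = {"u1", "u2", "u3", "u4"}
product_b_users = {"u3", "u4", "u5"}

# Exact counterpart of ThetaSketch(A) & ThetaSketch(B) followed by an estimate.
both_a_and_b = len(product_a_users & product_b_users)
print(both_a_and_b)  # 2
```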
DB API
    from pydruid.db import connect

    conn = connect(host='localhost', port=8082, path='/druid/v2/sql/', scheme='http')
    curs = conn.cursor()
    curs.execute("""
        SELECT place,
               CAST(REGEXP_EXTRACT(place, '(.*),', 1) AS FLOAT) AS lat,
               CAST(REGEXP_EXTRACT(place, ',(.*)', 1) AS FLOAT) AS lon
          FROM places
         LIMIT 10
    """)
    for row in curs:
        print(row)
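Because the interface follows Python DB API 2.0, the connect, cursor, execute, iterate pattern is the same as for other compliant drivers. The sketch below shows that pattern with the standard library's `sqlite3`, chosen only because it needs no server. The table contents are made up, and nothing here talks to Druid.

```python
import sqlite3

# sqlite3 is used here only because it ships with Python and follows DB API 2.0.
conn = sqlite3.connect(":memory:")
curs = conn.cursor()
curs.execute("CREATE TABLE places (place TEXT)")
curs.executemany(
    "INSERT INTO places VALUES (?)",
    [("37.77,-122.42",), ("40.71,-74.01",)],
)

curs.execute("SELECT place FROM places LIMIT 10")
# description holds one 7-item sequence per column; item 0 is the column name.
column_names = [col[0] for col in curs.description]
rows = [row for row in curs]

print(column_names)
print(rows)
conn.close()
```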
SQLAlchemy
    from sqlalchemy import *
    from sqlalchemy.engine import create_engine
    from sqlalchemy.schema import *

    engine = create_engine('druid://localhost:8082/druid/v2/sql/')  # uses HTTP by default :(
    # engine = create_engine('druid+http://localhost:8082/druid/v2/sql/')
    # engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')

    places = Table('places', MetaData(bind=engine), autoload=True)
    print(select([func.count('*')], from_obj=places).scalar())
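Column headers
In version 0.13.0, Druid SQL added support for including the column names in the response, which can be requested via the "header" field in the request. This helps ensure that the cursor description is defined (a requirement for SQLAlchemy query statements) regardless of whether the result set contains any rows. Historically this was a problem for result sets that contained no rows, because the expected column names could not be inferred.
The header can be enabled through the SQLAlchemy URI by using a query parameter, i.e.,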
    engine = create_engine('druid://localhost:8082/druid/v2/sql?header=true')
Note that the current default is false to ensure backwards compatibility, but it should be set to true for Druid versions >= 0.13.0.
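To see how the `header` setting travels inside the connection URI, the sketch below splits the URI with the standard library. It is illustration only: SQLAlchemy does its own URL parsing, and the fallback to "false" here simply mirrors the default described above.

```python
from urllib.parse import parse_qs, urlsplit

uri = "druid://localhost:8082/druid/v2/sql?header=true"
parts = urlsplit(uri)

# parse_qs returns a list of values per key; take the first, default to "false".
params = parse_qs(parts.query)
header_enabled = params.get("header", ["false"])[0].lower() == "true"

print(parts.hostname, parts.port, parts.path, header_enabled)
```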
Command line
    $ pydruid http://localhost:8082/druid/v2/sql/
    > SELECT COUNT(*) AS cnt FROM places
      cnt
    -----
    12345
    > SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES;
    TABLE_NAME
    ----------
    test_table
    COLUMNS
    SCHEMATA
    TABLES
    > BYE;
    GoodBye!