一种高效的dask图缓存
graphchain的Python项目详细描述
图形链
什么是Graphchain?
graphchain就像dask图的joblib.Memory。Dask graph computations缓存到您选择的本地或远程位置,由PyFilesystem FS URL指定。
当您更改dask图(通过更改计算的实现或其输入)时,graphchain只需重新计算获取结果所需的最小计算数。这允许您快速迭代图形,而无需花费时间重新计算以前计算的键。
graphchain和joblib.memory的主要区别在于,在graphchain中,计算的实质性输入是not序列化和哈希(当输入是大对象(如pandas数据帧)时,这可能非常昂贵)。相反,使用计算对象及其依赖项(也就是计算对象)的哈希链(因此命名为graphchain)来标识缓存文件。
此外,只有在估计从缓存加载计算比简单计算计算节省时间时,才缓存计算结果。是否缓存的决定取决于缓存位置的特性,例如,缓存到本地文件系统与缓存到s3时的特性不同。
示例用法
基本用法
使用pip安装graphchain以开始:
pip install graphchain
为了演示GraphChain如何节省时间,我们首先创建一个简单的DASK图,该图(1)创建几个Pandas数据帧,(2)对这些数据帧运行一个相对繁重的操作,以及(3)总结结果。
importdaskimportgraphchainimportpandasaspddefcreate_dataframe(num_rows,num_cols):print('Creating DataFrame...')returnpd.DataFrame(data=[range(num_cols)]*num_rows)defcomplicated_computation(df,num_quantiles):print('Running complicated computation on DataFrame...')returndf.quantile(q=[i/num_quantilesforiinrange(num_quantiles)])defsummarise_dataframes(*dfs):print('Summing DataFrames...')returnsum(df.sum().sum()fordfindfs)dsk={'df_a':(create_dataframe,10_000,1000),'df_b':(create_dataframe,10_000,1000),'df_c':(complicated_computation,'df_a',2048),'df_d':(complicated_computation,'df_b',2048),'result':(summarise_dataframes,'df_c','df_d')}
使用dask.get
获取'result'
键大约需要6秒:
>>>%timedask.get(dsk,'result')CreatingDataFrame...RunningcomplicatedcomputationonDataFrame...CreatingDataFrame...RunningcomplicatedcomputationonDataFrame...SummingDataFrames...CPUtimes:user7.39s,sys:686ms,total:8.08sWalltime:6.19s
另一方面,第一次使用graphchain.get
来获取'result'
只需4秒:
>>>%timegraphchain.get(dsk,'result')CreatingDataFrame...RunningcomplicatedcomputationonDataFrame...SummingDataFrames...CPUtimes:user4.7s,sys:519ms,total:5.22sWalltime:4.04s
之所以graphchain.get
比dask.get
快,是因为它可以在计算和缓存df_a
和df_c
之后从缓存加载df_b
和df_d
。注意,如果从缓存加载计算的速度估计比简单地运行计算要快,graphchain将只缓存计算结果。
再次运行graphchain.get
来获取'result'
几乎是即时的,因为这一次的结果本身也可以从缓存中获得:
>>>%timegraphchain.get(dsk,'result')CPUtimes:user4.79ms,sys:1.79ms,total:6.58msWalltime:5.34ms
现在假设我们要将结果的汇总方式从总和改为平均值:
defsummarise_dataframes(*dfs):print('Averaging DataFrames...')returnsum(df.mean().mean()fordfindfs)/len(dfs)
如果我们让graphchain获取'result'
,它将检测到只有summarise_dataframes
发生了更改,因此仅使用从缓存加载的输入重新计算此函数:
>>>%timegraphchain.get(dsk,'result')AveragingDataFrames...CPUtimes:user123ms,sys:37.2ms,total:160msWalltime:86.6ms
远程存储GraphChain缓存
graphchain的缓存默认为./__graphchain_cache__
,但您可以要求graphchain在任何PyFilesystem FS URL位置使用缓存,例如s3://mybucket/__graphchain_cache__
:
graphchain.get(dsk,'result',location='s3://mybucket/__graphchain_cache__')
不包括要缓存的密钥
在某些情况下,您可能不希望缓存密钥。为了避免将某些键写入graphchain缓存,可以使用skip_keys
参数:
graphchain.get(dsk,'result',skip_keys=['result'])
在dask.delayed
中使用graphchain或者,您可以将graphchain与dask.delayed一起使用,以便更轻松地创建dask图:
@dask.delayeddefcreate_dataframe(num_rows,num_cols):print('Creating DataFrame...')returnpd.DataFrame(data=[range(num_cols)]*num_rows)@dask.delayeddefcomplicated_computation(df,num_quantiles):print('Running complicated computation on DataFrame...')returndf.quantile(q=[i/num_quantilesforiinrange(num_quantiles)])@dask.delayeddefsummarise_dataframes(*dfs):print('Summing DataFrames...')returnsum(df.sum().sum()fordfindfs)df_a=create_dataframe(num_rows=50_000,num_cols=500,seed=42)df_b=create_dataframe(num_rows=50_000,num_cols=500,seed=42)df_c=complicated_computation(df_a,window=3)df_d=complicated_computation(df_b,window=3)result=summarise_dataframes(df_c,df_d)
之后,您可以通过将delayed_optimize
方法设置为graphchain.optimize
,来计算result
:
withdask.config.set(scheduler='sync',delayed_optimize=graphchain.optimize):result.compute(location='s3://mybucket/__graphchain_cache__')
由基数ai开发
在radix.ai,我们发明、设计和开发人工智能软件。
下面是我们使用机器学习的一些例子,这是人工智能背后的技术:
- 帮助求职者找到工作。在Belgian Public Employment Service website上,我们根据您的简历提供工作推荐。
- 帮助医院节省时间。我们从病人出院信中提取诊断信息。
- 通过检测模仿的文章,帮助出版商计算他们的影响力。
你可以在medium上跟随我们的冒险。