一种高效的dask图缓存

graphchain的Python项目详细描述


CircleCILicensePyPIDocumentation

图形链

什么是Graphchain?

graphchain就像dask图的joblib.MemoryDask graph computations缓存到您选择的本地或远程位置,由PyFilesystem FS URL指定。

当您更改dask图(通过更改计算的实现或其输入)时,graphchain只需重新计算获取结果所需的最小计算数。这允许您快速迭代图形,而无需花费时间重新计算以前计算的键。


来源:xkcd.com/1205/

graphchain和joblib.memory的主要区别在于,在graphchain中,计算的实质性输入是not序列化和哈希(当输入是大对象(如pandas数据帧)时,这可能非常昂贵)。相反,使用计算对象及其依赖项(也就是计算对象)的哈希链(因此命名为graphchain)来标识缓存文件。

此外,只有在估计从缓存加载计算比简单计算计算节省时间时,才缓存计算结果。是否缓存的决定取决于缓存位置的特性,例如,缓存到本地文件系统与缓存到s3时的特性不同。

示例用法

基本用法

使用pip安装graphchain以开始:

pip install graphchain

为了演示GraphChain如何节省时间,我们首先创建一个简单的DASK图,该图(1)创建几个Pandas数据帧,(2)对这些数据帧运行一个相对繁重的操作,以及(3)总结结果。

importdaskimportgraphchainimportpandasaspddefcreate_dataframe(num_rows,num_cols):print('Creating DataFrame...')returnpd.DataFrame(data=[range(num_cols)]*num_rows)defcomplicated_computation(df,num_quantiles):print('Running complicated computation on DataFrame...')returndf.quantile(q=[i/num_quantilesforiinrange(num_quantiles)])defsummarise_dataframes(*dfs):print('Summing DataFrames...')returnsum(df.sum().sum()fordfindfs)dsk={'df_a':(create_dataframe,10_000,1000),'df_b':(create_dataframe,10_000,1000),'df_c':(complicated_computation,'df_a',2048),'df_d':(complicated_computation,'df_b',2048),'result':(summarise_dataframes,'df_c','df_d')}

使用dask.get获取'result'键大约需要6秒:

>>>%timedask.get(dsk,'result')CreatingDataFrame...RunningcomplicatedcomputationonDataFrame...CreatingDataFrame...RunningcomplicatedcomputationonDataFrame...SummingDataFrames...CPUtimes:user7.39s,sys:686ms,total:8.08sWalltime:6.19s

另一方面,第一次使用graphchain.get来获取'result'只需4秒:

>>>%timegraphchain.get(dsk,'result')CreatingDataFrame...RunningcomplicatedcomputationonDataFrame...SummingDataFrames...CPUtimes:user4.7s,sys:519ms,total:5.22sWalltime:4.04s

之所以graphchain.getdask.get快,是因为它可以在计算和缓存df_adf_c之后从缓存加载df_bdf_d。注意,如果从缓存加载计算的速度估计比简单地运行计算要快,graphchain将只缓存计算结果。

再次运行graphchain.get来获取'result'几乎是即时的,因为这一次的结果本身也可以从缓存中获得:

>>>%timegraphchain.get(dsk,'result')CPUtimes:user4.79ms,sys:1.79ms,total:6.58msWalltime:5.34ms

现在假设我们要将结果的汇总方式从总和改为平均值:

defsummarise_dataframes(*dfs):print('Averaging DataFrames...')returnsum(df.mean().mean()fordfindfs)/len(dfs)

如果我们让graphchain获取'result',它将检测到只有summarise_dataframes发生了更改,因此仅使用从缓存加载的输入重新计算此函数:

>>>%timegraphchain.get(dsk,'result')AveragingDataFrames...CPUtimes:user123ms,sys:37.2ms,total:160msWalltime:86.6ms

远程存储GraphChain缓存

graphchain的缓存默认为./__graphchain_cache__,但您可以要求graphchain在任何PyFilesystem FS URL位置使用缓存,例如s3://mybucket/__graphchain_cache__

graphchain.get(dsk,'result',location='s3://mybucket/__graphchain_cache__')

不包括要缓存的密钥

在某些情况下,您可能不希望缓存密钥。为了避免将某些键写入graphchain缓存,可以使用skip_keys参数:

graphchain.get(dsk,'result',skip_keys=['result'])

在dask.delayed

中使用graphchain

或者,您可以将graphchain与dask.delayed一起使用,以便更轻松地创建dask图:

@dask.delayeddefcreate_dataframe(num_rows,num_cols):print('Creating DataFrame...')returnpd.DataFrame(data=[range(num_cols)]*num_rows)@dask.delayeddefcomplicated_computation(df,num_quantiles):print('Running complicated computation on DataFrame...')returndf.quantile(q=[i/num_quantilesforiinrange(num_quantiles)])@dask.delayeddefsummarise_dataframes(*dfs):print('Summing DataFrames...')returnsum(df.sum().sum()fordfindfs)df_a=create_dataframe(num_rows=50_000,num_cols=500,seed=42)df_b=create_dataframe(num_rows=50_000,num_cols=500,seed=42)df_c=complicated_computation(df_a,window=3)df_d=complicated_computation(df_b,window=3)result=summarise_dataframes(df_c,df_d)

之后,您可以通过将delayed_optimize方法设置为graphchain.optimize,来计算result

withdask.config.set(scheduler='sync',delayed_optimize=graphchain.optimize):result.compute(location='s3://mybucket/__graphchain_cache__')

由基数ai开发

radix.ai,我们发明、设计和开发人工智能软件。

下面是我们使用机器学习的一些例子,这是人工智能背后的技术:

  • 帮助求职者找到工作。在Belgian Public Employment Service website上,我们根据您的简历提供工作推荐。
  • 帮助医院节省时间。我们从病人出院信中提取诊断信息。
  • 通过检测模仿的文章,帮助出版商计算他们的影响力。

你可以在medium上跟随我们的冒险。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java组织。冬眠hql。内部的阿斯特。QuerySyntaxException:<table\u name>未映射[来自<table\u name>]   异常无法有效使用来自Java的多捕获   java Hibernate Search是Lucene的干净抽象吗?   组织。xml。萨克斯。SAXException:没有针对的反序列化程序{http://www.w3.org/2001/XMLSchema}Java中使用Axis的anyType   java是独立于Eclipse平台的吗?   java PigLatin语句转换器方法给出了“indexoutofboundsexception”错误等   java Spring+Jackson+反序列化泛型对象列表   java获取firefox配置文件的配置文件当配置文件计数超过3(包括默认值)时,我没有选择给定的配置文件   java XPages应用程序无法运行   java为什么字符连接返回“int”和?   java如何以正确的方式从静态助手类中的异步方法返回值?   java如何将字符串值转换为整数数组列表   javajackcess表限制?   java如何使用SMACK API和GTALK发送消息?   java按钮显示在左上角,具有不同的外观和感觉