如何使用dask高效地计算自定义统计信息?

2024-04-20 09:41:53 发布

您现在位置:Python中文网/ 问答频道 /正文

问题

我有一个自定义类CachedStatistics,其中包含一个dask数据帧。它还具有自定义方法,这些方法可以依赖于dask方法或其他自定义方法。你知道吗

这个类是一个数据帧,具有dask中原来不存在的新操作。你知道吗

简化的实现如下:

class CachedStatistics:
    def __init__(self, parquet)
        self.df = dd.read_parquet(parquet)
        self.cached = ..

    # method to implement cache
    def _call_method(self, name):
        if self.cached[name] is None:
            self.cached[name] = self.getattr(name).__call__()

        return self.cached[name]

    def nrows(self):
        return len(self.df)

    def count(self):
        return self.df.count()

    def missing_pct(self):
        return self._call_method("nrows") / self._call_method("count")

    def test_missing(self):
        if self._call_method("missing_pct") < 0.5:
            return True
        else:
            return False

    def col_mean(self, col)
        return self.df[col].mean()

    def summary(self):
        df_dict = { 
            'missing_pct': self._call_method("missing_pct") , 
            'mean' : self._call_method("mean")
        }

        return pd.Series(df_dict)

我的主要要求是:

缓存所有计算

例如,我希望能够在missing_pct()上调用dask.compute,这样不仅可以保存missing_pct()的结果,还可以保存每个依赖项(nrows()count())的结果。你知道吗

我试图找到一种方法,以防实现自定义集合,但不知道如何实现。你知道吗

优化计算

我想用一个简单的数据计算出几个数据dask.compute公司(),以避免开销并最大限度地提高性能。你知道吗

实施

我尝试过使每个方法都延迟,但是当我在外部方法上调用compute时,嵌套的延迟对象不会得到计算,因为dask解压延迟对象的方式。 例子: nested delayed objects

从我从文档中读到的内容来看,让我所有的方法都输出一个HighLevelGraph似乎是一种可行的方法,但是我不确定如何将我当前定义的方法转换成一个依赖字典,因为我更希望能够像现在这样定义方法。你知道吗

感谢您的帮助或指导性建议。你知道吗


Tags: 数据方法nameselfdfreturndefcount