无法使用dask将csv读取到数据帧中,使数据帧与FBI匹配

2024-04-19 17:06:46 发布

您现在位置:Python中文网/ 问答频道 /正文

参考资料:


需要注意的几点:

  • 我总共有48gb的ram

  • 以下是我使用的库的版本

    • Python 3.7.7
    • dask==2.18.0
    • fB=0.6
    • 熊猫==1.0.3

im导入熊猫的原因仅适用于此行
pd.options.mode.chained_assignment = None
这有助于在im使用dask.distributed时处理dask错误

所以,我有一个21gb的csv文件,我正在使用dask和jupyter笔记本阅读。。。 我试图从mysql数据库表中读取它,但是,内核最终崩溃了

我尝试了多种组合使用我的本地工作网络、线程和可用内存、可用存储和内存,甚至尝试根本不使用distributed。我也尝试过使用pandas进行分块(不是使用上面提到的与pandas相关的行),但是,即使使用分块,内核仍然崩溃

我现在可以用dask加载csv,并应用一些转换,例如设置索引,添加fbprophet需要的列(名称)。。。但是我仍然无法用df.compute()计算数据帧,因为这就是为什么我认为我收到了fbprophet的错误。在我用适当的数据类型添加了y列和ds列之后,我收到了错误Truth of Delayed objects is not supported,我认为这是因为fbprophet期望数据帧不是惰性的,这就是我试图预先运行compute的原因。我还增加了客户机上的ram,以允许它使用完整的48gb,因为我怀疑它可能会尝试两次加载数据,但是,这仍然失败,因此很可能这不是问题的原因

除此之外,dask的文档中也提到了fbpropphet,用于将机器学习应用于数据帧,然而,我真的不明白为什么这不起作用。。。我也试过莫丁和雷,还有达斯克,结果基本相同

另一个问题。。。关于内存使用 distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 32.35 GB -- Worker memory limit: 25.00 GB 我在分配客户端、读取csv文件以及对数据帧应用操作/转换时遇到此错误,但是分配的大小大于csv文件本身,因此这让我感到困惑

我自己做了些什么来尝试解决这个问题: -当然,谷歌搜索没有发现任何东西:-/ -多次询问不和谐帮助频道 -多次询问IIRC帮助频道

无论如何,我真的非常感谢在这个问题上的任何帮助!!! 提前感谢:)

MCVE

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from fbprophet import Prophet

pd.options.mode.chained_assignment = None
client = Client(n_workers=2, threads_per_worker=4, processes=False, memory_limit='4GB')
csv_file = 'provide_your_own_csv_file_here.csv'
df = dd.read_csv(csv_file, parse_dates=['Time (UTC)'])
df = df.set_index('Time (UTC)')
df['y'] = df[['a','b']].mean(axis=1)
m = Prophet(daily_seasonality=True)
m.fit(df)
# ERROR: Truth of Delayed objects is not supported

Tags: 文件csv数据importpandasdfis错误
2条回答

不幸的是,Prophet今天不支持Dask数据帧

您提到的示例显示了使用Dask加速Prophet在数据帧上的拟合。Dask数据帧只是人们使用Dask的一种方式

作为already suggested,一种方法是将dask.delayed与pandas DataFrame一起使用,并跳过dask.dataframe

您可以使用所示的简化版^{}-^{}-^{} pipeline进行使用Dask的自定义计算

这里有一种基于这种定制管道的可能方法,使用小的数据集(创建MCVE)-管道中的每一步都将延迟

进口

import numpy as np
import pandas as pd
from dask import delayed
from dask.distributed import Client
from fbprophet import Prophet

.csv中生成一些列名为Time (UTC)ab的数据

def generate_csv(nrows, fname):
    df = pd.DataFrame(np.random.rand(nrows, 2), columns=["a", "b"])
    df["Time (UTC)"] = pd.date_range(start="1850-01-01", periods=nrows)
    df.to_csv(fname, index=False)

首先从管道中写入load函数,用Pandas加载.csv,并使用^{} decorator延迟其执行

  • 最好使用^{} with ^{}来查看管道对数据子集的执行情况,而不是全部加载
  • 这将返回一个dask.delayed对象,而不是pandas.DataFrame
@delayed
def load_data(fname, nrows=None):
    return pd.read_csv(fname, nrows=nrows)

现在创建process函数,使用pandas处理数据,再次延迟,因为它的输入是dask.delayed对象而不是pandas.DataFrame

@delayed
def process_data(df):
    df = df.rename(columns={"Time (UTC)": "ds"})
    df["y"] = df[["a", "b"]].mean(axis=1)
    return df

最后一个函数-此函数将对数据(从.csv加载并处理,但延迟)进行训练fbprophet以进行预测。这个analyze函数也会延迟,因为它的一个输入是dask.delayed对象

@delayed
def analyze(df, horizon):
    m = Prophet(daily_seasonality=True)
    m.fit(df)
    future = m.make_future_dataframe(periods=horizon)
    forecast = m.predict(future)
    return forecast

运行管道(如果从Python脚本运行,requires__name__ == "__main__"

  • 管道的输出(由fbprophet进行的预测)存储在变量result中,该变量被延迟
    • 计算此输出时,将生成一个pandas.DataFrame(对应于fbprophet预测的输出),因此可以使用result.compute()对其进行评估
if __name__ == "__main__":
    horizon = 8
    num_rows_data = 40
    num_rows_to_load = 35
    csv_fname = "my_file.csv"

    generate_csv(num_rows_data, csv_fname)

    client = Client()  # modify this as required

    df = load_data(csv_fname, nrows=num_rows_to_load)
    df = process_data(df)
    result = analyze(df, horizon)
    forecast = result.compute()

    client.close()

    assert len(forecast) == num_rows_to_load + horizon
    print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].head())

输出

          ds      yhat  yhat_lower  yhat_upper
0 1850-01-01  0.330649    0.095788    0.573378
1 1850-01-02  0.493025    0.266692    0.724632
2 1850-01-03  0.573344    0.348953    0.822692
3 1850-01-04  0.491388    0.246458    0.712400
4 1850-01-05  0.307939    0.066030    0.548981

相关问题 更多 >