如何使用dask读取csv并处理行?

2024-04-19 15:36:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我想读取一个28Gb的csv文件并打印内容。但是,我的代码:

import json
import sys
from datetime import datetime
from hashlib import md5

import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd

from kyotocabinet import *


class IndexInKyoto:

    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()

    def dbproc(self, db):
        db[self.hash_string(self.row)] = self.row

    def index_row(self, row):
        self.row = row
        DB.process(self.dbproc, "index.kch")

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv", blocksize=1000000)
df = df.compute(scheduler='processes')     # convert to pandas
df = df.to_dict(orient='records')
for row in df:
    ob.index_row(row)
print("Total time:")
print(datetime.utcnow-start_time)

不起作用。当我运行命令htop时,我可以看到dask正在运行,但是没有任何输出。也没有索引.kch文件已创建。 我在没有使用dask的情况下抱怨了同样的事情,它运行得很好;我使用的是Pandas流式api(chunksize),但是太慢了,所以我想使用dask。在


Tags: 文件csvfromimportselfdfdatetimestring
1条回答
网友
1楼 · 发布于 2024-04-19 15:36:53
df = df.compute(scheduler='processes')     # convert to pandas

别这样!在

您将在单独的进程中加载片段,然后在主进程中将要缝合的所有数据传输到单个数据帧中。这只会增加处理的开销,并在内存中创建数据的副本。在

如果您只想(出于某种原因)将每一行打印到控制台,那么您可以很好地使用Pandas streaming CSV reader(pd.read_csv(chunksize=..))。你可以使用Dask的chunking来运行它,如果你在读取数据的工人中进行打印,可能会得到一个加速:

^{pr2}$

请注意,for row in df实际上为您提供了列,可能您需要iterrows,或者您实际上想以某种方式处理数据。在

相关问题 更多 >