使用Dask下载、处理并保存到csv

2024-04-20 15:51:26 发布

您现在位置:Python中文网/ 问答频道 /正文

问题

我的工作流程包括下载数十万个文件,解析数据,然后在本地保存到csv。我正在尝试使用Dask设置此工作流,但它似乎没有并行处理。Dask仪表板显示每个工人的cpu%较低,任务选项卡为空。状态也没有显示任何内容htop似乎一次处理的“运行”次数不超过1或2次。我不知道如何从这里开始

相关:How should I write multiple CSV files efficiently using dask.dataframe?(本问题所依据的旧问题)

示例

from dask.delayed import delayed
from dask import compute
from dask.distributed import Client, progress
import pandas as pd
import wget
import zipfile
import multiprocessing


def get_fn(dat):    
    ### Download file and unzip based on input dat
    url = f"http://www.urltodownloadfrom.com/{dat['var1']}/{dat['var2']}.csv"
    wget.download(url)
    indat = unzip()

    ### Process file
    outdat = proc_dat(indat)
    
    ### Save file
    outdat.to_csv('file_path')

    ### Trash collection with custom download fn
    delete_downloads()


if __name__ == '__main__':

    ### Dask setup    
    NCORES = multiprocessing.cpu_count() - 1
    client = Client(n_workers=NCORES, threads_per_worker=1)

    ### Build df of needed dates and variables    
    beg_dat = "2020-01-01"
    end_dat = "2020-01-31"
    date_range = pd.date_range(beg_dat, end_dat)
    var = ["var1", "var2"]

    lst_ = [(x, y) for x in date_range for y in var]
    date = [x[0] for x in lst_]
    var = [x[1] for x in lst_]

    indf = pd.DataFrame({'date': date, 'var': var}).reset_index()

    ### Group by each row to process
    gb = indf.groupby('index')
    gb_i = [gb.get_group(x) for x in gb.groups]

    ### Start dask using delayed
    compute([delayed(get_fn)(thisRow) for thisRow in gb_i], scheduler='processes')

仪表板

enter image description here

enter image description here

enter image description here


Tags: csvinfromimportforgetdatevar
1条回答
网友
1楼 · 发布于 2024-04-20 15:51:26

在这方面:

compute([...], scheduler='processes')

您显式地使用一个调度程序其他,而不是在脚本中前面设置的分布式调度程序。如果不在此处指定scheduler=,则将使用正确的客户端,因为它已被设置为默认值。您将在仪表板中看到一些内容

请注意,您可能仍然看不到高CPU使用率,因为似乎大部分时间都在等待下载

相关问题 更多 >