问题
我的工作流程包括下载数十万个文件,解析数据,然后在本地保存到csv。我正在尝试使用Dask设置此工作流,但它似乎没有并行处理。Dask仪表板显示每个工人的cpu%较低,任务选项卡为空。状态也没有显示任何内容htop
似乎一次处理的“运行”次数不超过1或2次。我不知道如何从这里开始
相关:How should I write multiple CSV files efficiently using dask.dataframe?(本问题所依据的旧问题)
示例
from dask.delayed import delayed
from dask import compute
from dask.distributed import Client, progress
import pandas as pd
import wget
import zipfile
import multiprocessing
def get_fn(dat):
### Download file and unzip based on input dat
url = f"http://www.urltodownloadfrom.com/{dat['var1']}/{dat['var2']}.csv"
wget.download(url)
indat = unzip()
### Process file
outdat = proc_dat(indat)
### Save file
outdat.to_csv('file_path')
### Trash collection with custom download fn
delete_downloads()
if __name__ == '__main__':
### Dask setup
NCORES = multiprocessing.cpu_count() - 1
client = Client(n_workers=NCORES, threads_per_worker=1)
### Build df of needed dates and variables
beg_dat = "2020-01-01"
end_dat = "2020-01-31"
date_range = pd.date_range(beg_dat, end_dat)
var = ["var1", "var2"]
lst_ = [(x, y) for x in date_range for y in var]
date = [x[0] for x in lst_]
var = [x[1] for x in lst_]
indf = pd.DataFrame({'date': date, 'var': var}).reset_index()
### Group by each row to process
gb = indf.groupby('index')
gb_i = [gb.get_group(x) for x in gb.groups]
### Start dask using delayed
compute([delayed(get_fn)(thisRow) for thisRow in gb_i], scheduler='processes')
仪表板
在这方面:
您显式地使用一个调度程序其他,而不是在脚本中前面设置的分布式调度程序。如果不在此处指定
scheduler=
,则将使用正确的客户端,因为它已被设置为默认值。您将在仪表板中看到一些内容请注意,您可能仍然看不到高CPU使用率,因为似乎大部分时间都在等待下载
相关问题 更多 >
编程相关推荐