用Dask并行化HDF-readtranslatewri

2024-06-16 10:06:20 发布

您现在位置:Python中文网/ 问答频道 /正文

TL;DR:我们在将Pandas代码与Dask(从同一HDF读取和写入)并行时遇到问题

我正在做一个项目,通常需要三个步骤:阅读、翻译(或组合数据)和编写这些数据。在上下文中,我们处理的是医疗记录,我们接收不同格式的索赔,将其转换为标准格式,然后重新写入磁盘。理想情况下,我希望以某种形式保存中间数据集,以便以后通过Python/Pandas访问。在

目前,我选择HDF作为我的数据存储格式,但是我在运行时问题上遇到了麻烦。对于大量的用户,我的代码目前可能需要几天以上的时间。这让我调查了达斯克,但我不确定我是否将达斯克最好地应用于我的情况。在

下面是我的工作流程示例,希望有足够的示例数据来了解运行时问题。在

读取(在本例中为创建)数据

import pandas as pd
import numpy as np
import dask
from dask import delayed
from dask import dataframe as dd
import random
from datetime import timedelta
from pandas.io.pytables import HDFStore

member_id = range(1, 10000)
window_start_date = pd.to_datetime('2015-01-01')
start_date_col = [window_start_date + timedelta(days=random.randint(0, 730)) for i in member_id]

# Eligibility records
eligibility = pd.DataFrame({'member_id': member_id,
                            'start_date': start_date_col})
eligibility['end_date'] = eligibility['start_date'] + timedelta(days=365)
eligibility['insurance_type'] = np.random.choice(['HMO', 'PPO'], len(member_id), p=[0.4, 0.6])
eligibility['gender'] = np.random.choice(['F', 'M'], len(member_id), p=[0.6, 0.4])
(eligibility.set_index('member_id')
 .to_hdf('test_data.h5',
         key='eligibility',
         format='table'))

# Inpatient records
inpatient_record_number = range(1, 20000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in inpatient_record_number]
inpatient = pd.DataFrame({'inpatient_record_number': inpatient_record_number,
                          'service_date': service_date})
inpatient['member_id'] = np.random.choice(list(range(1, 10000)), len(inpatient_record_number))
inpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(inpatient_record_number))
(inpatient.set_index('member_id')
 .to_hdf('test_data.h5',
         key='inpatient',
         format='table'))

# Outpatient records
outpatient_record_number = range(1, 30000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in outpatient_record_number]
outpatient = pd.DataFrame({'outpatient_record_number': outpatient_record_number,
                           'service_date': service_date})
outpatient['member_id'] = np.random.choice(range(1, 10000), len(outpatient_record_number))
outpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(outpatient_record_number))
(outpatient.set_index('member_id')
 .to_hdf('test_data.h5',
         key='outpatient',
         format='table'))

转换/写入数据

顺序法

^{pr2}$

上面的代码在我的机器上运行大约9分钟。在

Dask方法

def create_visits_dask_version(visits_stacked):
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
    len_of_visits = visits_stacked.shape[0]
    visits_stacked_1 = (visits_stacked
                        .sort_values('service_date')
                        .assign(visit_id=range(1, len_of_visits + 1))
                        .set_index('visit_id')
                        )
    return visits_stacked_1


def run_translate_dask():
    # Approach 2: Dask, with individual writes to HDF
    inpatient_dask = dd.read_hdf('test_data.h5', 'inpatient')
    outpatient_dask = dd.read_hdf('test_data.h5', 'outpatient')
    stacked = dd.concat([inpatient_dask, outpatient_dask])
    visits = stacked.groupby('member_id').apply(create_visits_dask_version)
    visits.to_hdf('test_data_dask.h5', 'visits')

run_translate_dask()

这种Dask方法需要13秒(!)在

虽然这是一个很大的改进,但我们通常对以下几点感到好奇:

  1. 在这个简单的例子中,使用Dask数据帧、连接它们并使用groupby/apply的方法是最佳方法吗?

  2. 实际上,我们有多个这样的进程,它们从同一个HDF读取,然后写入同一个HDF。我们最初的代码库的结构允许一次运行一个member_id整个工作流。当我们试图将它们并行化时,它有时对小样本有效,但大多数时候都会产生分割错误。像这样使用HDFs读/写这样的并行化工作流是否存在已知问题?我们也在做一个例子,但是我们想把这个放在这里,以防引发建议(或者如果这段代码能帮助面临类似问题的人)。

感谢所有的反馈!在


Tags: 数据importidnumberdatelennprandom
1条回答
网友
1楼 · 发布于 2024-06-16 10:06:20

一般来说,groupby应用会相当慢。特别是在这种情况下,特别是在有限的记忆中。在

一般来说,我建议使用拼花地板格式(数据帧必须使用和读取拼花地板功能)。与HDF文件相比,您获得segfaults的可能性要小得多。在

相关问题 更多 >