将'Flatten Dictionary Column'的英文标题翻译成中文,不包含任何特殊字符或引号:Dask压平字典列

2024-06-12 07:55:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我是新来的达斯克,正在寻找一种方法,以扁平化字典列在熊猫数据帧。以下是1600万行数据帧的第一行截图:

screenshot of first two rows of data

下面是三行的文本示例:

{{u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'}}

我通常会使用以下代码将Form990PartVIISectionAGrp列展平:

^{pr2}$

我希望在Dask中执行此操作,但得到以下错误:“ValueError:计算数据中的列与提供的元数据中的列不匹配。”

我使用的是python2.7。我进口相关的包裹

    from dask import dataframe as dd
    from dask.multiprocessing import get
    from multiprocessing import cpu_count
    nCores = cpu_count()

为了测试代码,我创建了一个随机的数据样本:

    dfs = df.sample(1000)

然后生成Dask数据帧:

    ddf = dd.from_pandas(dfs, npartitions=nCores)

该列当前为字符串格式,因此我将其转换为字典。通常,我只写一行代码:

dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval) 

但我在这里尝试用一种更“Dask-like”的形式来实现它,因此我编写了以下函数,然后应用它:

    def make_dict(dfs):
        dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)   
        return dfs
    ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()

这是可行的——它返回一个PANDAS数据帧,其中Form990PartVIISectionAGrp列是字典格式的(但是,它并不比非daskapply快)。在

ddf_out

然后重新创建Dask DF:

    ddf = dd.from_pandas(ddf_out, npartitions=nCores)

然后编写一个函数来展开列:

    def flatten(ddf_out):
        ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
        #ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
    return ddf_out

如果我运行这个代码:

    result = ddf.map_partitions(flatten)

我得到以下输出,其中列尚未展平:

result

我还得到了关于缺少元数据的错误,并且考虑到上面的内容没有帮助解析dictionary列,所以我创建了一个由纯Python展开列生成的列的列表,并用它来创建列和数据类型的字典:

metadir = {u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
       u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
       u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
       u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
       u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
       u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
       u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'}

然后我对这些元数据应用flatten函数:

    result = ddf.map_partitions(flatten, meta=metadir)

我得到以下输出结果:

result

跑步结果.列结果是:

result.columns

失败的地方是运行compute(),在那里我得到以下错误消息:“ValueError:计算数据中的列与提供的元数据中的列不匹配。”无论我写什么,都会得到相同的错误:

result.compute()

或者

result.compute(meta=metadir)

我不知道我做错了什么。result中的列似乎与metadir中的列相匹配。如有任何建议,将不胜感激。在

更新: 这是我在更新扁平函数方面的尝试。在

    meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
     'F9_07_PZ_DIRTRSTKEY_NAME',
     'F9_07_PZ_COMP_OTHER',
     'F9_07_PZ_COMP_RELATED',
     'F9_07_PZ_TITLE',
     'F9_07_PZ_AVE_HOURS_WEEK',
     'F9_07_PZ_COMP_DIRECT',
     'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
     'F9_07_PC_OFFICER',
     'F9_07_PC_HIGH_COMP_EMPLOYEE',
     'BusinessName',
     'F9_07_PC_KEY_EMPLOYEE',
     'F9_07_PC_TRUSTEE_INSTITUTIONAL',
     'NameBusiness',
     'F9_07_PC_FORMER'], dtype="O")

    def flatten(ddf_out):
        ddf_out = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
        for m in meta:
            if m not in ddf_out:
                df[m] = '' 
        return ddf_out

然后我运行:

result = ddf.map_partitions(flatten, meta=meta).compute()

Tags: 数据outmetarelatedweekpccomphours
2条回答

对于一个小型或中型数据集,plain PANDAS解决方案将起作用:

df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)

然而,拥有1600万行的PANDAS解决方案既不能在16GB内存的Macbook上运行,也不能在96GB的Windows机器上运行。所以我找了达斯克。但是,正如上面的答案和注释所示,Dask解决方案不起作用,因为我的数据集中的每个观察不一定都有字典键。总共,1600万个Form990PartVIISectionAGrp的观察结果包含以下列表中的15个键:

^{pr2}$

因此,我的解决方案包括采用上面@mdurant提供的一些提示,并首先向每行添加任何缺少的键:

for index, row in df[:].iterrows():
    for k in newkeys:
        row['Form990PartVIISectionAGrp'].setdefault(k, np.nan)

在我的Macbook上花了100分钟。根据mdurant的评论,我将dataframe保存为JSON格式:

df.to_json('df.json', orient='records', lines=True)

并将文件作为文本读入Dask:

import json
import dask.bag as db
b = db.read_text('df.json').map(json.loads)

然后创建一个函数来展平列:

def flatten(record):
    return {
    'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
    'F9_07_PZ_COMP_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_RELATED'],
    'F9_07_PC_TRUSTEE_INDIVIDUAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INDIVIDUAL'],
    'F9_07_PZ_DIRTRSTKEY_NAME': record['Form990PartVIISectionAGrp']['F9_07_PZ_DIRTRSTKEY_NAME'],
    'F9_07_PZ_COMP_DIRECT': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_DIRECT'],
    'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],  
    'BusinessName': record['Form990PartVIISectionAGrp']['BusinessName'],  
    'F9_07_PC_FORMER': record['Form990PartVIISectionAGrp']['F9_07_PC_FORMER'],
    'F9_07_PC_HIGH_COMP_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_HIGH_COMP_EMPLOYEE'],
    'F9_07_PC_KEY_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_KEY_EMPLOYEE'],
    'F9_07_PC_OFFICER': record['Form990PartVIISectionAGrp']['F9_07_PC_OFFICER'],
    'F9_07_PC_TRUSTEE_INSTITUTIONAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INSTITUTIONAL'],
    'F9_07_PZ_AVE_HOURS_WEEK': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK'],
    'F9_07_PZ_AVE_HOURS_WEEK_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK_RELATED'],
    'F9_07_PZ_TITLE': record['Form990PartVIISectionAGrp']['F9_07_PZ_TITLE'],
    'NameBusiness': record['Form990PartVIISectionAGrp']['NameBusiness'],
    'URL': record['URL'],
}

然后我可以应用函数:

df = b.map(flatten).to_dataframe()

并将数据导出到CSV:

df.to_csv('compensation*.csv')

这个很有魅力!简而言之,根据mdurant上面有用的评论,关键是:1)为所有观察添加缺失的键;2)不从PANDAS将数据读入Dask(使用文本或CSV代替)。处理好这两个问题就可以很好地解决这个问题。在

开始的几点注意事项:

.apply(literal_eval)

这不是更好的map?在

I then re-create the Dask DF:

ddf = dd.from_pandas(ddf_out, npartitions=nCores)

ddf_out已经是一个dask数据帧,我不知道你为什么要这么做。在

The columns in result seem to match those in metadir.

result.columns的值取自您提供的元,除非您请求,否则不会进行任何计算(dask在大多数操作中是懒惰的)。ValueError异常是否没有提供进一步的信息?在

这是一个完整的例子

x = ({'F9_07_PZ_COMP_DIRECT': '0',
  'F9_07_PZ_DIRTRSTKEY_NAME': 'DEBRA MEALY',
  'F9_07_PZ_COMP_OTHER': '0',
  'F9_07_PZ_COMP_RELATED': '0',
  'F9_07_PZ_TITLE': 'CHAIR PERSON',
  'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
  'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'},
 {'F9_07_PZ_COMP_DIRECT': '0',
  'F9_07_PZ_DIRTRSTKEY_NAME': 'HELEN GORDON',
  'F9_07_PZ_COMP_OTHER': '0',
  'F9_07_PZ_COMP_RELATED': '0',
  'F9_07_PZ_TITLE': 'VICE CHAIR',
  'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
  'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'})
df = pd.DataFrame({'a': x})
d = dd.from_pandas(df, 1)
meta = pd.DataFrame(columns=['F9_07_PZ_COMP_DIRECT', 
       'F9_07_PZ_DIRTRSTKEY_NAME',
       'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE',
       'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PC_TRUSTEE_INDIVIDUAL'], dtype="O")
d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()

我怎么知道要用什么?我将这个函数应用于pandas数据帧-您可以使用数据帧的一小部分来执行此操作。在

其他注意事项:

  • 用pandas加载数据,传递给dask工作人员,然后将整个结果收集回pandas(内存中)数据帧是一种反模式,您不太可能看到这种加速,并且可能会产生大量开销。最好使用dd.read_csv之类的东西进行加载,并使用dask函数进行聚合或编写。只有compute()在将很小或不返回任何内容(因为它涉及到写入输出)。官方的例子没有使用来自泙熊猫。在
  • string和dict处理是python方法,因此持有任何python函数的解释器锁(GIL):线程实际上不会并行运行。要获得并行性,您需要在进程中运行,这是使用https://docs.dask.org/en/latest/setup/single-distributed.html最容易实现的
  • distributed scheduler还允许您访问仪表板,仪表板包含许多有用的信息来诊断系统的运行情况。您还可以配置许多关于其行为的信息,以防您有需要遵守的防火墙规则。在

相关问题 更多 >