使用pandas的“大数据”工作流程

2024-04-26 06:07:21 发布

您现在位置:Python中文网/ 问答频道 /正文

几个月来,在学习熊猫的过程中,我一直试图找出这个问题的答案。我在日常工作中使用SAS,这是非常好的,因为它是核心支持之外的。然而,由于许多其他原因,SAS作为一个软件是可怕的。

有一天,我希望用python和pandas代替SAS,但我目前缺乏一个用于大型数据集的核心工作流。我指的不是需要分布式网络的“大数据”,而是那些大到无法装入内存,小到足以装入硬盘的文件。

我的第一个想法是使用HDFStore将大型数据集保存在磁盘上,并仅将所需的片段拉入数据帧进行分析。其他人提到MongoDB是一个更容易使用的替代品。我的问题是:

完成以下工作的最佳实践工作流是什么:

  1. 将平面文件加载到永久的磁盘上数据库结构中
  2. 查询该数据库以检索要输入pandas数据结构的数据
  3. 在熊猫中操作碎片后更新数据库

现实世界的例子将非常值得赞赏,特别是从任何人谁使用大数据熊猫。

编辑——一个我希望这样做的例子:

  1. 迭代地导入一个大型平面文件并将其存储在永久的磁盘上数据库结构中。这些文件通常太大,无法放入内存。
  2. 为了使用Pandas,我想读取这些数据的子集(通常一次只有几列),这些子集可以放在内存中。
  3. 我将通过对选定列执行各种操作来创建新列。
  4. 然后我必须将这些新列追加到数据库结构中。

我正在努力寻找执行这些步骤的最佳实践方法。阅读有关pandas和pytables的链接,似乎追加一个新列可能是个问题。

编辑——具体回答杰夫的问题:

  1. 我正在建立消费信贷风险模型。这些数据包括电话、SSN和地址特征;财产价值;诸如犯罪记录、破产等贬义信息。。。我每天使用的数据集平均有1000到2000个混合数据类型的字段:数字和字符数据的连续、标称和序数变量。我很少附加行,但我确实执行了许多创建新列的操作。
  2. 典型的操作包括使用条件逻辑将多个列组合成一个新的复合列。例如,if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'。这些操作的结果是为数据集中的每个记录创建一个新列。
  3. 最后,我想将这些新列追加到磁盘上的数据结构中。我将重复第2步,使用交叉表和描述性统计来探索数据,试图找到有趣的、直观的关系来建模。
  4. 典型的项目文件通常约为1GB。文件以这样一种方式组织:行由使用者数据记录组成。每一行的每一条记录都有相同数量的列。情况总是这样。
  5. 在创建新列时很少按行进行子集。但是,在创建报表或生成描述性统计数据时,我通常会将数据子集放在行上。例如,我可能想为特定的业务线创建一个简单的频率,比如零售信用卡。为此,除了要报告的列之外,我只选择业务线=零售的那些记录。但是,在创建新列时,我将提取所有数据行,并且只提取操作所需的列。
  6. 建模过程要求我分析每一列,寻找与某个结果变量的有趣关系,并创建新的复合列来描述这些关系。我所研究的专栏通常都是以小篇幅完成的。例如,我会我将集中在一组20列中,这些列只处理属性值,并观察它们与拖欠贷款的关系。一旦对这些内容进行了探索并创建了新的专栏,我就会转到另一组专栏,比如大学教育,然后重复这个过程。我所做的是创建候选变量来解释我的数据和一些结果之间的关系。在这个过程的最后,我应用一些学习技巧,从这些复合列中创建一个方程。

我很少向数据集添加行。我几乎总是在创建新的列(统计/机器学习术语中的变量或特性)。


Tags: 文件数据内存数据库pandas核心关系过程
3条回答

我经常以这种方式使用几十亿字节的数据 e、 我在磁盘上有一些表,我通过查询读取,创建数据并追加回来。

值得阅读the docslate in this thread来获得一些关于如何存储数据的建议。

会影响数据存储方式的详细信息,如:
尽可能多地提供细节;我可以帮助你建立一个结构。

  1. 数据大小,#行、列、列类型;是否追加 行还是列?
  2. 典型的操作会是什么样子。E、 g.查询列以选择一组行和特定列,然后执行操作(在内存中),创建新列,保存这些列。
    (举个玩具的例子可以让我们提供更具体的建议。)
  3. 处理完之后,你会怎么做?步骤2是临时的,还是可重复的?
  4. 输入平面文件:多少,大致总大小(Gb)。这些是如何组织的,例如按记录组织的?每个文件是否包含不同的字段,或者每个文件中包含所有字段的某些记录?
  5. 您是否曾经根据条件选择行(记录)的子集(例如,选择具有字段A>;5的行)?然后做些什么,还是只选择字段A、B、C和所有记录(然后做些什么)?
  6. 您是否“处理”了所有列(以组为单位),或者是否有一个很好的比例您只能用于报表(例如,您希望保留数据,但在最终结果出来之前不需要明确地拉入该列)?

解决方案

确保已安装pandas at least ^{}

阅读iterating files chunk-by-chunkmultiple table queries

由于pytables被优化为按行操作(这是您查询的内容),我们将为每组字段创建一个表。这样可以很容易地选择一个小字段组(它可以与一个大表一起工作,但是这样做更有效。。。我想我将来也许能解决这个限制。。。无论如何,这更直观):
(以下是伪代码。)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

读取文件并创建存储(基本上做append_to_multiple所做的事情):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

现在文件中已经有了所有表(实际上,如果愿意,可以将它们存储在单独的文件中,您可能需要将文件名添加到组映射中,但这可能不是必需的)。

这是获取列并创建新列的方式:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

当您准备好进行后期处理时:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

关于数据列,您实际上不需要定义任何数据列;它们允许您根据列子选择行。E、 例如:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

在最后的报表生成阶段,它们可能是您最感兴趣的(实际上,数据列与其他列是分离的,如果您定义了很多,这可能会在一定程度上影响效率)。

您可能还想:

  • 创建一个函数,该函数接受一个字段列表,在groups_映射中查找组,然后选择这些组并连接结果,以便得到结果帧(这基本上就是select_as_multiple所做的)。这样结构对你来说就相当透明了。
  • 某些数据列上的索引(使行子集设置更快)。
  • 启用压缩。

有问题请告诉我!

现在,在这个问题两年后,有一个“核心外”的熊猫等价物:dask。太棒了!虽然它不支持pandas的所有功能,但是您可以使用它。

我认为上面的答案缺少了一个简单的方法,我发现这个方法非常有用。

当我有一个文件太大而无法加载到内存中时,我会将该文件分解为多个较小的文件(按行或列)

示例:如果30天的交易数据大小约为30GB,我会将其分解为一个每天大小约为1GB的文件。我随后分别处理每个文件并在最后汇总结果

最大的优点之一是它允许并行处理文件(多线程或进程)

另一个优点是文件操作(比如在示例中添加/删除日期)可以通过常规的shell命令来完成,这在更高级/复杂的文件格式中是不可能的

这种方法并不涵盖所有场景,但在很多场景中都非常有用

相关问题 更多 >