A better way to handle missing data in a DataFrame exported from a relational database

1 vote
1 answer
1371 views
Asked 2025-04-17 18:15

A few days ago I asked a question about how to make pandas HDFStore's `put` operation faster. Thanks to Jeff's answer, I found a more efficient way to pull data out of the database and store it into an HDF5 file.

With this approach, however, I have to fill in the missing data according to each column's dtype, and I have to do that for every table (in most cases the work is repetitive). Otherwise, the `None` objects in the DataFrame cause performance problems when I put the data into the HDF5 file.
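A minimal sketch of what this per-column filling amounts to; the fill values ('' for text, 0 for numbers) are placeholders chosen only for illustration, the real values depend on each table:

    import numpy as np

    def fill_missing_by_dtype(df):
        # placeholder fill values; the real ones depend on the table and column
        for col in df.columns:
            if df[col].dtype == object:
                df[col] = df[col].fillna('')   # text-like columns
            elif np.issubdtype(df[col].dtype, np.number):
                df[col] = df[col].fillna(0)    # numeric columns
        return df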

Is there a better way to do this?

I just read this issue: "ENH: sql to provide NaN/NaT conversion"

  • Can NaT be used together with types other than datetime64?
  • Can I use it to replace all the None objects in the DataFrame without having to worry about performance problems when storing the DataFrame into an HDF5 file?
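For context, a quick sketch of how the two missing-value markers behave, run on a recent pandas rather than 0.10.1 (the behaviour back then may differ):

    import numpy as np
    import pandas as pd

    # In a datetime64[ns] column, the missing value is stored as NaT
    # and is still recognised as missing by isnull().
    s_dt = pd.Series([pd.Timestamp('2013-01-01'), None])
    print(s_dt.dtype)      # datetime64[ns]
    print(s_dt.isnull())   # False, True -- the None became NaT

    # In a numeric column, np.nan is the usual missing-value marker.
    s_num = pd.Series([1.0, np.nan])
    print(s_num.dtype)     # float64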

Update 1

  • pandas version: 0.10.1
  • I am now using np.nan to fill the missing data, but I ran into two problems:
    • A column that contains both np.nan and datetime.datetime objects cannot be cast to 'datetime64[ns]', and putting it into the HDFStore raises an exception (a conversion sketch on newer pandas follows this list).

    In [155]: len(df_bugs.lastdiffed[df_bugs.lastdiffed.isnull()])
    Out[155]: 150

    In [156]: len(df_bugs.lastdiffed)
    Out[156]: 1003387

    In [158]: df_bugs.lastdiffed.astype(df_bugs.creation_ts.dtype)

    ---------------------------------------------------------------------------
    ValueError                                Traceback (most recent call last)
     in ()
    ----> 1 df_bugs.lastdiffed.astype(df_bugs.creation_ts.dtype)

    /usr/local/lib/python2.6/dist-packages/pandas-0.10.1-py2.6-linux-x86_64.egg/pandas/core/series.pyc in astype(self, dtype)
        777         See numpy.ndarray.astype
        778         """
    --> 779         casted = com._astype_nansafe(self.values, dtype)
        780         return self._constructor(casted, index=self.index, name=self.name)
        781 

    /usr/local/lib/python2.6/dist-packages/pandas-0.10.1-py2.6-linux-x86_64.egg/pandas/core/common.pyc in _astype_nansafe(arr, dtype)
       1047     elif arr.dtype == np.object_ and np.issubdtype(dtype.type, np.integer):
       1048         # work around NumPy brokenness, #1987
    -> 1049         return lib.astype_intsafe(arr.ravel(), dtype).reshape(arr.shape)
       1050 
       1051     return arr.astype(dtype)

    /usr/local/lib/python2.6/dist-packages/pandas-0.10.1-py2.6-linux-x86_64.egg/pandas/lib.so in pandas.lib.astype_intsafe (pandas/lib.c:11886)()

    /usr/local/lib/python2.6/dist-packages/pandas-0.10.1-py2.6-linux-x86_64.egg/pandas/lib.so in util.set_value_at (pandas/lib.c:44436)()

    ValueError: Must be a datetime.date or datetime.datetime object


        # df_bugs_sample1 = df_bugs.ix[:10000]
    In [147]: %prun store.put('df_bugs_sample1', df_bugs_sample1, table=True)

    /usr/local/lib/python2.6/dist-packages/pandas-0.10.1-py2.6-linux-x86_64.egg/pandas/io/pytables.pyc in put(self, key, value, table, append, **kwargs)
        456             table
        457         """
    --> 458         self._write_to_group(key, value, table=table, append=append, **kwargs)
        459 
        460     def remove(self, key, where=None, start=None, stop=None):

    /usr/local/lib/python2.6/dist-packages/pandas-0.10.1-py2.6-linux-x86_64.egg/pandas/io/pytables.pyc in _write_to_group(self, key, value, index, table, append, complib, **kwargs)
        786             raise ValueError('Compression not supported on non-table')
        787 
    --> 788         s.write(obj = value, append=append, complib=complib, **kwargs)
        789         if s.is_table and index:
        790             s.create_index(columns = index)

    /usr/local/lib/python2.6/dist-packages/pandas-0.10.1-py2.6-linux-x86_64.egg/pandas/io/pytables.pyc in write(self, obj, axes, append, complib, complevel, fletcher32, min_itemsize, chunksize, expectedrows, **kwargs)
       2489         # create the axes
       2490         self.create_axes(axes=axes, obj=obj, validate=append,
    -> 2491                          min_itemsize=min_itemsize, **kwargs)
       2492 
       2493         if not self.is_exists:

    /usr/local/lib/python2.6/dist-packages/pandas-0.10.1-py2.6-linux-x86_64.egg/pandas/io/pytables.pyc in create_axes(self, axes, obj, validate, nan_rep, data_columns, min_itemsize, **kwargs)
       2252                 raise
       2253             except (Exception), detail:
    -> 2254                 raise Exception("cannot find the correct atom type -> [dtype->%s,items->%s] %s" % (b.dtype.name, b.items, str(detail)))
       2255             j += 1
       2256 

    Exception: cannot find the correct atom type -> [dtype->object,items->Index([bug_file_loc, bug_severity, bug_status, cf_branch, cf_bug_source, cf_eta, cf_public_severity, cf_public_summary, cf_regression, cf_reported_by, cf_type, guest_op_sys, host_op_sys, keywords, lastdiffed, priority, rep_platform, resolution, short_desc, status_whiteboard, target_milestone], dtype=object)] object of type 'datetime.datetime' has no len()

  • Another problem: a DataFrame does not seem to be stored completely. In the example below, the DataFrame has 13,742,515 entries, but when I put it into the HDFStore and get it back, only 1,041,998 entries are returned. Very strange~

    In [123]:df_bugs_activity
    Out[123]:
    <class 'pandas.core.frame.DataFrame'>
    Int64Index: 13742515 entries, 0 to 13742514
    Data columns:
    added        13111366  non-null values
    attach_id    1041998  non-null values
    bug_id       13742515  non-null values
    bug_when     13742515  non-null values
    fieldid      13742515  non-null values
    id           13742515  non-null values
    removed      13612258  non-null values
    who          13742515  non-null values
    dtypes: datetime64[ns](1), float64(1), int64(4), object(2)


    In [121]: %time store.put('df_bugs_activity2', df_bugs_activity, table=True)

    CPU times: user 35.31 s, sys: 4.23 s, total: 39.54 s
    Wall time: 39.65 s

    In [122]: %time store.get('df_bugs_activity2')

    CPU times: user 7.56 s, sys: 0.26 s, total: 7.82 s
    Wall time: 7.84 s
    Out[122]:
    <class 'pandas.core.frame.DataFrame'>
    Int64Index: 1041998 entries, 2012 to 13354656
    Data columns:
    added        1041981  non-null values
    attach_id    1041998  non-null values
    bug_id       1041998  non-null values
    bug_when     1041998  non-null values
    fieldid      1041998  non-null values
    id           1041998  non-null values
    removed      1041991  non-null values
    who          1041998  non-null values
    dtypes: datetime64[ns](1), float64(1), int64(4), object(2)
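As a side note, on newer pandas a column that mixes datetime.datetime objects and missing values can be coerced with pd.to_datetime, which maps the missing entries to NaT instead of failing the way astype does above; a minimal sketch (run on a recent pandas, not 0.10.1):

    import datetime
    import pandas as pd

    # an object column mixing datetime.datetime values and None,
    # similar to the lastdiffed column above
    s = pd.Series([datetime.datetime(2013, 1, 1, 12, 0), None], dtype=object)
    print(s.dtype)             # object

    # to_datetime turns the missing entries into NaT and yields datetime64[ns]
    s_converted = pd.to_datetime(s)
    print(s_converted.dtype)   # datetime64[ns]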

Update 2

  • The code that builds the DataFrame (a more memory-friendly variant is sketched right after this list):

    import numpy as np
    from pandas import DataFrame
    import pandas.io.sql as psql

    # `cur` and `conn` are an open DB-API cursor and connection
    def grab_data(table_name, size_of_page=20000):
        '''
        Grab data from a db table

        size_of_page: the second argument of SQL's LIMIT clause
        '''
        cur.execute('select count(*) from %s' % table_name)
        records_number = cur.fetchone()[0]
        loop_number = records_number / size_of_page + 1
        print '****\nStart Grab %s\n****\nrecords_number: %s\nloop_number: %s' % (table_name, records_number, loop_number)

        start_position = 0
        df = DataFrame()  # WARNING: this dataframe object will contain all records of a table, so BE CAREFUL of the MEMORY USAGE!

        for i in range(0, loop_number):
            sql_export = 'select * from %s limit %s, %s' % (table_name, start_position, size_of_page)
            df = df.append(psql.read_frame(sql_export, conn), verify_integrity=False, ignore_index=True)

            start_position += size_of_page
            print 'start_position: %s' % start_position

        return df

    df_bugs = grab_data('bugs')
    df_bugs = df_bugs.fillna(np.nan)
    df_bugs = df_bugs.convert_objects()

  • The structure of df_bugs:

Int64Index: 1003387 entries, 0 to 1003386
Data columns:
alias                  0  non-null values
assigned_to            1003387  non-null values
bug_file_loc           498160  non-null values
bug_id                 1003387  non-null values
bug_severity           1003387  non-null values
bug_status             1003387  non-null values
category_id            1003387  non-null values
cclist_accessible      1003387  non-null values
cf_attempted           102160  non-null values
cf_branch              691834  non-null values
cf_bug_source          1003387  non-null values
cf_build               357920  non-null values
cf_change              324933  non-null values
cf_doc_impact          1003387  non-null values
cf_eta                 7223  non-null values
cf_failed              102123  non-null values
cf_i18n_impact         1003387  non-null values
cf_on_hold             1003387  non-null values
cf_public_severity     1003387  non-null values
cf_public_summary      587944  non-null values
cf_regression          1003387  non-null values
cf_reported_by         1003387  non-null values
cf_reviewer            1003387  non-null values
cf_security            1003387  non-null values
cf_test_id             13475  non-null values
cf_type                1003387  non-null values
cf_viss                1423  non-null values
component_id           1003387  non-null values
creation_ts            1003387  non-null values
deadline               0  non-null values
delta_ts               1003387  non-null values
estimated_time         1003387  non-null values
everconfirmed          1003387  non-null values
found_in_phase_id      1003387  non-null values
found_in_product_id    1003387  non-null values
found_in_version_id    1003387  non-null values
guest_op_sys           1003387  non-null values
host_op_sys            1003387  non-null values
keywords               1003387  non-null values
lastdiffed             1003237  non-null values
priority               1003387  non-null values
product_id             1003387  non-null values
qa_contact             1003387  non-null values
remaining_time         1003387  non-null values
rep_platform           1003387  non-null values
reporter               1003387  non-null values
reporter_accessible    1003387  non-null values
resolution             1003387  non-null values
short_desc             1003387  non-null values
status_whiteboard      1003387  non-null values
target_milestone       1003387  non-null values
votes                  1003387  non-null values
dtypes: datetime64[ns](2), float64(10), int64(19), object(21)
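As mentioned above, a more memory-friendly variant of grab_data collects the pages in a list and concatenates once at the end, instead of re-copying the growing frame with DataFrame.append on every page; a sketch against the same old pandas.io.sql API, assuming the same cur and conn objects:

    import pandas as pd
    import pandas.io.sql as psql

    def grab_data_concat(table_name, size_of_page=20000):
        '''Page through a table and build the DataFrame with a single concat.'''
        cur.execute('select count(*) from %s' % table_name)
        records_number = cur.fetchone()[0]

        pages = []
        for start_position in range(0, records_number, size_of_page):
            sql_export = 'select * from %s limit %s, %s' % (
                table_name, start_position, size_of_page)
            pages.append(psql.read_frame(sql_export, conn))

        # one concat at the end avoids re-copying the accumulated frame per page
        return pd.concat(pages, ignore_index=True)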

Update 3

  • Write to CSV and read it back:

    In [184]: df_bugs.to_csv('df_bugs.sv')
    In [185]: df_bugs_from_scv = pd.read_csv('df_bugs.sv')
    In [186]: df_bugs_from_scv
    Out[186]:
    <class 'pandas.core.frame.DataFrame'>
    Int64Index: 1003387 entries, 0 to 1003386
    Data columns:
    Unnamed: 0             1003387  non-null values
    alias                  0  non-null values
    assigned_to            1003387  non-null values
    bug_file_loc           0  non-null values
    bug_id                 1003387  non-null values
    bug_severity           1003387  non-null values
    bug_status             1003387  non-null values
    category_id            1003387  non-null values
    cclist_accessible      1003387  non-null values
    cf_attempted           102160  non-null values
    cf_branch              345133  non-null values
    cf_bug_source          1003387  non-null values
    cf_build               357920  non-null values
    cf_change              324933  non-null values
    cf_doc_impact          1003387  non-null values
    cf_eta                 7223  non-null values
    cf_failed              102123  non-null values
    cf_i18n_impact         1003387  non-null values
    cf_on_hold             1003387  non-null values
    cf_public_severity     1003387  non-null values
    cf_public_summary      588  non-null values
    cf_regression          1003387  non-null values
    cf_reported_by         1003387  non-null values
    cf_reviewer            1003387  non-null values
    cf_security            1003387  non-null values
    cf_test_id             13475  non-null values
    cf_type                1003387  non-null values
    cf_viss                1423  non-null values
    component_id           1003387  non-null values
    creation_ts            1003387  non-null values
    deadline               0  non-null values
    delta_ts               1003387  non-null values
    estimated_time         1003387  non-null values
    everconfirmed          1003387  non-null values
    found_in_phase_id      1003387  non-null values
    found_in_product_id    1003387  non-null values
    found_in_version_id    1003387  non-null values
    guest_op_sys           805088  non-null values
    host_op_sys            806344  non-null values
    keywords               532941  non-null values
    lastdiffed             1003237  non-null values
    priority               1003387  non-null values
    product_id             1003387  non-null values
    qa_contact             1003387  non-null values
    remaining_time         1003387  non-null values
    rep_platform           424213  non-null values
    reporter               1003387  non-null values
    reporter_accessible    1003387  non-null values
    resolution             922282  non-null values
    short_desc             1003287  non-null values
    status_whiteboard      0  non-null values
    target_milestone       423276  non-null values
    votes                  1003387  non-null values
    dtypes: float64(12), int64(20), object(21)
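Note that the CSV round trip above loses the datetime dtypes (the dtypes line no longer lists datetime64[ns]); with read_csv the date columns normally have to be re-parsed explicitly. A minimal sketch, assuming creation_ts and delta_ts are the two datetime columns of df_bugs:

    import pandas as pd

    # index_col=0 drops the 'Unnamed: 0' column written by to_csv;
    # parse_dates re-parses the named columns back into datetime64[ns]
    df_bugs_from_csv = pd.read_csv('df_bugs.sv', index_col=0,
                                   parse_dates=['creation_ts', 'delta_ts'])
    print(df_bugs_from_csv.dtypes[['creation_ts', 'delta_ts']])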

1 Answer

1

Let me answer my own question, with thanks to Jeff for his help.

First of all, the second problem in Update 1 ("a DataFrame does not seem to be stored completely") has been fixed.

The biggest problem I ran into was handling columns that contain both Python datetime objects and None objects. Fortunately, starting with the 0.11-dev version, pandas provides a more convenient way to deal with this. Below is the code I use in my project, with comments added to some lines; I hope it helps others :)

import numpy as np
from pandas import DataFrame, Series

cur.execute('select * from table_name')
result = cur.fetchall()

# For details: http://www.python.org/dev/peps/pep-0249/#description
db_description = cur.description
columns = [col_desc[0] for col_desc in db_description]

# As the pandas doc says, `coerce_float` attempts to convert values of
# non-string, non-numeric objects (like decimal.Decimal) to floating point
df = DataFrame.from_records(result, columns=columns, coerce_float=True)

# deal with the missing data
for column_name in df.columns:
    # Currently, calling `fillna(np.nan)` on a `datetime64[ns]` column raises an exception
    if df[column_name].dtype.str != '<M8[ns]':
        # fillna returns a new Series by default, so assign it back
        df[column_name] = df[column_name].fillna(np.nan)

# convert the columns that contain both np.nan and datetime objects
# from 'object' to 'datetime64[ns]' (short form: '<M8[ns]')
# first, find the table columns whose DB type is Date or Datetime
# (10 and 12 are the driver's type codes for DATE and DATETIME)
column_name_type_tuple = [column[:2] for column in db_description if column[1] in (10, 12)]
# then keep only those whose dtype in the DataFrame is still 'object'
columns_need_conv = [column_name for column_name, column_type in column_name_type_tuple if str(df[column_name].dtype) == 'object']

# do the type conversion
for column_name in columns_need_conv:
    df[column_name] = Series(df[column_name].values, dtype='M8[ns]')

df = df.convert_objects()

After this, the DataFrame can be stored in the HDF5 file, and there is no longer any need to resort to 'pickle'.
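For reference, a minimal sketch of the store step itself with the compression settings mentioned in the P.S. below; the file name and key are made up for illustration:

    import pandas as pd

    # hypothetical file name and key; complib/complevel match the P.S. below
    store = pd.HDFStore('bugs.h5', complevel=1, complib='lzo')
    store.put('df_bugs', df, table=True)   # table format, as in the timings below
    store.close()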

P.S.

Some configuration:
compression library: 'lzo', compression level: 1
Table 1: 7,810,561 records, with 2 integer columns and 1 datetime column; the put operation took 49 seconds.

Table 2: 1,008,794 records, with 4 datetime columns, 4 float columns, 19 integer columns, and 24 object (string) columns; the put operation took 170 seconds.
