Pandas.io.普通.CParserError:标记数据时出错。C错误:缓冲区溢出捕获到可能格式错误的输入fi

2024-06-06 09:32:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我有大的csv文件,每个文件大小超过10mb和大约50+这样的文件。这些输入有超过25列和超过50K行。在

所有这些都有相同的标题,我试图将它们合并成一个csv,标题只被提及一次。在

选项:一个 代码:适用于小尺寸csv——25+列,但文件大小以kbs为单位。在

import pandas as pd
import glob

interesting_files = glob.glob("*.csv")
df_list = []
for filename in sorted(interesting_files):
    df_list.append(pd.read_csv(filename))

full_df = pd.concat(df_list)

full_df.to_csv('output.csv')

但是上面的代码不适用于较大的文件,并给出了错误。在

错误:

^{pr2}$

代码:列25+但文件大小超过10mb

选项:Two 选项:Three

选项:四个

import pandas as pd
import glob

    interesting_files = glob.glob("*.csv")
    df_list = []
    for filename in sorted(interesting_files):
        df_list.append(pd.read_csv(filename))

    full_df = pd.concat(df_list)

    full_df.to_csv('output.csv')

错误:

Traceback (most recent call last):
  File "merge_large.py", line 6, in <module>
    allFiles = glob.glob("*.csv", sep=None)
TypeError: glob() got an unexpected keyword argument 'sep'

我已经进行了广泛的搜索,但我找不到一个解决方案,将具有相同标题的大型csv文件连接到一个文件中。在

编辑:

代码:

import dask.dataframe as dd  

ddf = dd.read_csv('*.csv')

ddf.to_csv('master.csv',index=False)

错误:

Traceback (most recent call last):
  File "merge_csv_dask.py", line 5, in <module>
    ddf.to_csv('master.csv',index=False)
  File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/core.py", line 792, in to_csv
    return to_csv(self, filename, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/io.py", line 762, in to_csv
    compute(*values)
  File "/usr/local/lib/python2.7/dist-packages/dask/base.py", line 179, in compute
    results = get(dsk, keys, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/dask/threaded.py", line 58, in get
    **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 481, in get_async
    raise(remote_exception(res, tb))
dask.async.ValueError: could not convert string to float: {u'type': u'Point', u'coordinates': [4.34279, 50.8443]}

Traceback
---------
  File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 263, in execute_task
    result = _execute_task(task, data)
  File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 245, in _execute_task
    return func(*args2)
  File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 49, in bytes_read_csv
    coerce_dtypes(df, dtypes)
  File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 73, in coerce_dtypes
    df[c] = df[c].astype(dtypes[c])
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/generic.py", line 2950, in astype
    raise_on_error=raise_on_error, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2938, in astype
    return self.apply('astype', dtype=dtype, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2890, in apply
    applied = getattr(b, f)(**kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 434, in astype
    values=values, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 477, in _astype
    values = com._astype_nansafe(values.ravel(), dtype, copy=True)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/common.py", line 1920, in _astype_nansafe
    return arr.astype(dtype


Tags: csvinpypandasdflibpackagesusr
1条回答
网友
1楼 · 发布于 2024-06-06 09:32:57

如果我理解你的问题,你有一个大的csv文件的结构,你想合并成一个大的csv文件。在

我的建议是使用Continuum Analytics中的^{}来处理这项工作。你可以合并你的文件,但也可以像熊猫一样执行核心计算和数据分析。在

### make sure you include the [complete] tag
pip install dask[complete]

使用DropBox中的示例数据的解决方案

首先,检查dask的版本。对我来说,dask=0.11.0,pandas=0.18.1

^{pr2}$

这是所有CSV的代码。我使用你的DropBox示例数据没有错误。在

import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
import glob

filenames = glob.glob('/Users/linwood/Downloads/stack_bundle/rio*.csv')

'''
The key to getting around the CParse error was using sep=None
Came from this post
http://stackoverflow.com/questions/37505577/cparsererror-error-tokenizing-data
'''

# custom saver function for dataframes using newfilenames
def reader(filename):
    return pd.read_csv(filename,sep=None)

# build list of delayed pandas csv reads; then read in as dask dataframe

dfs = [delayed(reader)(fn) for fn in filenames]
df = dd.from_delayed(dfs)


'''
This is the final step.  The .compute() code below turns the 
dask dataframe into a single pandas dataframe with all your
files merged. If you don't need to write the merged file to
disk, I'd skip this step and do all the analysis in 
dask. Get a subset of the data you want and save that.  
'''
df = df.reset_index().compute()
df.to_csv('./test.csv')

剩下的都是多余的东西

# print the count of values in each column; perfect data would have the same count
# you have dirty data as the counts will show

print (df.count().compute())

下一步是做一些类似熊猫的分析。下面是一些代码,我首先“清理”你的数据为'tweetFavoriteCt'列。所有的数据都不是整数,所以我用“0”替换字符串,并将其他所有数据转换为整数。一旦我得到整数转换,我展示了一个简单的分析,在这里我过滤整个数据帧,只包括favoriteCt大于3的行

# function to convert numbers to integer and replace string with 0; sample analytics in dask dataframe
# you can come up with your own..this is just for an example
def conversion(value):
    try:
        return int(value)
    except:
        return int(0)

# apply the function to the column, create a new column of cleaned data
clean = df['tweetFavoriteCt'].apply(lambda x: (conversion(x)),meta=('stuff',str))

# set new column equal to our cleaning code above; your data is dirty :-(
df['cleanedFavoriteCt'] = clean

最后一段代码展示了dask分析,以及如何将这个合并的文件加载到pandas中,并将合并后的文件写入磁盘。请注意,如果您有大量的csv,当您使用下面的.compute()代码时,它会将合并的csv加载到内存中。在

# retreive the 50 tweets with the highest favorite count 
print(df.nlargest(50,['cleanedFavoriteCt']).compute())

# only show me the tweets that have been favorited at least 3 times
# TweetID 763525237166268416, is VERRRRY popular....7000+ favorites
print((df[df.cleanedFavoriteCt.apply(lambda x: x>3,meta=('stuff',str))]).compute())

'''
This is the final step.  The .compute() code below turns the 
dask dataframe into a single pandas dataframe with all your
files merged. If you don't need to write the merged file to
disk, I'd skip this step and do all the analysis in 
dask. Get a subset of the data you want and save that.  
'''
df = df.reset_index().compute()
df.to_csv('./test.csv')

现在,如果要为合并的csv文件切换到pandas:

import pandas as pd
dff = pd.read_csv('./test.csv')

让我知道这是否有效。在

到此为止

归档:以前的解决方案;很好的使用dask合并csv的例子

第一步是确保安装了dask。有install instructions for ^{} in the documentation page,但这应该有效:

安装了dask后,很容易读入文件。在

先做些家务。假设我们有一个csv目录,其中文件名是my18.csvmy19.csvmy20.csv等。名称标准化和单个目录位置是关键。如果您将csv文件放在一个目录中并以某种方式序列化这些名称,那么这种方法是有效的。在

分步骤:

  1. 导入dask,使用通配符读取所有csv文件。这会将所有csv合并到一个单独的dask.dataframe对象中。如果你愿意的话,你可以在这一步之后马上做熊猫般的手术。在
import dask.dataframe as dd  
ddf = dd.read_csv('./daskTest/my*.csv')
ddf.describe().compute()
  1. 将合并的数据帧文件写入与原始文件相同的目录中的磁盘,并将其命名为master.csv
ddf.to_csv('./daskTest/master.csv',index=False)
  1. 可选,将master.csv读入数据帧对象进行计算。这也可以在上面的第一步之后完成;dask可以对暂存文件执行类似pandas的操作……这是在Python中实现“大数据”的一种方法
# reads in the merged file as one BIG out-of-core dataframe; can perform functions like pangas    
newddf = dd.read_csv('./daskTest/master.csv')

#check the length; this is now length of all merged files. in this example, 50,000 rows times 11 = 550000 rows.
len(newddf)

# perform pandas-like summary stats on entire dataframe
newddf.describe().compute()

希望这有助于回答你的问题。在三个步骤中,您读入所有文件,合并到单个数据帧,然后只使用一个头和所有行将大量数据帧写入磁盘。在

相关问题 更多 >