Need to merge 2 large CSV files line by line in Python

Posted 2024-05-23 16:28:46


I am working with 2 large dataset files for a project. I managed to clean the files line by line. However, when I tried to apply the same logic to merge the two files on a common column, it failed. The problem is that the inner loop runs to completion before the outer loop advances (I don't know why this happens). I tried using numpy:

import numpy as np

# load both files fully into string arrays
buys = np.genfromtxt('buys_dtsep.dat', delimiter=",", dtype='str')
clicks = np.genfromtxt('clicks_dtsep.dat', delimiter=",", dtype='str')
f = open('combined.dat', 'w')
for s in clicks:
    for s2 in buys:
        # process data

However, loading a 33-million-entry file into an array is not feasible, because of memory limits and the time needed to load the data into arrays and process it. So I tried to process the files line by line to avoid running out of memory:

import csv

with open('clicks_dtsep.dat') as f_clicks, open('buys_dtsep.dat') as f_buys:
    csv_clicks = csv.reader(f_clicks)
    csv_buys = csv.reader(f_buys)
    for s in csv_clicks:
        print 'file 1 row x'  # to check when it loops
        for s2 in csv_buys:
            print s2[0]  # check looped data
            # do merge op

The printed output should be:

file 1 row 0
file 2 row 0
 ...
file 2 row x
file 1 row 1
and so on

The output I get is:

file 2 row 0
file 2 row 1
...
file 2 row x
file 1 row 0
...
file 1 row z

If this loop issue can be solved, I will be able to merge the files line by line.

Update: sample data

Sample of the buys file:

420374,2014-04-06,18:44:58.314,214537888,12462,1
420374,2014-04-06,18:44:58.325,214537850,10471,1
281626,2014-04-06,09:40:13.032,214535653,1883,1
420368,2014-04-04,06:13:28.848,214530572,6073,1
420368,2014-04-04,06:13:28.858,214835025,2617,1
140806,2014-04-07,09:22:28.132,214668193,523,1
140806,2014-04-07,09:22:28.176,214587399,1046,1

Sample of the clicks file:

420374,2014-04-06,18:44:58,214537888,0
420374,2014-04-06,18:41:50,214537888,0
420374,2014-04-06,18:42:33,214537850,0
420374,2014-04-06,18:42:38,214537850,0
420374,2014-04-06,18:43:02,214537888,0
420374,2014-04-06,18:43:10,214537888,0
420369,2014-04-07,19:39:43,214839373,0
420369,2014-04-07,19:39:56,214684513,0

3 Answers

For the example I have replaced the files with StringIO objects; with real file objects the code looks the same.

import StringIO

file1 = StringIO.StringIO("""420374,2014-04-06,18:44:58.314,214537888,12462,1
420374,2014-04-06,18:44:58.325,214537850,10471,1
281626,2014-04-06,09:40:13.032,214535653,1883,1
420368,2014-04-04,06:13:28.848,214530572,6073,1
420368,2014-04-04,06:13:28.858,214835025,2617,1
140806,2014-04-07,09:22:28.132,214668193,523,1
140806,2014-04-07,09:22:28.176,214587399,1046,1""")

file2 = StringIO.StringIO("""420374,2014-04-06,18:44:58,214537888,0
420374,2014-04-06,18:41:50,214537888,0
420374,2014-04-06,18:42:33,214537850,0
420374,2014-04-06,18:42:38,214537850,0
420374,2014-04-06,18:43:02,214537888,0
420374,2014-04-06,18:43:10,214537888,0
420369,2014-04-07,19:39:43,214839373,0
420369,2014-04-07,19:39:56,214684513,0""")

outfile = StringIO.StringIO()

data1_iter, skip_1 = iter(file1), False
data2_iter, skip_2 = iter(file2), False

while True:
    out = []
    if not skip_1:
        try:
            out.append(next(data1_iter).rstrip('\n'))  # one line from file 1
        except StopIteration:
            skip_1 = True
    if not skip_2:
        try:
            out.append(next(data2_iter).rstrip('\n'))  # one line from file 2
        except StopIteration:
            skip_2 = True

    if out:  # avoid writing a stray blank line once both files are exhausted
        outfile.write('\n'.join(out) + "\n")
    if skip_1 and skip_2:
        break

print(outfile.getvalue())

Output:

420374,2014-04-06,18:44:58.314,214537888,12462,1
420374,2014-04-06,18:44:58,214537888,0
420374,2014-04-06,18:44:58.325,214537850,10471,1
420374,2014-04-06,18:41:50,214537888,0
281626,2014-04-06,09:40:13.032,214535653,1883,1
420374,2014-04-06,18:42:33,214537850,0
420368,2014-04-04,06:13:28.848,214530572,6073,1
420374,2014-04-06,18:42:38,214537850,0
420368,2014-04-04,06:13:28.858,214835025,2617,1
420374,2014-04-06,18:43:02,214537888,0
140806,2014-04-07,09:22:28.132,214668193,523,1
420374,2014-04-06,18:43:10,214537888,0
140806,2014-04-07,09:22:28.176,214587399,1046,1
420369,2014-04-07,19:39:43,214839373,0
420369,2014-04-07,19:39:56,214684513,0
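With real file objects the pattern is identical; a minimal sketch, assuming the _dtsep.dat filenames from the question. Since only one line from each file is held in memory at a time, this also works for the 33-million-entry files:

with open('clicks_dtsep.dat') as file1, \
     open('buys_dtsep.dat') as file2, \
     open('combined.dat', 'w') as outfile:
    data1_iter, skip_1 = iter(file1), False
    data2_iter, skip_2 = iter(file2), False
    while True:
        out = []
        if not skip_1:
            try:
                out.append(next(data1_iter).rstrip('\n'))  # one line from file 1
            except StopIteration:
                skip_1 = True
        if not skip_2:
            try:
                out.append(next(data2_iter).rstrip('\n'))  # one line from file 2
            except StopIteration:
                skip_2 = True
        if out:
            outfile.write('\n'.join(out) + '\n')
        if skip_1 and skip_2:
            break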

Edit: the OP wants to run through the whole of the second file, so I have changed the answer.

You loop over the first row of the first file and then loop through the second file. Your inner loop only works once, because the csv_buys iterator is consumed completely during the first pass of the outer loop.

for s in csv_clicks:  # <- looping over the 1st file works fine
    print 'file 1 row x'  # to check when it loops
    for s2 in csv_buys:  # <- runs through the whole 2nd file and exhausts the iterator! this loop will ONLY work once!
        print s2[0]  # check looped data
        # do merge op

What you need to do is:

import csv

with open('clicks_dtsep.dat') as f_clicks:
    csv_clicks = csv.reader(f_clicks)
    for s in csv_clicks:
        print 'file 1 row x'
        # re-create the inner iterator on every outer pass, e.g. by
        # re-opening the second file, so it is never left exhausted
        with open('buys_dtsep.dat') as f_buys:
            csv_buys = csv.reader(f_buys)
            for s2 in csv_buys:
                print s2[0]
                # do merge op

Warning: the code above has O(n²) complexity.

If your script turns out to be slow (and it will be), you will have to come up with a different solution.

The following approach should help. It is designed to be faster and to reduce the memory requirement:

import csv
from heapq import merge
from itertools import groupby, ifilter

def get_click_entries(key):
    # re-read the clicks file, yielding only the rows whose first column matches key
    with open('clicks.csv', 'rb') as f_clicks:
        for entry in ifilter(lambda x: int(x[0]) == key, csv.reader(f_clicks)):
            entry.insert(4, '')  # add empty missing column
            yield entry

# First create a set holding all column 0 click entries

with open('clicks.csv', 'rb') as f_clicks:
    csv_clicks = csv.reader(f_clicks)
    click_keys = {int(cols[0]) for cols in csv_clicks}

with open('buys.csv', 'rb') as f_buys, \
     open('merged.csv', 'wb') as f_merged:

    csv_buys = csv.reader(f_buys)
    csv_merged = csv.writer(f_merged)

    for k, g in groupby(csv_buys, key=lambda x: int(x[0])):
        if k in click_keys:
            buys = sorted(g, key=lambda x: (x[1], x[2]))
            clicks = sorted(get_click_entries(k), key=lambda x: (x[1], x[2]))
            csv_merged.writerows(merge(buys, clicks))  # merge the two lists based on the timestamp
            click_keys.remove(k)
        else:
            csv_merged.writerows(g)

    # Write any remaining click entries

    for k in click_keys:
        csv_merged.writerows(get_click_entries(k))

For the two sample files, this generates the following output:

420374,2014-04-06,18:41:50,214537888,,0
420374,2014-04-06,18:42:33,214537850,,0
420374,2014-04-06,18:42:38,214537850,,0
420374,2014-04-06,18:43:02,214537888,,0
420374,2014-04-06,18:43:10,214537888,,0
420374,2014-04-06,18:44:58,214537888,,0
420374,2014-04-06,18:44:58.314,214537888,12462,1
420374,2014-04-06,18:44:58.325,214537850,10471,1
281626,2014-04-06,09:40:13.032,214535653,1883,1
420368,2014-04-04,06:13:28.848,214530572,6073,1
420368,2014-04-04,06:13:28.858,214835025,2617,1
140806,2014-04-07,09:22:28.132,214668193,523,1
140806,2014-04-07,09:22:28.176,214587399,1046,1
420369,2014-04-07,19:39:43,214839373,,0
420369,2014-04-07,19:39:56,214684513,,0

It works by first building a set of every column-0 key in the clicks file; this means you avoid re-reading the whole clicks file for keys you already know are not present. It then reads a group of matching column-0 entries from buys and the corresponding list of entries from clicks, sorts both by timestamp, and merges them together in order. The key is then removed from the set, so those entries are not re-read later.
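As a small illustration of why merge() preserves the timestamp order here: Python compares the row lists element by element, so when the key and date columns are equal, the time strings decide (rows shortened from the sample data for brevity):

from heapq import merge

buys = [['420374', '2014-04-06', '18:44:58.314']]
clicks = [['420374', '2014-04-06', '18:41:50'],
          ['420374', '2014-04-06', '18:45:00']]

# merge() takes already-sorted iterables and compares the lists
# element-wise: key and date are equal, so the time column decides
for row in merge(buys, clicks):
    print(row)
# ['420374', '2014-04-06', '18:41:50']
# ['420374', '2014-04-06', '18:44:58.314']
# ['420374', '2014-04-06', '18:45:00']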
