在大数据上使用多进程或Hadoop加速Python-Pandas脚本

0 投票
1 回答
877 浏览
提问于 2025-04-18 18:42

我正在处理一个很大的CSV数据文件,这个文件里有几个字段:user_id(用户ID)、timestamp(时间戳)、category(类别)。我想为每个用户的每个类别计算分数。

我首先把CSV文件分成小块,然后对这些小块进行分组,分组的依据是用户ID的最后两个数字。这样我就能把用户分成100组,并把这些组存储在一个HDF5的数据库里。

接下来,我用一个大的循环来处理这些存储的文件,一个接一个地处理。对于每个文件,我先按用户ID分组,然后计算每个用户的分数。最后,我会生成一个输出的CSV文件,每一行对应一个用户,里面包含了他的所有分数。

我发现这个主要的循环在我个人电脑上需要4个小时,我想加快这个过程,因为看起来这个过程是可以并行处理的。我该怎么做呢?我考虑过使用multiprocessing(多进程)或者hadoop streaming,哪种方法更好呢?

以下是我的(简化版)代码:

def sub_group_hash(x):
    return x['user_id'].str[-2:]

reader = read_csv('input.csv', chunksize=500000)                                  
with get_store('grouped_input.h5') as store:
    for chunk in reader:
        groups = chunk.groupby(sub_group_hash(chunk))
        for grp, grouped in groups:
            store.append('group_%s' % grp, grouped,
                 data_columns=['user_id','timestamp','category'])

with open('stats.csv','wb') as outfile:
    spamwriter = csv.writer(outfile)
    with get_store('grouped_input.h5') as store:
        for grp in store.keys(): #this is the loop I would like to parallelize
            grouped = store.select(grp).groupby('user_id')
            for user, user_group in grouped:
                output = my_function(user,user_group)
                spamwriter.writerow([user] + output)

1 个回答

0

我建议使用多线程。线程库非常简单易懂。你可以查看这个链接了解更多:https://docs.python.org/3/library/threading.html#thread-objects

我有点不太明白你说的主循环是什么意思,但我猜应该是你提到的所有过程。如果是这样的话,可以把它放在一个定义里,用更简单的方式来处理。

import thread
t1 = threading.thread(process, ("any", "inputs"))
t1.start()

这里有一个不错的教程,可以帮助你理解。它还展示了一些更高级的多线程技巧,如果你对Python比较熟悉的话,可以尝试一下。链接在这里:http://www.tutorialspoint.com/python/python_multithreading.htm

需要注意的是,当你写入文件时,不想让所有的进程同时写入文件。幸运的是,你可以使用锁来创建一个控制点。使用acquire()release()这两个函数可以确保一次只有一个线程在写入。

另外,要注意你电脑上有多少个核心。如果你运行的线程数量超过了电脑的核心数,那么每个线程就得等待CPU的时间,这样速度提升就不明显了。而且,如果你创建了无限数量的进程,可能会导致电脑崩溃。

撰写回答