在大数据上使用多进程或Hadoop加速Python-Pandas脚本
我正在处理一个很大的CSV数据文件,这个文件里有几个字段:user_id
(用户ID)、timestamp
(时间戳)、category
(类别)。我想为每个用户的每个类别计算分数。
我首先把CSV文件分成小块,然后对这些小块进行分组,分组的依据是用户ID的最后两个数字。这样我就能把用户分成100组,并把这些组存储在一个HDF5的数据库里。
接下来,我用一个大的循环来处理这些存储的文件,一个接一个地处理。对于每个文件,我先按用户ID分组,然后计算每个用户的分数。最后,我会生成一个输出的CSV文件,每一行对应一个用户,里面包含了他的所有分数。
我发现这个主要的循环在我个人电脑上需要4个小时,我想加快这个过程,因为看起来这个过程是可以并行处理的。我该怎么做呢?我考虑过使用multiprocessing
(多进程)或者hadoop streaming
,哪种方法更好呢?
以下是我的(简化版)代码:
def sub_group_hash(x):
return x['user_id'].str[-2:]
reader = read_csv('input.csv', chunksize=500000)
with get_store('grouped_input.h5') as store:
for chunk in reader:
groups = chunk.groupby(sub_group_hash(chunk))
for grp, grouped in groups:
store.append('group_%s' % grp, grouped,
data_columns=['user_id','timestamp','category'])
with open('stats.csv','wb') as outfile:
spamwriter = csv.writer(outfile)
with get_store('grouped_input.h5') as store:
for grp in store.keys(): #this is the loop I would like to parallelize
grouped = store.select(grp).groupby('user_id')
for user, user_group in grouped:
output = my_function(user,user_group)
spamwriter.writerow([user] + output)
1 个回答
我建议使用多线程。线程库非常简单易懂。你可以查看这个链接了解更多:https://docs.python.org/3/library/threading.html#thread-objects
我有点不太明白你说的主循环是什么意思,但我猜应该是你提到的所有过程。如果是这样的话,可以把它放在一个定义里,用更简单的方式来处理。
import thread
t1 = threading.thread(process, ("any", "inputs"))
t1.start()
这里有一个不错的教程,可以帮助你理解。它还展示了一些更高级的多线程技巧,如果你对Python比较熟悉的话,可以尝试一下。链接在这里:http://www.tutorialspoint.com/python/python_multithreading.htm
需要注意的是,当你写入文件时,不想让所有的进程同时写入文件。幸运的是,你可以使用锁来创建一个控制点。使用acquire()
和release()
这两个函数可以确保一次只有一个线程在写入。
另外,要注意你电脑上有多少个核心。如果你运行的线程数量超过了电脑的核心数,那么每个线程就得等待CPU的时间,这样速度提升就不明显了。而且,如果你创建了无限数量的进程,可能会导致电脑崩溃。