Python中的不同并行模块
最近我需要处理非常大的日志数据(压缩后有700GB),性能问题非常关键。考虑到我工作的环境(8个核心),我在想利用并行编程来提高性能。现在我使用的是内置的多进程库,性能有所提升,但我希望能更好。我听说还有很多其他的并行编程库,比如pp。
所以我想问问,这些模块之间有什么区别?有没有哪个比其他的更好?
3 个回答
这里有一些你可能在寻找的解决方案。
http://groups.google.com/group/comp.lang.python/browse_thread/thread/fe55f38c64f58a9d/
http://wiki.python.org/moin/ParallelProcessing
你可以在Python中使用 multiprocessing
模块。
http://pypi.python.org/pypi/multiprocessing/
http://www.ibm.com/developerworks/aix/library/au-multiprocessing/
在执行操作时,不要 unzip
或 extract
文件。Python
支持直接访问 7z
文件,而不需要解压。
首先,有几个问题:
- 700GB的压缩数据,解压后有多少?
- 有多少个文件?
- 你想用这些日志做什么?我们怎么能分工合作?
我觉得你应该考虑使用MapReduce来处理这么大规模的数据。
为了举个例子,我假设你有800GB的压缩广告服务器事件日志数据,你想做一些简单的事情,比如统计这个数据集中独立用户的数量。对于这么多数据和这种处理方式,使用多进程会有帮助,但使用MapReduce会让你更快地完成任务。我建议你了解一下EMR和MrJob或者Dumbo。做一些简单的处理工作,比如用户计数,可以帮助你验证这个过程,并开始从映射器和归约器的角度思考问题。虽然理解更复杂的任务需要一些时间,但如果你打算长时间处理这么大规模的数据,这样的投资是值得的。
举个例子,统计独立用户的过程会从一个映射器开始,它会简单地读取每一行广告服务器的数据,并输出用户ID(比如cookieID、IP地址,或者其他可以区分用户的信息)。你还会有一个归约器,它会接收这些用户ID作为输入,并去除或统计重复的ID。
当然,一旦你决定尝试这个方法,还有很多工作要做。比如准备数据(将大文件拆分或将小文件组合成块,以便有效分配工作,存储数据为未压缩格式或EMR的Hadoop能理解的压缩格式),调整Hadoop的参数以适应可用资源和你的算法,上传数据到s3等等。
好消息是,你实际上可以在几个小时内处理800GB的数据。
下面是一个简单的Python MapReduce示例:
这是日志文件的格式:
AuctionID\tUserID\tSiteID\tURL\tUserAgent\tTimestamp
它只是一个简单的制表符分隔值(tsv)文件。
所以我们将写一个简单的映射器,从标准输入读取这样的行,并将用户ID写入标准输出。
import sys
def usercount_mapper(input):
for line in input:
line = line.strip()
parts = line.split("\t")
user_id = parts[1]
print "%s\t%s"%(user_id, 1)
if __name__=="__main__":
usercount_mapper(sys.stdin)
还有一个简单的归约器实现,用于统计独立的用户ID:
import sys
user_ids = {}
def usercount_reducer(input):
for line in input:
line = line.strip()
user_id, count = line.split("\t")
try:
count = int(count)
except ValueError:
continue
current_count = user_ids.get(user_id, 0)
user_ids[user_id] = current_count + count
for user_id, count in user_ids.iteritems():
print "%s\t%s"%(user_id, count)
if __name__=="__main__":
usercount_reducer(sys.stdin)
你可以在一小块数据上运行这个,先在本地测试一下,只需执行:
$ cat mydata.tsv | map.py | sort | reduce.py > result.tsv
MapReduce框架(如果你使用EMR就是Hadoop)会负责运行多个映射和归约任务,并在将数据交给归约器之前对映射器的数据进行排序。为了让归约器能够完成工作,MR框架还会对键值(来自映射器的制表符分隔输出中的第一个值,这里是用户ID)进行哈希处理,并将具有相同哈希值的映射器分配给同一个归约器。这样,用户ID为4的记录总是会被送到归约器1,ID为5的记录会送到归约器2,依此类推。
如果你想自己构建一些东西,可以直接看看Disco(Disco是用Python和Erlang写的,如果你对Java过敏,这可能是个不错的选择 :-)),或者Hadoop,这样你就可以自己搭建MapReduce基础设施,而不是使用EMR。在Hadoop/EMR的世界里,还有一些很酷的数据处理平台,比如Hive(类似SQL的环境,用于描述数据和MapReduce算法)或Pig(像grep和awk的增强版),这些可能比上面的脚本更适合你。
例如,如果你在Hive中定义了你的数据结构,你可以写以下查询来获取独立用户(假设你之前定义了一个用户表):
SELECT DISTINCT users.user_id FROM users;