Python中的不同并行模块

1 投票
3 回答
663 浏览
提问于 2025-04-16 23:55

最近我需要处理非常大的日志数据(压缩后有700GB),性能问题非常关键。考虑到我工作的环境(8个核心),我在想利用并行编程来提高性能。现在我使用的是内置的多进程库,性能有所提升,但我希望能更好。我听说还有很多其他的并行编程库,比如pp。

所以我想问问,这些模块之间有什么区别?有没有哪个比其他的更好?

3 个回答

0

这里有一些你可能在寻找的解决方案。
http://groups.google.com/group/comp.lang.python/browse_thread/thread/fe55f38c64f58a9d/
http://wiki.python.org/moin/ParallelProcessing

你可以在Python中使用 multiprocessing 模块。
http://pypi.python.org/pypi/multiprocessing/
http://www.ibm.com/developerworks/aix/library/au-multiprocessing/

在执行操作时,不要 unzipextract 文件。Python 支持直接访问 7z 文件,而不需要解压。

0

你可以去看看 Celery

在他们的网站上说:

Celery 是一个异步任务队列/工作队列,基于分布式消息传递。它主要用于实时操作,但也支持定时任务。执行的单位叫做任务,这些任务可以在一个或多个工作服务器上同时执行,使用多进程、Eventlet 或 gevent。任务可以异步执行(在后台运行)或者同步执行(等到准备好再运行)。

另外,你也可以看看 EventletGevent 这两个 Python 库。

3

首先,有几个问题:

  • 700GB的压缩数据,解压后有多少?
  • 有多少个文件?
  • 你想用这些日志做什么?我们怎么能分工合作?

我觉得你应该考虑使用MapReduce来处理这么大规模的数据。

为了举个例子,我假设你有800GB的压缩广告服务器事件日志数据,你想做一些简单的事情,比如统计这个数据集中独立用户的数量。对于这么多数据和这种处理方式,使用多进程会有帮助,但使用MapReduce会让你更快地完成任务。我建议你了解一下EMRMrJob或者Dumbo。做一些简单的处理工作,比如用户计数,可以帮助你验证这个过程,并开始从映射器和归约器的角度思考问题。虽然理解更复杂的任务需要一些时间,但如果你打算长时间处理这么大规模的数据,这样的投资是值得的。

举个例子,统计独立用户的过程会从一个映射器开始,它会简单地读取每一行广告服务器的数据,并输出用户ID(比如cookieID、IP地址,或者其他可以区分用户的信息)。你还会有一个归约器,它会接收这些用户ID作为输入,并去除或统计重复的ID。

当然,一旦你决定尝试这个方法,还有很多工作要做。比如准备数据(将大文件拆分或将小文件组合成块,以便有效分配工作,存储数据为未压缩格式或EMR的Hadoop能理解的压缩格式),调整Hadoop的参数以适应可用资源和你的算法,上传数据到s3等等。

好消息是,你实际上可以在几个小时内处理800GB的数据。

下面是一个简单的Python MapReduce示例:

这是日志文件的格式:

AuctionID\tUserID\tSiteID\tURL\tUserAgent\tTimestamp

它只是一个简单的制表符分隔值(tsv)文件。

所以我们将写一个简单的映射器,从标准输入读取这样的行,并将用户ID写入标准输出。

import sys

def usercount_mapper(input):
    for line in input:
        line = line.strip()
        parts = line.split("\t")
        user_id = parts[1]
        print "%s\t%s"%(user_id, 1)

if __name__=="__main__":
    usercount_mapper(sys.stdin)

还有一个简单的归约器实现,用于统计独立的用户ID:

import sys

user_ids = {}
def usercount_reducer(input):
    for line in input:
        line = line.strip()
        user_id, count = line.split("\t")
        try:
            count = int(count)
        except ValueError:
            continue
        current_count = user_ids.get(user_id, 0)
        user_ids[user_id] = current_count + count

    for user_id, count in user_ids.iteritems():
        print "%s\t%s"%(user_id, count)

if __name__=="__main__":
    usercount_reducer(sys.stdin)

你可以在一小块数据上运行这个,先在本地测试一下,只需执行:

$ cat mydata.tsv | map.py | sort | reduce.py > result.tsv

MapReduce框架(如果你使用EMR就是Hadoop)会负责运行多个映射和归约任务,并在将数据交给归约器之前对映射器的数据进行排序。为了让归约器能够完成工作,MR框架还会对键值(来自映射器的制表符分隔输出中的第一个值,这里是用户ID)进行哈希处理,并将具有相同哈希值的映射器分配给同一个归约器。这样,用户ID为4的记录总是会被送到归约器1,ID为5的记录会送到归约器2,依此类推。

如果你想自己构建一些东西,可以直接看看Disco(Disco是用Python和Erlang写的,如果你对Java过敏,这可能是个不错的选择 :-)),或者Hadoop,这样你就可以自己搭建MapReduce基础设施,而不是使用EMR。在Hadoop/EMR的世界里,还有一些很酷的数据处理平台,比如Hive(类似SQL的环境,用于描述数据和MapReduce算法)或Pig(像grep和awk的增强版),这些可能比上面的脚本更适合你。

例如,如果你在Hive中定义了你的数据结构,你可以写以下查询来获取独立用户(假设你之前定义了一个用户表):

SELECT DISTINCT users.user_id FROM users;

撰写回答