在Hadoop流处理中的TotalOrderPartitioner使用
我正在用Python和Hadoop流处理做一个项目,我需要一些和Hadoop里的TotalOrderPartitioner
和InputSampler
类似的功能。简单来说,我需要先对数据进行抽样,创建一个分区文件,然后用这个分区文件来决定哪个键值对会被送到哪个Reducer。在这个项目中,我需要使用Hadoop 1.0.4版本。
我只找到了一些使用KeyFieldBasedPartitioner
和自定义分区器的Hadoop流处理示例,这些示例通过命令中的-partitioner
选项告诉Hadoop使用这些分区器。我找到的使用TotalOrderPartitioner
和InputSampler
的示例都是用Java写的,它们需要用到InputSampler
的writePartitionFile()
方法和DistributedCache
类来完成这个工作。所以我在想,是否可以在Hadoop流处理里使用TotalOrderPartitioner
?如果可以,我该如何组织我的代码来使用它?如果不行,先用Python实现一个总分区器再使用它是否可行?
2 个回答
在Hadoop流处理(streaming)中使用TotalOrderPartitioner的一种方法是,稍微修改一下它的代码,从环境变量中获取分区文件的路径名,然后编译它。你需要在你的系统上定义这个环境变量,并通过-cmdenv选项将它的名字传递给流处理任务(具体可以参考这个链接)。
TotalOrderPartitioner的源代码可以在这里找到。在代码中,getPartitionFile()这个方法从第143行开始定义,第二行显示如果没有传入参数,它会使用DEFAULT_PATH作为分区文件的名字。DEFAULT_PATH在第54行被定义为"_partition.lst",第83行有个注释说明它假设这个文件在DistributedCache中。因此,只要在DistributedCache中有这个文件,就可以不修改getPartitionFile(),直接使用_partition.lst作为分区文件名。
接下来就是如何运行InputSampler来写入分区文件的问题。我认为最好的办法是运行一个已经写好的Java MapReduce任务,这个任务使用了TotalOrderPartitioner,至少可以用来获取InputSampler的输出示例,以确定它的格式。如果这个示例任务可以修改以处理你想要的数据类型,那么你就可以用它来创建一个适合你需求的分区文件。有几个使用TotalOrderPartitioner的MapReduce示例任务可以参考,分别是TotalOrderSorting.java和TotalSortMapReduce.java。
另外,在twittomatic项目中,有一个简单的自定义分区器IntervalPartitioner.java,其中分区文件的路径名被硬编码为/partitions.lst。在sorter目录下,还有一个脚本sample.sh,它使用Hadoop、实时的Twitter数据流和sample.py来构建partition.lst。你可以很容易地根据自己的需求调整这个系统,首先可以把Twitter数据流替换为你的数据样本。
我没有尝试过,但可以用一个例子来说明。假设你在使用 KeyFieldBasedPartitioner,然后只需要把:
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
换成:
-partitioner org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner
这样应该就可以了。