在mrjob执行期间解压缩+解压缩输入文件

2024-03-28 11:15:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我想用mrjob(使用EMR)高效地处理S3中的大量数据。我可以以任何方式构造数据,但很明显,我希望尽我所能发挥EMR在S3数据上运行的优势。在

我的数据由数百万个网页组成(比如说,每5万个)。直觉上,我觉得创建一个集合是有意义的。焦油gz每个文件都有数千页,因此.tgz文件大小约为2GB。然后,我想将这些.tgz文件加载到S3上,并编写一个mrjob任务来处理这些文件(比如,在10个EC2实例上)。在

我对构建这些.tgz文件很感兴趣,因为它们代表一种非常压缩的数据形式,因此它们应该最小化网络流量(大小和传输延迟)。我还喜欢构建多个.tgz文件,因为我显然希望利用计划为作业分配的多个EMR实例。在

如果必须的话,我可以对文件进行压缩,这样就避免了归档(tar)步骤,而只处理.gz文件,但只对原始数据进行tar压缩会更容易。在

我的想法是正确的吗?如果是这样的话,我如何配置/指定mrjob来解压和解压,以便一个实例只处理其中一个.tgz文件的全部?在


Tags: 文件数据实例网页s3方式tar意义
1条回答
网友
1楼 · 发布于 2024-03-28 11:15:50

我强烈建议您不要使用所描述的与EMR配对的方法,因为它显然违反了map-reduce范式。您可能会克服网络流量瓶颈,如果有的话,但将有困难的时间负载平衡映射程序。Map-reduce方法是记录处理,当记录量无限增长时,它可以很好地伸缩。也就是说,您的体系结构是可行的,但最好与不同的任务处理工具一起使用,例如使用celery。在

如果您希望在map reduce范例(使用EMR和mrjob)中处理数据,您可以压缩并对每个页面进行base64编码,以确保页面作为一行存储在文本文件中。有关工作示例,请参阅以下mrjob兼容协议:

import cPickle
import zlib
import base64

from mrjob import protocol

class CompressedPickleProtocol(protocol.PickleProtocol):
    """
    Protocol that compresses pickled `key` and `value` with 
    `zlib.compress` and encodes result with `base64.b64encode`.
    """

    @classmethod
    def encode(cls, value):
        return base64.b64encode(value)

    @classmethod
    def decode(cls, raw_value):
        return base64.b64decode(raw_value)

    @classmethod
    def _writes(cls, value):
        return cls.encode(zlib.compress(cPickle.dumps(value)))

    @classmethod
    def _reads(cls, raw_value):
        return cPickle.loads(zlib.decompress(cls.decode(raw_value)))

    @classmethod
    def read(cls, line):
        raw_key, raw_value = line.split('\t', 1)
        return (cls._reads(raw_key), cls._reads(raw_value))

    @classmethod
    def write(cls, key, value):
        return '%s\t%s' % (cls._writes(key),
                           cls._writes(value))

相关问题 更多 >