GAE Python上的Mapreduce导致reducepiline在finalize时发出回调?

2024-05-15 22:03:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在mapreduce作业完成后执行一个自定义回调函数。在

对于这个问题,我找到的唯一有用的引用是a somewhat outdated Google site和一个相关的,但似乎又过时了的Stackoverflow question。在

在cd4{1>中,指定一个参数的run方法生成一个Mapreduce管道:

yield mapreduce_pipeline.MapreducePipeline(
    "word_count",
    "main.word_count_map",
    "main.word_count_reduce",
    "mapreduce.input_readers.BlobstoreZipInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={
        "blob_key": blobkey,
    },
    reducer_params={
        "mime_type": "text/plain",
    },
    shards=16)

MapReducepLine的签名不允许mapreduce_parameters参数。我只能在源代码中看到回调引用出现在mapper_pipeline.MapperPipeline.run中,但它似乎只在内部使用。在

那么,有没有一种方法可以在里面获取回调参数?在

如果没有,是否有人对如何扩展库以提供这样的功能有很好的想法?在


Tags: 方法函数run参数pipelinemaincountgoogle
2条回答

我将Mapreduce管道范例设置为如下所示:

class MRRecalculateSupportsPipeline(base_handler.PipelineBase):

    def run(self, user_key):
        # ...
        yield mapreduce_pipeline.MapreducePipeline('user_recalculate_supports',
                'myapp.mapreduces.user_recalculate_supports_map',
                'myapp.mapreduces.user_recalculate_supports_reduce',
                'mapreduce.input_readers.DatastoreInputReader', output_writer_spec=None,
                mapper_params={"""..."""})

如果您想捕获此管道的完成情况,您有两个选项。在

A)使用管道。之后在MR管道完成后运行完成管道。在

^{pr2}$

{1>使用top}方法处理^-1>的完成。就个人而言,我还是选择A选项,因为您可以在/_ah/*/status?root=视图中跟踪路径。在

class EmailNewReleasePipeline(base_handler.PipelineBase):
    """Email followers about a new release"""
    # TODO: product_key is the name of the parameter, but it's built for albums ...

    def run(self, product_key, testing=False):
            # Send those emails ...
            yield mapreduce_pipeline.MapreducePipeline(...)

    def finalized(self):
        """Save product as launched"""
        ...
        product.launched = True
        product.put()

这是finalization of a pipeline上的文档。在

对于这个问题,至少一个不需要太多投资的解决方法是简单地生成另一个Map/Mapreduce管道来完成所需的后处理。在

例如:

class MainPipeline(base_handler.PipelineBase):
    def run(self):
        mapper_params = { ... }
        reducer_params = { ... }
        yield mapreduce_pipeline.MapReducePipeline(
            ...,
            mapper_params=mapper_params,
            reducer_params=reducer_params)
        yield PostprocessPipeline(reducer_params)


class PostprocessPipeline(base_handler.PipelineBase):
    def run(self, reducer_params):
        do_some_postprocessing(reducer_params)

该解决方案没有访问Mapreduce状态的权限,我认为可以从管道ID中检索到Mapreduce状态,但我还不清楚如何进行检索。因此,您必须设置另一个标志/memcache/ds条目来检查管道是否成功完成(如果这与后处理相关)。在

相关问题 更多 >