链接地图减少了googleappengin

2024-04-19 18:14:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图把reduce的输出发送到一个map,通过在管道中链接,类似于这个家伙: I would like to chain multiple mapreduce jobs in google app engine in Python 我试过他的解决办法,但没用。 我的管道流是:
地图1
减少1
地图2
减少2
我将Reduce1的输出保存到blob\u键下的blobstore,然后尝试从Map2访问blob。但是我在执行第二个映射时遇到了以下错误:"BadReaderParamsError: Could not find blobinfo for key <blob_key here>"。你知道吗

管道代码如下:

class SongsPurchasedTogetherPipeline(base_handler.PipelineBase):

  def run(self, filekey, blobkey):
    bucket_name = app_identity.get_default_gcs_bucket_name()
    intermediate_output = yield mapreduce_pipeline.MapreducePipeline(
        "songs_purchased_together_intermediate",
        "main.songs_purchased_together_map1",
        "main.songs_purchased_together_reduce1",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.GoogleCloudStorageOutputWriter",
        mapper_params={
            "blob_keys": blobkey,
        },
        reducer_params={
            "output_writer": {
                "bucket_name": bucket_name,
                "content_type": "text/plain",
            }
        },
        shards=1)
    yield StoreOutput("SongsPurchasedTogetherIntermediate", filekey, intermediate_output)

    intermediate_output_key = yield BlobKey(intermediate_output)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "songs_purchased_together",
        "main.songs_purchased_together_map2",
        "main.songs_purchased_together_reduce2",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.GoogleCloudStorageOutputWriter",
        mapper_params=(intermediate_output_key),
        reducer_params={
            "output_writer": {
                "bucket_name": bucket_name,
                "content_type": "text/plain",
            }
        },
        shards=1)
    yield StoreOutput("SongsPurchasedTogether", filekey, output)

下面是BlobKey类,它获取中间输出并生成blob键供Map2使用:

class BlobKey(base_handler.PipelineBase):

  def run(self, output):
    blobstore_filename = "/gs" + output[0]
    blobstore_gs_key = blobstore.create_gs_key(blobstore_filename)
    return {
      "blob_keys": blobstore_gs_key
    }

StoreOutput类与Google的MapReduce demo https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/demo/main.py中的类相同,并且与BlobKey类做相同的事情,但是还将blob的URL作为链接发送到HTML。你知道吗

手动访问URL appname/blobstore/<blob_key>,通过在浏览器中键入它(在Reduce1成功,但Map2失败之后)显示Reduce1的预期输出。为什么Map2找不到blob?抱歉,我是一个新手,我可能会出错的地方,因为我不完全了解blob存储。你知道吗


Tags: keynameoutputbucketmainparamsblobyield
1条回答
网友
1楼 · 发布于 2024-04-19 18:14:41

好吧,我发现Google已经从GAE GitHub存储库的标准编写器列表中删除了BlobstoreOutputWriter,这让事情变得有点复杂。我不得不给谷歌云商店写信,然后在那里阅读。我编写了一个helper类,它为GoogleCloudStorageInputReader生成映射器参数。你知道吗

class GCSMapperParams(base_handler.PipelineBase):

  def run(self, GCSPath):
    bucket_name = app_identity.get_default_gcs_bucket_name()
    return {
            "input_reader": {
                "bucket_name": bucket_name,
                "objects": [path.split('/', 2)[2] for path in GCSPath],
            }
        }

该函数将使用GoogleCloudStorageOutputWriter的一个MapReduce阶段的输出作为参数,并返回一个字典,该字典可以分配给下一个MapReduce阶段的映射器参数。你知道吗

基本上,第一个MapReduce阶段的输出值是一个包含<app_name>/<pipeline_name>/key/output-[i]的列表,其中i是碎片的数量。为了使用GoogleCloudStorageInputReader,数据的键应该通过mapper_params中的变量objects传递。键的形式必须是key/output-[i],因此helper类只是从中删除<app_name>/<pipeline_name>/。你知道吗

相关问题 更多 >