我试图把reduce的输出发送到一个map,通过在管道中链接,类似于这个家伙:
I would like to chain multiple mapreduce jobs in google app engine in Python
我试过他的解决办法,但没用。
我的管道流是:
地图1
减少1
地图2
减少2
我将Reduce1的输出保存到blob\u键下的blobstore,然后尝试从Map2访问blob。但是我在执行第二个映射时遇到了以下错误:"BadReaderParamsError: Could not find blobinfo for key <blob_key here>"
。你知道吗
管道代码如下:
class SongsPurchasedTogetherPipeline(base_handler.PipelineBase):
def run(self, filekey, blobkey):
bucket_name = app_identity.get_default_gcs_bucket_name()
intermediate_output = yield mapreduce_pipeline.MapreducePipeline(
"songs_purchased_together_intermediate",
"main.songs_purchased_together_map1",
"main.songs_purchased_together_reduce1",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.GoogleCloudStorageOutputWriter",
mapper_params={
"blob_keys": blobkey,
},
reducer_params={
"output_writer": {
"bucket_name": bucket_name,
"content_type": "text/plain",
}
},
shards=1)
yield StoreOutput("SongsPurchasedTogetherIntermediate", filekey, intermediate_output)
intermediate_output_key = yield BlobKey(intermediate_output)
output = yield mapreduce_pipeline.MapreducePipeline(
"songs_purchased_together",
"main.songs_purchased_together_map2",
"main.songs_purchased_together_reduce2",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.GoogleCloudStorageOutputWriter",
mapper_params=(intermediate_output_key),
reducer_params={
"output_writer": {
"bucket_name": bucket_name,
"content_type": "text/plain",
}
},
shards=1)
yield StoreOutput("SongsPurchasedTogether", filekey, output)
下面是BlobKey类,它获取中间输出并生成blob键供Map2使用:
class BlobKey(base_handler.PipelineBase):
def run(self, output):
blobstore_filename = "/gs" + output[0]
blobstore_gs_key = blobstore.create_gs_key(blobstore_filename)
return {
"blob_keys": blobstore_gs_key
}
StoreOutput类与Google的MapReduce demo https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/demo/main.py中的类相同,并且与BlobKey类做相同的事情,但是还将blob的URL作为链接发送到HTML。你知道吗
手动访问URL appname/blobstore/<blob_key>
,通过在浏览器中键入它(在Reduce1成功,但Map2失败之后)显示Reduce1的预期输出。为什么Map2找不到blob?抱歉,我是一个新手,我可能会出错的地方,因为我不完全了解blob存储。你知道吗
好吧,我发现Google已经从GAE GitHub存储库的标准编写器列表中删除了BlobstoreOutputWriter,这让事情变得有点复杂。我不得不给谷歌云商店写信,然后在那里阅读。我编写了一个helper类,它为GoogleCloudStorageInputReader生成映射器参数。你知道吗
该函数将使用GoogleCloudStorageOutputWriter的一个MapReduce阶段的输出作为参数,并返回一个字典,该字典可以分配给下一个MapReduce阶段的映射器参数。你知道吗
基本上,第一个MapReduce阶段的输出值是一个包含
<app_name>/<pipeline_name>/key/output-[i]
的列表,其中i是碎片的数量。为了使用GoogleCloudStorageInputReader,数据的键应该通过mapper_params
中的变量objects
传递。键的形式必须是key/output-[i]
,因此helper类只是从中删除<app_name>/<pipeline_name>/
。你知道吗相关问题 更多 >
编程相关推荐