应用引擎的MapReduce遇到内存限制

8 投票
2 回答
1369 浏览
提问于 2025-04-17 12:54

我正在开发一个使用appengine-mapreduce功能的应用,并且已经修改了示例代码以满足我的需求。基本上,我有超过一百万行数据,格式是:用户ID,时间1,时间2。我的目标是计算每个用户ID的时间1和时间2之间的差值。

但是,当我在Google App Engine上运行这个程序时,在日志部分遇到了以下错误信息:

在处理总共130个请求后,超过了180.56 MB的软私有内存限制。在处理这个请求时,处理该请求的进程被发现使用了过多的内存,因此被终止。这可能会导致下一个请求使用新的进程。如果你经常看到这个信息,可能是你的应用存在内存泄漏。

def time_count_map(data):
  """Time count map function."""
  (entry, text_fn) = data
  text = text_fn()

  try:
    q = text.split('\n')
    for m in q:
        reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            """Calculate time elapsed"""
            sdw = s[1]
            start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
  except IndexError, e:
    logging.debug(e)


def time_count_reduce(key, values):
  """Time count reduce function."""
  time = 0.0
  for subtime in values:
    time += float(subtime)
    realtime = int(time)
  yield "%s: %d\n" % (key, realtime)

有没有人能建议我如何更好地优化我的代码?谢谢!!

编辑:

这是管道处理程序:

class TimeCountPipeline(base_handler.PipelineBase):
  """A pipeline to run Time count demo.

  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """

  def run(self, filekey, blobkey):
    logging.debug("filename is %s" % filekey)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "time_count",
        "main.time_count_map",
        "main.time_count_reduce",
        "mapreduce.input_readers.BlobstoreZipInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_key": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=32)
    yield StoreOutput("TimeCount", filekey, output)

Mapreduce.yaml:

mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4

其余的文件和示例完全相同。

我在Dropbox上上传了一份我的代码副本:http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

2 个回答

6

你也可以考虑在代码的某些地方定期调用gc.collect()。我看到过很多关于超出软内存限制的问题,通过调用gc.collect()得到了缓解,很多问题都是和blobstore有关的。

2

你的输入文件可能超过了内存的软限制,文件太大了。对于这种大文件,可以使用 BlobstoreLineInputReader 或者 BlobstoreZipLineInputReader

这些输入读取器会给 map 函数传递一些不同的东西,它们会传递文件中的 start_position(起始位置)和文本行。

你的 map 函数可能看起来像这样:

def time_count_map(data):
    """Time count map function."""
    text = data[1]

    try:
        reader = csv.reader([text.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            """Calculate time elapsed"""
            sdw = s[1]
            start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
    except IndexError, e:
        logging.debug(e)

使用 BlobstoreLineInputReader 可以让任务运行得更快,因为它可以使用多个分片,最多可以用到256个。不过,这样的话,你需要上传未压缩的文件,这可能会有点麻烦。我通常是先把压缩文件上传到一个EC2的Windows服务器上,然后再从那里解压并上传,因为上游的带宽很大。

撰写回答