如何批量上传数据到App Engine数据存储?旧方法无效

16 投票
4 回答
8178 浏览
提问于 2025-04-18 18:43

这个问题应该是很多人都会遇到的,简单来说,就是要把大量数据上传到appengine的数据库里。

不过,之前在stackoverflow上提到的一些老方法(下面有链接)现在似乎都不管用了。以前用DB API上传数据时,bulkloader方法是个不错的选择,但现在用NDB API就不行了。

而且现在bulkloader方法似乎已经被淘汰了,旧的链接虽然还在文档里,但指向的页面都是错的。这里有个例子:

https://developers.google.com/appengine/docs/python/tools/uploadingdata

这个链接现在还在这个页面上:https://developers.google.com/appengine/docs/python/tools/uploadinganapp

那么现在推荐的批量上传数据的方法是什么呢?

看起来有两个可行的替代方案:1)使用remote_api,或者2)把数据写成CSV文件放到GCS桶里,然后再读取。有没有人有成功使用这两种方法的经验?

任何建议都非常感谢!谢谢!

[*下面链接中的解决方案已经不再有效]

[1] 如何批量上传数据到google appengine数据库?

[2] 如何在Google App Engine数据库中插入大量数据?

4 个回答

1

截至2018年,最好的方法是使用新的导入/导出功能

3

你提供的链接中的远程API方法仍然可以正常使用,不过如果数据行超过几百行的话,它的速度会非常慢。

我成功地将GCS和MapReduce框架结合使用,下载数据存储的内容,而不是上传。不过,原理应该是一样的。你可以查看一下mapreduce的文档:实际上你只需要使用mapper这一步,所以你可以定义一个简单的函数,接受CSV中的一行数据,并根据这些数据创建一个数据存储实体。

10

方法一:使用 remote_api

怎么做:写一个 bulkloader.yaml 文件,然后在终端直接用 “appcfg.py upload_data” 命令运行它。我不推荐这个方法,原因有两个:1. 延迟很大 2. 不支持 NDB

方法二:使用 GCS 和 Mapreduce

上传数据文件到 GCS:

可以使用 “storage-file-transfer-json-python” 这个 GitHub 项目中的 chunked_transfer.py 来从本地系统上传文件到 GCS。确保从应用引擎管理控制台生成正确的 “client-secrets.json” 文件。

Mapreduce:

使用 "appengine-mapreduce" 这个 GitHub 项目。把 "mapreduce" 文件夹复制到你的项目顶层文件夹。

然后在你的 app.yaml 文件中添加以下内容:

includes:
  - mapreduce/include.yaml

下面是你的 main.py 文件

import cgi
import webapp2
import logging
import os, csv
from models import DataStoreModel
import StringIO
from google.appengine.api import app_identity
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader

def testmapperFunc(newRequest):
    f = StringIO.StringIO(newRequest)
    reader = csv.reader(f, delimiter=',')
    for row in reader:
        newEntry = DataStoreModel(attr1=row[0], link=row[1])
        yield op.db.Put(newEntry)

class TestGCSReaderPipeline(base_handler.PipelineBase):
    def run(self, filename):
        yield mapreduce_pipeline.MapreducePipeline(
                "test_gcs",
                "testgcs.testmapperFunc",
                "mapreduce.input_readers.FileInputReader",
                mapper_params={
                    "files": [filename],
                    "format": 'lines'
                },
                shards=1)

class tempTestRequestGCSUpload(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())

        bucket = '/gs/' + bucket_name
        filename = bucket + '/' + 'tempfile.csv'

        pipeline = TestGCSReaderPipeline(filename)
        pipeline.with_params(target="mapreducetestmodtest")
        pipeline.start()
        self.response.out.write('done')

application = webapp2.WSGIApplication([
    ('/gcsupload', tempTestRequestGCSUpload),
], debug=True)

记住:

  1. Mapreduce 项目使用了现在已经不再支持的 “Google Cloud Storage Files API”。所以未来可能没有支持。
  2. Mapreduce 会对数据存储的读取和写入增加一点额外的负担。

方法三:使用 GCS 和 GCS 客户端库

  1. 使用上面提到的文件传输方法,把 csv/text 文件上传到 GCS。
  2. 使用 GCS 客户端库(把 'cloudstorage' 文件夹复制到你的应用顶层文件夹)。

在应用的 main.py 文件中添加以下代码。

import cgi
import webapp2
import logging
import jinja2
import os, csv
import cloudstorage as gcs
from google.appengine.ext import ndb
from google.appengine.api import app_identity
from models import DataStoreModel

class UploadGCSData(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())
        bucket = '/' + bucket_name
        filename = bucket + '/tempfile.csv'
        self.upload_file(filename)

    def upload_file(self, filename):
        gcs_file = gcs.open(filename)
        datareader = csv.reader(gcs_file)
        count = 0
        entities = []
        for row in datareader:
            count += 1
                newProd = DataStoreModel(attr1=row[0], link=row[1])
                entities.append(newProd)

            if count%50==0 and entities:
                ndb.put_multi(entities)
                entities=[]

        if entities:
            ndb.put_multi(entities)

application = webapp2.WSGIApplication([
    ('/gcsupload', UploadGCSData),
], debug=True)
1

有些朋友可能和我一样:我不能使用数据存储的导入/导出工具,因为我的数据在进入数据存储之前需要进行一些转换。

最后,我选择了使用apache-beam (谷歌云数据流)

你只需要写几行“beam”代码来:

  • 读取你的数据(比如存放在云存储上)——你会得到一个包含字符串的PCollection
  • 进行你想要的任何转换(这样你就会得到一个包含数据存储实体的PCollection),
  • 将它们导入到数据存储接收器

想了解具体的使用案例,可以查看如何使用多个工作者加速批量导入谷歌云数据存储?

我能够以每秒800个实体的速度将数据写入我的数据存储,使用了5个工作者。这让我在大约5小时内完成了导入任务(总共有1600万行)。如果你想更快,可以使用更多的工作者 :D

撰写回答