如何批量上传数据到App Engine数据存储?旧方法无效
这个问题应该是很多人都会遇到的,简单来说,就是要把大量数据上传到appengine的数据库里。
不过,之前在stackoverflow上提到的一些老方法(下面有链接)现在似乎都不管用了。以前用DB API上传数据时,bulkloader方法是个不错的选择,但现在用NDB API就不行了。
而且现在bulkloader方法似乎已经被淘汰了,旧的链接虽然还在文档里,但指向的页面都是错的。这里有个例子:
https://developers.google.com/appengine/docs/python/tools/uploadingdata
这个链接现在还在这个页面上:https://developers.google.com/appengine/docs/python/tools/uploadinganapp
那么现在推荐的批量上传数据的方法是什么呢?
看起来有两个可行的替代方案:1)使用remote_api,或者2)把数据写成CSV文件放到GCS桶里,然后再读取。有没有人有成功使用这两种方法的经验?
任何建议都非常感谢!谢谢!
[*下面链接中的解决方案已经不再有效]
4 个回答
截至2018年,最好的方法是使用新的导入/导出功能。
你提供的链接中的远程API方法仍然可以正常使用,不过如果数据行超过几百行的话,它的速度会非常慢。
我成功地将GCS和MapReduce框架结合使用,下载数据存储的内容,而不是上传。不过,原理应该是一样的。你可以查看一下mapreduce的文档:实际上你只需要使用mapper这一步,所以你可以定义一个简单的函数,接受CSV中的一行数据,并根据这些数据创建一个数据存储实体。
方法一:使用 remote_api
怎么做:写一个 bulkloader.yaml 文件,然后在终端直接用 “appcfg.py upload_data” 命令运行它。我不推荐这个方法,原因有两个:1. 延迟很大 2. 不支持 NDB
方法二:使用 GCS 和 Mapreduce
上传数据文件到 GCS:
可以使用 “storage-file-transfer-json-python” 这个 GitHub 项目中的 chunked_transfer.py 来从本地系统上传文件到 GCS。确保从应用引擎管理控制台生成正确的 “client-secrets.json” 文件。
Mapreduce:
使用 "appengine-mapreduce" 这个 GitHub 项目。把 "mapreduce" 文件夹复制到你的项目顶层文件夹。
然后在你的 app.yaml 文件中添加以下内容:
includes:
- mapreduce/include.yaml
下面是你的 main.py 文件
import cgi
import webapp2
import logging
import os, csv
from models import DataStoreModel
import StringIO
from google.appengine.api import app_identity
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader
def testmapperFunc(newRequest):
f = StringIO.StringIO(newRequest)
reader = csv.reader(f, delimiter=',')
for row in reader:
newEntry = DataStoreModel(attr1=row[0], link=row[1])
yield op.db.Put(newEntry)
class TestGCSReaderPipeline(base_handler.PipelineBase):
def run(self, filename):
yield mapreduce_pipeline.MapreducePipeline(
"test_gcs",
"testgcs.testmapperFunc",
"mapreduce.input_readers.FileInputReader",
mapper_params={
"files": [filename],
"format": 'lines'
},
shards=1)
class tempTestRequestGCSUpload(webapp2.RequestHandler):
def get(self):
bucket_name = os.environ.get('BUCKET_NAME',
app_identity.get_default_gcs_bucket_name())
bucket = '/gs/' + bucket_name
filename = bucket + '/' + 'tempfile.csv'
pipeline = TestGCSReaderPipeline(filename)
pipeline.with_params(target="mapreducetestmodtest")
pipeline.start()
self.response.out.write('done')
application = webapp2.WSGIApplication([
('/gcsupload', tempTestRequestGCSUpload),
], debug=True)
记住:
- Mapreduce 项目使用了现在已经不再支持的 “Google Cloud Storage Files API”。所以未来可能没有支持。
- Mapreduce 会对数据存储的读取和写入增加一点额外的负担。
方法三:使用 GCS 和 GCS 客户端库
- 使用上面提到的文件传输方法,把 csv/text 文件上传到 GCS。
- 使用 GCS 客户端库(把 'cloudstorage' 文件夹复制到你的应用顶层文件夹)。
在应用的 main.py 文件中添加以下代码。
import cgi
import webapp2
import logging
import jinja2
import os, csv
import cloudstorage as gcs
from google.appengine.ext import ndb
from google.appengine.api import app_identity
from models import DataStoreModel
class UploadGCSData(webapp2.RequestHandler):
def get(self):
bucket_name = os.environ.get('BUCKET_NAME',
app_identity.get_default_gcs_bucket_name())
bucket = '/' + bucket_name
filename = bucket + '/tempfile.csv'
self.upload_file(filename)
def upload_file(self, filename):
gcs_file = gcs.open(filename)
datareader = csv.reader(gcs_file)
count = 0
entities = []
for row in datareader:
count += 1
newProd = DataStoreModel(attr1=row[0], link=row[1])
entities.append(newProd)
if count%50==0 and entities:
ndb.put_multi(entities)
entities=[]
if entities:
ndb.put_multi(entities)
application = webapp2.WSGIApplication([
('/gcsupload', UploadGCSData),
], debug=True)
有些朋友可能和我一样:我不能使用数据存储的导入/导出工具,因为我的数据在进入数据存储之前需要进行一些转换。
最后,我选择了使用apache-beam (谷歌云数据流)。
你只需要写几行“beam”代码来:
- 读取你的数据(比如存放在云存储上)——你会得到一个包含字符串的
PCollection
, - 进行你想要的任何转换(这样你就会得到一个包含数据存储实体的
PCollection
), - 将它们导入到数据存储接收器。
想了解具体的使用案例,可以查看如何使用多个工作者加速批量导入谷歌云数据存储?
我能够以每秒800个实体的速度将数据写入我的数据存储,使用了5个工作者。这让我在大约5小时内完成了导入任务(总共有1600万行)。如果你想更快,可以使用更多的工作者 :D