在pymongo中快速或批量更新插入

57 投票
6 回答
42778 浏览
提问于 2025-04-16 13:37

我想知道怎么在pymongo中批量更新或插入数据。我想一次性更新很多条记录,但一个一个地处理太慢了。

有一个几乎相同问题的答案在这里:MongoDB中的批量更新/插入

不过,接受的答案并没有真正解决问题。它只是给了一个关于如何使用mongo命令行工具进行导入/导出的链接。

我也希望有人能解释一下,为什么批量更新或插入可能不可行或者不是最佳做法,但请告诉我解决这类问题的推荐方案是什么。

6 个回答

12

如果你有很多数据,并且想用“_id”来判断数据是否存在,

你可以试试...

import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient('localhost', 27017)
db=client['sampleDB']

collectionInfo = db.sample

#sample data
datas=[
    {"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
    {"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
    {"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
    {"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
    {"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
    {"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]

#you should split judge item and other data 
ids=[data.pop("_id") for data in datas]

operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)]

collectionInfo.bulk_write(operations)

我的英语很差,如果你听不懂我说的话,我很抱歉

33

MongoDB 2.6及以上版本支持批量操作。这包括批量插入、更新等。这样做的目的是为了减少或消除逐条操作时的延迟,也就是减少每次操作都要来回的时间。

那么,这个功能是怎么实现的呢?下面用Python举个例子,因为我正在用这个语言。

>>> import pymongo
>>> pymongo.version
'2.7rc0'

使用这个功能时,我们需要创建一个“批量”对象,把要处理的文档添加进去,然后调用执行方法,这样就能一次性发送所有的更新。需要注意的是,所有操作的BSON大小(也就是它们的总大小)不能超过16MB的文档大小限制。当然,操作的数量可能会有很大差异,具体情况要看实际使用。

下面是使用Pymongo进行批量更新的例子:

import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}})
retval = bulkop.execute()

这是最基本的方法。更多信息可以查看:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

补充说明:从Python驱动的3.5版本开始,initialize_ordered_bulk_op这个方法已经不推荐使用了,建议用bulk_write()来替代。[ http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write ]

59

现在的 pymongo 版本(3.x 以上)把批量操作封装成一个统一的接口,这样即使服务器不支持批量操作,它也会自动降级处理。这种做法在 MongoDB 官方支持的驱动中也是一致的。

所以,推荐的编程方式是使用 bulk_write() 方法。在这个方法中,你可以使用 UpdateOne 或其他合适的操作。现在当然更推荐使用自然语言的列表,而不是特定的构建器。

这是旧文档的直接翻译:

from pymongo import UpdateOne

operations = [
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]

result = collection.bulk_write(operations)

或者经典的文档转换循环:

import random
from pymongo import UpdateOne

random.seed()

operations = []

for doc in collection.find():
    # Set a random number on every document update
    operations.append(
        UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
    )

    # Send once every 1000 in batch
    if ( len(operations) == 1000 ):
        collection.bulk_write(operations,ordered=False)
        operations = []

if ( len(operations) > 0 ):
    collection.bulk_write(operations,ordered=False)

返回的结果是 BulkWriteResult,它会包含匹配和更新的文档计数,以及任何发生的“插入或更新”操作的返回 _id 值。

关于批量操作数组大小有一些误解。实际上,发送到服务器的请求不能超过 16MB 的 BSON 限制,因为这个限制同样适用于发送到服务器的“请求”,而这个请求也是使用 BSON 格式的。

不过,这并不限制你可以构建的请求数组的大小,因为实际的操作会以每批 1000 个的方式发送和处理。唯一真正的限制是这 1000 个操作指令本身不能创建超过 16MB 的 BSON 文档。这确实是个不小的挑战。

批量方法的一般概念是“减少流量”,因为一次发送很多请求,只处理一个服务器的响应。这样减少了每个更新请求附带的开销,节省了很多时间。

撰写回答