PyMongo的批量写入操作与生成器的特性

2 投票
2 回答
1778 浏览
提问于 2025-04-29 20:23

我想使用PyMongo的批量写入操作功能,这样可以把写入操作分成一组一组来执行,这样可以减少网络请求的次数,提高写入的速度。

我还在这里发现,批量操作的数量可以设置为5000。

不过,我并不确定批量操作的最佳数量是多少,以及如何将PyMongo的批量写入功能和生成器结合在下面的代码中?

from pymongo import MongoClient
from itertools import groupby
import csv


def iter_something(rows):
    key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type']
    chr_key_names = ['letter', 'no']
    for keys, group in groupby(rows, lambda row: row[:6]):
        result = dict(zip(key_names, keys))
        result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group]
        yield result


def main():
    converters = [str, str, str, int, int, int, str, int]
    with open("/home/mic/tmp/test.txt") as c:
    reader = csv.reader(c, skipinitialspace=True)
    converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader)
    for object_ in iter_something(converted):
        print(object_)


if __name__ == '__main__':
    db = MongoClient().test
    sDB = db.snps 
    main()

test.txt 文件:

  Test, A, B01, 828288,  1,    7, C, 5
  Test, A, B01, 828288,  1,    7, T, 6
  Test, A, B01, 171878,  3,    7, C, 5
  Test, A, B01, 171878,  3,    7, T, 6
  Test, A, B01, 871963,  3,    9, A, 5
  Test, A, B01, 871963,  3,    9, G, 6
  Test, A, B01, 1932523, 1,   10, T, 4
  Test, A, B01, 1932523, 1,   10, A, 5
  Test, A, B01, 1932523, 1,   10, X, 6
  Test, A, B01, 667214,  1,   14, T, 4
  Test, A, B01, 667214,  1,   14, G, 5
  Test, A, B01, 67214,   1,   14, G, 6      
暂无标签

2 个回答

1

你有一个文档生成器,现在想把这些文档分成小块或者小组。这可以通过使用 grouper 生成器来优雅地实现,具体的做法可以参考 这个回答

然后,对于每一组文档,可以使用 pymongo 的 insert 方法来批量插入这些文档。

这样你就能得到:

def main():
    db = MongoClient().test
    sDB = db.snps 
    ...
    for docs_group in grouper(iter_something(converted), BULK_SIZE):
        docs_group = [ doc for doc in docs_group if doc is not None ]  # filter out Nones
        sDB.insert(docs_group, ...)

至于最佳的 BULK_SIZE,这个要根据不同的因素来决定,比如文档的典型大小、网络延迟等等。你需要自己尝试一下。

5

你可以简单地这样做:

sDB.insert(iter_something(converted))

PyMongo会自动处理这些事情:它会不断地从你的生成器中获取数据,直到获取到1000个文档或者16MB的数据为止。然后,它会暂停生成器,把这些数据批量插入到MongoDB中。一旦数据插入完成,PyMongo会继续从生成器中获取数据,创建下一个批次,直到所有文档都插入完毕。最后,insert()会返回一个包含已插入文档ID的列表。

最初对生成器的支持是在这个提交中加入的,从那以后我们一直保持对文档生成器的支持。

撰写回答