pymongo遍历20万条记录耗时超过24小时

3 投票
1 回答
1033 浏览
提问于 2025-04-17 12:50

我在数据库里有两个集合,分别叫做 pagepagearchive,我想把它们整理一下。我发现新的文档被创建在 pagearchive 里,而不是像我想的那样把值添加到嵌套文档中。所以,简单来说,这个脚本的作用就是遍历 page 中的每个文档,然后在 pagearchive 中找到所有该文档的副本,把我想要的数据移动到一个文档里,并删除多余的副本。

问题是,pagearchive 里只有20万份文档,但根据我在底部打印的计数变量,处理1000条记录需要30分钟到60分钟以上。这实在是太慢了。我见过的重复文档数量最多是88个。但大多数情况下,当我在 pageArchive 中查询 uu 时,看到的重复文档只有1到2个。

mongodb 运行在一台64位的单实例机器上,内存有16GB。
我在 pageArchive 集合中遍历的 uu 字段是一个字符串。我确保在这个字段上有索引 db.pagearchive.ensureIndex({uu:1}),我还做了 mongod --repair 来确保一切正常。

我猜问题可能出在我写的 Python 代码上(我对这方面不太擅长),或者我可能遗漏了某些 mongodb 必须的东西。为什么速度这么慢,或者我能做些什么来大幅提高速度呢?

我想也许是因为 uu 字段是字符串,导致了瓶颈,但这正是文档中的唯一属性(在我整理好这个集合后会是)。而且,当我停止这个过程再重新启动时,速度能提升到每秒处理1000条记录。但一旦开始找到重复项,速度又会变得非常慢(每10到20分钟才删除大约100条记录)。

from pymongo import Connection
import datetime


def match_dates(old, new):
    if old['coll_at'].month == new['coll_at'].month and old['coll_at'].day == new['coll_at'].day and old['coll_at'].year == new['coll_at'].year:
        return False

    return new

connection = Connection('dashboard.dev')


db = connection['mydb']

pageArchive = db['pagearchive']
pages = db['page']

count = 0
for page in pages.find(timeout=False):

    archive_keep = None
    ids_to_delete = []
    for archive in pageArchive.find({"uu" : page['uu']}):

        if archive_keep == None:
            #this is the first record we found, so we will store data from duplicate records with this one; delete the rest
            archive_keep = archive
        else:
            for attr in archive_keep.keys():
                #make sure we are dealing with an embedded document field
                if isinstance(archive_keep[attr], basestring) or attr == 'updated_at':
                    continue
                else:
                    try:
                        if len(archive_keep[attr]) == 0:
                            continue
                    except TypeError:
                        continue
                    try:
                        #We've got our first embedded doc from a property to compare against
                        for obj in archive_keep[attr]:
                            if archive['_id'] not in ids_to_delete:
                                ids_to_delete.append(archive['_id'])
                            #loop through secondary archive doc (comparing against the archive keep)
                            for attr_old in archive.keys():
                                #make sure we are dealing with an embedded document field
                                if isinstance(archive[attr_old], basestring) or attr_old == 'updated_at':
                                    continue
                                else:
                                    try:
                                        #now we know we're dealing with a list, make sure it has data
                                        if len(archive[attr_old]) == 0:
                                            continue
                                    except TypeError:
                                        continue
                                    if attr == attr_old:
                                        #document prop. match; loop through embedded document array and make sure data wasn't collected on the same day
                                        for obj2 in archive[attr_old]:
                                            new_obj = match_dates(obj, obj2)
                                            if new_obj != False:
                                                archive_keep[attr].append(new_obj)
                    except TypeError, te:
                        'not iterable'
        pageArchive.update({
                            '_id':archive_keep['_id']}, 
                           {"$set": archive_keep}, 
                           upsert=False)
        for mongoId in ids_to_delete:
            pageArchive.remove({'_id':mongoId})
        count += 1
        if count % 100 == 0:
            print str(datetime.datetime.now()) + ' ### ' + str(count) 

1 个回答

2

我会对代码做以下修改:

  • match_dates 函数中,返回 None 代替 False,然后用 if new_obj is not None: 来检查引用,这样就不会调用对象的 __ne____nonzero__ 方法了。

  • for page in pages.find(timeout=False): 这行代码中,如果只使用 uu 这个键,并且页面数据很大,可以给 find 加上 fields=['uu'] 参数,这样会加快查询速度。

  • archive_keep == None 改成 archive_keep is None

  • archive_keep[attr] 这段代码被调用了4次。为了提高一点速度,可以先保存 keep_obj = archive_keep[attr],然后再使用 keep_obj

  • ids_to_delete = [] 改成 ids_to_delete = set()。这样在 if archive['_id'] not in ids_to_delete: 这行代码中,检查是否在集合里会更快,时间复杂度是 O(1)。

撰写回答