谷歌应用引擎Python的迷你可迭代模型映射器

3 投票
1 回答
548 浏览
提问于 2025-04-16 16:08

我觉得我想要的东西可能不存在。有没有人能给我创建一个简单的迷你映射器类?详细的伪代码或实际的Python代码都可以。 更新:帖子底部有一个简单可用的版本。

更新 2 - 6月20日:

  • 更安全的执行:现在在iterall()中可以使用continue/break/return。
  • 添加了defer_db标志,用于将数据库的添加和删除操作发送到任务队列。
  • 为了抽象化,可以指定一个过滤函数,只有通过这个函数的实体才会在迭代时返回。
  • 将.bdelete()改为.bdel()。

更新 3 - 6月21日:

  • 修复了上次更新中引入的一个重大错误,这个错误导致无法保存。

我希望这对除了我自己以外的人也有用。我经常使用它,它在MapReduce和当你不确定会有多少结果时需要游标之间提供了一个舒适的解决方案。


这是什么?

gae的mapreduce库很好,但我想要一些轻量级和一次性的东西。在Python gae的教程中,通常会看到数据库模型被迭代、修改和保存。我觉得没有更多这样的例子,因为我们知道这样做效率很低,每次循环都会调用数据存储,而不是批量处理。不过我喜欢这个接口,我经常发现自己需要一种简单快速的方法来遍历我的数据库模型。

它会是什么样子?

使用方法

  1. 导入这个类。
  2. 告诉它你想要映射哪个模型。
  3. 给它可选的查询过滤条件。
  4. 获取迭代器对象。
  5. 开始循环,放心地知道你不会进行成千上万的无用数据库调用。

幕后

这就是我需要你们帮助的地方,因为我觉得自己有点力不从心。

生成器 (我从未使用过生成器,只是有点理解它们) 对数据存储中的项目进行批量抓取(抓取多少是安全的?有没有硬性限制,还是取决于项目大小?),并以可迭代的方式呈现它们。一旦达到 MAX_AMOUNT 批量大小,就将这些项目批量保存到数据存储中,并无缝抓取下一批(带游标)。

我考虑过使用defer将项目保存到数据库,目的是在我们循环处理许多项目时节省一些时间。可能的缺点是下一段代码期望映射已经完成。因此,我认为设置或忽略一个'defer_db'标志会很好,这取决于用户的偏好。如果你只期望处理少量项目,那么就不需要设置defer标志。

结论

请为这个小项目贡献代码概念。被接受的答案将在一周后获得最多的赞同票。老实说,我觉得请求SO为我想出解决方案有点不太好,但我真心觉得自己无法完成这个任务。 我希望你觉得它有用。

示例

相同的查询函数

country_mim = MIM(CountryModels.all()).filter("spoken_language =", "French")
country_mim.order("population")

嵌套迭代

some_mim = MIM(SomeModel.all())
for x in some_mim.iterall():
    if x.foo == 'ham sandwich':
        sandwich_mim = MIM(MySandwiches.all())
        for sandwich in sandwich_mim.iterall():
            if 'ham' in sandwich.ingredients:
                print 'yay'

批量保存和删除

country_mim = MIM(CountryModels.all()).order("drinking_age")
for country in country_mim.iterall():
    if country.drinking_age > 21:   # these countries should be nuked from orbit
        country_mim.bdel(country)   # delete
    if country.drinking_age == 18:
        country.my_thoughts = "god bless you foreigners"
        country_mim.bput(country)   # save
    if country.drinking_age < 10:   # panic
        country.my_thoughts = "what is this i don't even..."
        country_mim.bput(country)
        break   # even though we panicked, the bput still resolves

一些代码:MiniIterMapper.py

我已经使用这段代码好几个 星期了,一切似乎都很好。 尚未包含defer。 查询外观代码是从伟大的 PagedQuery 模块中借来的(经过许可)。支持批量保存和批量删除。

import google.appengine.ext.db as db
from google.appengine.ext.deferred import defer

class MIM(object):
    """
    All standard Query functions (filter, order, etc) supported*. Default batch
    size is 100. defer_db=True will cause put and delete datastore operations to
    be deferred. allow_func accepts any function you wish and only the entities
    that cause the function to return a true value will be returned during
    iterall(). Using break/continue/return while iterating doesn't cause things
    to explode (like it did in the 1st version).

    * - thanks to http://code.google.com/p/he3-appengine-lib/wiki/PagedQuery
    """

    def __init__(self, query, batch_size=100, defer_db=False, allow_func=None):

        self._query =       query
        self._batch_size =  batch_size
        self._defer_db =    defer_db
        self._allow_func =  allow_func
        self._to_save =     []
        self._to_delete =   []

        # find out if we are dealing with another facade object
        if query.__dict__.has_key('_query'): query_to_check = query._query
        else: query_to_check  = query

        if isinstance(query_to_check, db.Query):        self._query_type = 'Query'
        elif isinstance(query_to_check, db.GqlQuery):   self._query_type = 'GqlQuery'
        else: raise TypeError('Query type not supported: ' + type(query).__name__)

    def iterall(self):
        "Return iterable over all datastore items matching query. Items pulled from db in batches."

        results =               self._query.fetch(self._batch_size) # init query
        savedCursor =           self._query.cursor()                # init cursor

        try:
            while results:

                for item in results:
                    if self._allow_func:
                        if self._allow_func(item):
                            yield item
                    else:
                        yield item

                if len(results) ==  self._batch_size:
                    results =       self._query.with_cursor(savedCursor).fetch(self._batch_size)
                    savedCursor =   self._query.cursor()

                else:                   # avoid additional db call if we don't have max amount
                    results =       []  # while loop will end, and go to else section.
            else:
                self._finish()
        except GeneratorExit:
            self._finish()

    def bput(self, item):
        "Batch save."
        self._to_save.append(item)
        if len(self._to_save) >= self._batch_size:
            self._bput_go()

    def bdel(self, item):
        "Batch delete."
        self._to_delete.append(item)
        if len(self._to_delete) >= self._batch_size:
            self._bdel_go()

    def _bput_go(self):
        if self._defer_db:
            defer(db.put, self._to_save)
        else: db.put(self._to_save)
        self._to_save = []

    def _bdel_go(self):
        if self._defer_db:
            defer(db.delete, self._to_delete)
        else: db.delete(self._to_delete)
        self._to_delete = []

    def _finish(self):
        "When done iterating through models, could be that the last few remaining weren't put/deleted yet."
        if self._to_save:   self._bput_go()
        if self._to_delete: self._bdel_go()

    # FACADE SECTION >>>

    def fetch(self, limit, offset=0):
        return self._query.fetch(limit,offset)

    def filter(self, property_operator, value):
        self._check_query_type_is('Query')
        self._query = self._query.filter(property_operator, value)
        return self

    def order(self, property):
        self._check_query_type_is('Query')
        self._query.order(property)
        return self

    def ancestor(self, ancestor):
        self._check_query_type_is('Query')
        self._query.ancestor(ancestor)
        return self

    def count(self, limit=1000):
        return self._query.count(limit)

    def _check_query_type_is(self, required_query_type):
        if self._query_type != required_query_type:
            raise TypeError('Operation not allowed for query type ('\
                            + type(self._query).__name__)

1 个回答

1

你为什么不想用Mapreduce呢?它就是为这种情况设计的,已经能做到你想要的所有功能,而且可以通过编程来调用。‘轻量级’这个词很模糊,但我不知道有什么理由让你觉得mapreduce库不适合你的任务——其实没有太多理由去重复实现这些功能。

撰写回答