couchdb-python 更改通知

9 投票
2 回答
3783 浏览
提问于 2025-04-17 04:42

我正在尝试使用couchdb.py来创建和更新数据库。我想实现通知变更,最好是持续模式。运行下面的测试代码后,我不太明白在Python中变更的机制是如何工作的。

class SomeDocument(Document):

#############################################################################

#    def __init__ (self):

    intField  = IntegerField()#for now - this should to be an integer
    textField = TextField()

couch = couchdb.Server('http://127.0.0.1:5984')
databasename = 'testnotifications'

if databasename in couch:
    print 'Deleting then creating database ' + databasename + ' from server'
    del couch[databasename]
    db = couch.create(databasename)
else:
    print 'Creating database ' + databasename + ' on server'
    db = couch.create(databasename)

for iii in range(5):

    doc = SomeDocument(intField=iii,textField='somestring'+str(iii))
    doc.store(db)
    print doc.id + '\t' + doc.rev

something = db.changes(feed='continuous',since=4,heartbeat=1000)

for iii in range(5,10):

    doc = SomeDocument(intField=iii,textField='somestring'+str(iii))
    doc.store(db)
    time.sleep(1)
    print something
    print db.changes(since=iii-1)

这个值

db.changes(since=iii-1) 

返回了一些我感兴趣的信息,但格式让我搞不清楚如何提取序列号、修订号或者文档信息:

{u'last_seq': 6, u'results': [{u'changes': [{u'rev': u'1-9c1e4df5ceacada059512a8180ead70e'}], u'id': u'7d0cb1ccbfd9675b4b6c1076f40049a8', u'seq': 5}, {u'changes': [{u'rev': u'1-bbe2953a5ef9835a0f8d548fa4c33b42'}], u'id': u'7d0cb1ccbfd9675b4b6c1076f400560d', u'seq': 6}]}

与此同时,我真正想用的代码是:

db.changes(feed='continuous',since=4,heartbeat=1000)

它返回了一个生成器对象,但似乎没有像CouchDB的指南所建议的那样提供实时通知……

有没有人成功使用couchdb-python中的变更功能?

2 个回答

4

我设置了一个邮件队列,使用的方式和这个类似。你还需要加载 couchdb.Session()。我还使用了一个过滤器,只接收未发送的邮件到队列的变化流。

from couchdb import Server

    s = Server('http://localhost:5984/')
    db = s['testnotifications']
    # the since parameter defaults to 'last_seq' when using continuous feed
    ch = db.changes(feed='continuous',heartbeat='1000',include_docs=True)

    for line in ch:
        doc = line['doc']
        // process doc here
        doc['priority'] = 'high'
        doc['recipient'] = 'Joe User'
        # doc['state'] + 'sent'
        db.save(doc)

这样你就可以直接从变化流中访问你的文档,按照自己的需要处理数据,最后更新你的文档。我在实际的 'db.save(doc)' 上使用了一个 try/except 块,这样我可以捕捉到在我编辑的时候文档被更新的情况,并在保存之前重新加载文档。

7

我使用的是长轮询,而不是持续轮询,这对我来说效果不错。在长轮询模式下,db.changes 会一直等待,直到至少有一个变化发生,然后返回所有的变化,结果以生成器对象的形式呈现。

下面是我用来处理变化的代码。settings.db 是我的 CouchDB 数据库对象。

since = 1
while True:
    changes = settings.db.changes(since=since)
    since = changes["last_seq"]
    for changeset in changes["results"]:
        try:
           doc = settings.db[changeset["id"]]
        except couchdb.http.ResourceNotFound:
           continue
        else:
           // process doc

你可以看到这是一个无限循环,每次循环都会调用 changes。调用 changes 会返回一个字典,里面有两个元素:最近一次更新的序列号和被修改的对象。我会遍历每个结果,加载相应的对象并进行处理。

如果你想要持续获取数据,可以把 while True: 这一行换成 for changes in settings.db.changes(feed="continuous", since=since)

撰写回答