如何实现支持命名空间的FIFO队列
我正在使用一种方法来处理一个基于Google App Engine的FIFO队列,这种队列是通过db.Model实现的(可以查看这个问题)。
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import run_wsgi_app
class QueueItem(db.Model):
created = db.DateTimeProperty(required=True, auto_now_add=True)
data = db.BlobProperty(required=True)
@staticmethod
def push(data):
"""Add a new queue item."""
return QueueItem(data=data).put()
@staticmethod
def pop():
"""Pop the oldest item off the queue."""
def _tx_pop(candidate_key):
# Try and grab the candidate key for ourselves. This will fail if
# another task beat us to it.
task = QueueItem.get(candidate_key)
if task:
task.delete()
return task
# Grab some tasks and try getting them until we find one that hasn't been
# taken by someone else ahead of us
while True:
candidate_keys = QueueItem.all(keys_only=True).order('created').fetch(10)
if not candidate_keys:
# No tasks in queue
return None
for candidate_key in candidate_keys:
task = db.run_in_transaction(_tx_pop, candidate_key)
if task:
return task
这个队列运行得很好(非常好)。
现在我的代码中有一个方法,可以通过一个延迟队列来访问这个FIFO队列:
def deferred_worker():
data= QueueItem.pop()
do_something_with(data)
我想增强这个方法和队列的数据结构,添加一个client_ID参数,代表需要访问自己队列的特定客户。大概是这样的:
def deferred_worker(client_ID):
data= QueueItem_of_this_client_ID.pop() # I need to implement this
do_something_with(data)
我该如何让这个队列能够识别client_ID呢?
限制条件:
- 客户的数量是动态的,并不是预先定义好的
- 任务队列不是一个选项(1. 最多十个队列 2. 我想完全控制我的队列)
你知道我如何使用新的命名空间API来添加这个功能吗?(记住,我不是从webapp.RequestHandler调用db.Model)
另一个选项是:我可以在QueueItem中添加一个client_ID db.StringProperty
,并在拉取方法中使用它作为过滤器:
QueueItem.all(keys_only=True).filter(client_ID=an_ID).order('created').fetch(10)
有没有更好的主意?
2 个回答
1
正如我在你问我问题时提到的,你不需要做任何额外的事情来让这个和命名空间一起工作:这个队列是建立在数据存储上的,而数据存储本身已经支持命名空间了。你只需要按照说明设置你想要的命名空间就可以了,具体可以参考文档。
1
假设你的“客户端类”实际上是一个客户端调用的请求处理器,你可以这样做:
from google.appengine.api import users
from google.appengine.api.namespace_manager import set_namespace
class ClientClass(webapp.RequestHandler):
def get(self):
# For this example let's assume the user_id is your unique id.
# You could just as easily use a parameter you are passed.
user = users.get_current_user()
if user:
# If there is a user, use their queue. Otherwise the global queue.
set_namespace(user.user_id())
item = QueueItem.pop()
self.response.out.write(str(item))
QueueItem.push('The next task.')
另外,你也可以设置一个全局的命名空间,具体可以参考这个链接:app-wide。
通过设置默认的命名空间,所有对数据存储的调用都会在这个命名空间内进行,除非你特别指定其他的命名空间。需要注意的是,要获取和运行任务,你必须知道命名空间是什么。所以,你可能需要在默认命名空间中维护一个命名空间的列表,以便于清理。