普通Postgres队列
tpq的Python项目详细描述
普通Postgres队列
这是一个简单的库,可以将json blob放入fifo队列中,然后 找回他们。
此队列与其他类似队列的区别在于 FOR UPDATE SKIP LOCKED。
这里的优点是,队列项在当前 事务已提交。回滚时,项目将留在队列中 原封不动。
如果其他记录作为更大事务的一部分进行修改,则这些更改是 也向后滚动。使更大的队列处理操作原子化。拿着 遵循操作顺序
BEGINSELECT...FROMqueueFORUPDATESKIPLOCKEDINSERTINTO...DELETEFROM...ROLLBACK
在上面的语句中,没有一个语句有任何影响,并且队列项保持不变 在表中被另一个消费者“重试”。由于使用了用于更新的, 队列项保持锁定,以避免多个使用者获取该项 从队列中。
python库的使用
数据库连接信息可以通过库api提供。
importtpq# Explicitly provide database connection informationq=tpq.Queue('queue_name',host='localhost',dbname='foobar')q.put('{"foo": "bar"}')# Or use shortcut functions:tpq.put('queue_name','{"foo": "bar"}',host='localhost',dbname='foobar')tpq.get('queue_name',host='localhost',dbname='foobar')# Or to take advantage of cooperative transactions, provide a connection:q=tpq.Queue('queue_name',conn=connection)q.put('{"foo": "bar"}')# Which is also supported by shortcut functions:tpq.put('queue_name','{"foo": "bar"}',conn=connection)tpq.get('queue_name',conn=connection)
或者,您可以在环境中设置连接信息:
$ # Export as URL $ export TPQ_URL="postgresql://user:pass@localhost/foobar" $ # Or separately $ export TPQ_HOST=localhost $ export TPQ_DB=foobar $ export TPQ_USER=user $ export TPQ_PASS=pass
然后省略参数:
importtpq# Use an instance for multiple operationswithtpq.Queue('queue_name')asq:q.put('{"foo": "bar"}')data=q.get()# Or use shortcut functions:tpq.put('queue_name','{"foo": "bar"}')tpq.get('queue_name')
等待
您可以使用wait参数等待项目到达。
importtpq# Wait forevertpq.get('queue_name',wait=0)# Don't wait (also can omit the param).tpq.get('queue_name',wait=-1)# Wait specified number of seconds.tpq.get('queue_name',wait=5)
命令行界面
还提供了命令行界面。json可以通过文件或 stdin(默认值)。
$ # Configure your database. $ export TPQ_URL="postgresql://user:pass@localhost/foobar" $ # JSON via stdin (default). $ echo "{\"foo\": \"bar\"}" | tpq produce queue_name $ # JSON via file. $ tpq produce queue_name --file=message.json $ # Explicitly provide JSON via stdin. $ tpq produce queue_name --file=- < message.json $ # Then read the item to stdout. $ tpq consume queue_name {'foo': 'bar'} $ # If you have trouble (or for logging). Debug output goes to stderr. $ TPQ_URL="postgresql://user:pass@localhost/foobar" tpq consume queue_name --debug Read database config from environment Parsing TPQ_URL Database config found Attempting to read item Item read, returning {'foo': 'bar'} $ # You can wait on the CLI too... $ # Forever: $ tpq consume queue_name --wait=0 $ # Specified number of seconds: $ tpq consume queue_name --wait=5 $ # The return code signals whether an item was received or not. $ tpq consume queue_name --wait=-1 {'foo': 'bar'} $ echo $? 0 # For an empty queue, you get 1 $ tpq consume queue_name --wait=-1 Queue empty Traceback (most recent call last): File "/home/btimby/Code/tpq/tpq/__main__.py", line 24, in consume print(get(opt['<name>'], wait=opt['--wait'])) File "/home/btimby/Code/tpq/tpq/__init__.py", line 266, in get return q.get(wait=wait) File "/home/btimby/Code/tpq/tpq/__init__.py", line 233, in get raise QueueEmpty() queue.Empty $ echo $? 1