普通Postgres队列

tpq的Python项目详细描述


Travis CI StatusCode Coveragehttps://badge.fury.io/py/tpq.svg

普通Postgres队列

这是一个简单的库,可以将json blob放入fifo队列中,然后 找回他们。

此队列与其他类似队列的区别在于 FOR UPDATE SKIP LOCKED

这里的优点是,队列项在当前 事务已提交。回滚时,项目将留在队列中 原封不动。

如果其他记录作为更大事务的一部分进行修改,则这些更改是 也向后滚动。使更大的队列处理操作原子化。拿着 遵循操作顺序

BEGINSELECT...FROMqueueFORUPDATESKIPLOCKEDINSERTINTO...DELETEFROM...ROLLBACK

在上面的语句中,没有一个语句有任何影响,并且队列项保持不变 在表中被另一个消费者“重试”。由于使用了用于更新的, 队列项保持锁定,以避免多个使用者获取该项 从队列中。

python库的使用

数据库连接信息可以通过库api提供。

importtpq# Explicitly provide database connection informationq=tpq.Queue('queue_name',host='localhost',dbname='foobar')q.put('{"foo": "bar"}')# Or use shortcut functions:tpq.put('queue_name','{"foo": "bar"}',host='localhost',dbname='foobar')tpq.get('queue_name',host='localhost',dbname='foobar')# Or to take advantage of cooperative transactions, provide a connection:q=tpq.Queue('queue_name',conn=connection)q.put('{"foo": "bar"}')# Which is also supported by shortcut functions:tpq.put('queue_name','{"foo": "bar"}',conn=connection)tpq.get('queue_name',conn=connection)

或者,您可以在环境中设置连接信息:

$ # Export as URL
$ export TPQ_URL="postgresql://user:pass@localhost/foobar"

$ # Or separately
$ export TPQ_HOST=localhost
$ export TPQ_DB=foobar
$ export TPQ_USER=user
$ export TPQ_PASS=pass

然后省略参数:

importtpq# Use an instance for multiple operationswithtpq.Queue('queue_name')asq:q.put('{"foo": "bar"}')data=q.get()# Or use shortcut functions:tpq.put('queue_name','{"foo": "bar"}')tpq.get('queue_name')

等待

您可以使用wait参数等待项目到达。

importtpq# Wait forevertpq.get('queue_name',wait=0)# Don't wait (also can omit the param).tpq.get('queue_name',wait=-1)# Wait specified number of seconds.tpq.get('queue_name',wait=5)

命令行界面

还提供了命令行界面。json可以通过文件或 stdin(默认值)。

$ # Configure your database.
$ export TPQ_URL="postgresql://user:pass@localhost/foobar"

$ # JSON via stdin (default).
$ echo "{\"foo\": \"bar\"}" | tpq produce queue_name

$ # JSON via file.
$ tpq produce queue_name --file=message.json

$ # Explicitly provide JSON via stdin.
$ tpq produce queue_name --file=- < message.json

$ # Then read the item to stdout.
$ tpq consume queue_name
{'foo': 'bar'}

$ # If you have trouble (or for logging). Debug output goes to stderr.
$ TPQ_URL="postgresql://user:pass@localhost/foobar" tpq consume queue_name --debug
Read database config from environment
Parsing TPQ_URL
Database config found
Attempting to read item
Item read, returning
{'foo': 'bar'}

$ # You can wait on the CLI too...
$ # Forever:
$ tpq consume queue_name --wait=0

$ # Specified number of seconds:
$ tpq consume queue_name --wait=5

$ # The return code signals whether an item was received or not.
$ tpq consume queue_name --wait=-1
{'foo': 'bar'}
$ echo $?
0

# For an empty queue, you get 1
$ tpq consume queue_name --wait=-1
Queue empty
Traceback (most recent call last):
  File "/home/btimby/Code/tpq/tpq/__main__.py", line 24, in consume
    print(get(opt['<name>'], wait=opt['--wait']))
  File "/home/btimby/Code/tpq/tpq/__init__.py", line 266, in get
    return q.get(wait=wait)
  File "/home/btimby/Code/tpq/tpq/__init__.py", line 233, in get
    raise QueueEmpty()
queue.Empty
$ echo $?
1

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Eclipse内存分析器(MAT):不显示当前正在运行的进程   java Apache Velocity:转义字符不能作为关联数组键用于PHP   不截断零的java格式十进制输出   在另一个类文件中调用时返回空值的java getter   java集合获取连接   java解析json使用Gson登录系统应用程序强制关闭   java DelferredResult带有两个请求的ajax请求   java可降低功耗,同时应使用无线   java BoxLayout无法共享错误?   java如何使用计时器制作闹钟   java使用OAuth2保护RESTWeb服务:一般原则   java在一个jframe上显示多个图像和按钮