面向postgresql的轻型pgq框架排队系统
pgqueue的Python项目详细描述
这个模块提供了一个方便的python api来集成 PostgreSQL PGQ具有任何Python应用程序的功能。
PGQ介绍
(来自Skytools自述文件)
pgq是用pl/pgsql、python和c代码编写的排队系统。它是 基于slony-i基于快照的事件处理思想, 为一般用途而写。
PGQ提供了一个高效的事务性排队系统 多节点支持(包括工作共享和拆分、故障转移和 切换,队列和消费者)。
规则:
- 数据库中可以有多个队列。
- 可能有多个生产者,无法插入到任何队列中。
- 一个队列中可以有多个消费者。
- 一个消费者可能有几个次消费。
PGQ分为三个层次:生产者、票务者和消费者。
producers和consumers分别将事件推送到 排队。生产者只需要调用postgresql存储过程 (就像表上的触发器或应用程序中的postgresql调用)。 使用者通常是用python编写的,但是任何能够 可以使用run postgresql存储过程。
ticker是一个守护进程,它将队列分割成一批事件和 负责系统的维护。
pgqueue模块
此模块提供python函数和类来编写producers 以及消费者。 它还包含ticker引擎的python实现,该引擎 模仿skytools的原始c ticker:它将事件分批处理, 执行维护任务。
安装
先决条件:
- python>;=2.6或python 3
- psycopg2作为依赖项自动安装
- (在服务器上)扩展版本PgQ3.1
在debian/ubuntu上添加the PostgreSQL APT repository,然后安装包 postgresql-x.x-pgq3取决于PostgreSQL版本。
最后在数据库中创建扩展:
CREATE EXTENSION IF NOT EXISTS pgq;
您可以将pgqueue模块安装到您的环境中。
pip install --update pgqueue
示例用法
您需要永久运行ticker。 如果ticker关闭,事件将存储在队列中, 但不会为消费者准备批处理,事件表将 快速成长。
对于ticker,您可以选择优化的pgqd 用c编写的多数据库ticker和skytools的一部分,或者使用 此模块提供了更简单的python实现:
python -m pgqueue 'host=127.0.0.1 port=5432 user=jules password=xxxx dbname=test_db'
我们创建一个新队列,并注册一个消费者:
conn = psycopg2.connect("dbname=test user=postgres") conn.autocommit = True cursor = conn.cursor() first_q = pgqueue.Queue('first_queue') first_q.create(cursor, ticker_max_lag='4 seconds') consum_q = pgqueue.Consumer('first_queue', 'consumer_one') consum_q.register(cursor)
我们准备向队列中生成事件,并使用事件 稍后在应用程序中:
first_q.insert_event(cursor, 'announce', 'Hello ...') first_q.insert_event(cursor, 'announce', 'Hello world!') # ... wait a little bit conn.autocommit = False for event in consum_q.next_events(cursor, commit=True): print(event)
您可以浏览源代码以了解高级用法,直到我们编写 更多的文件(欢迎提供)。
学分
pgq是由marko kreen开发的postgresql扩展。 是part of SkyTools, Skype中用于复制和故障转移的工具包。
skytools还嵌入了一个pgqpython框架,它提供了 API略有不同。