Mikkoo是一个PGQ到RabbitMQ的中继
mikkoo的Python项目详细描述
一个PgQ到 RabbitMQ继电器。Mikkoo是PGQ的消费者 发布给RabbitMQ的。此外,它还包括一个内置的审计 可用于确认所有PGQ事件均由 拉比马克。
在“聪明的兔子和大象”的寓言中,米谷是以兔子命名的。
安装
Mikkoo在Python Package Index上有售 可通过pip安装:
pip install mikkoo
一旦你设置好了Skytools你可能想 在mikkoo.sql中安装可选的包含实用程序函数以使用 更容易的。
您可以结合使用curl和psql:
curl -L https://github.com/gmr/mikkoo/blob/master/mikkoo.sql | psql
这将在mikkoo模式中安装多个存储过程和一个审计表。 看一下ddl,了解每个函数是什么,以及它是如何实现的。 被利用。
PGQ设置
将pgq安装到数据库中并创建队列:
#CREATEEXTENSIONpgq;CREATEEXTENSION#SELECTpgq.create_queue('test');create_queue-------------- 1(1row)
确保pgqd 正在运行。
PGQ事件到AMQP映射
将事件插入pgq队列时,pgq.insert_event/7函数 应与以下字段映射一起使用:
PgQ Event | AMQP |
---|---|
^{tt5}$ | Routing Key |
^{tt6}$ | Message body |
^{tt7}$ | Exchange |
^{tt8}$ | Content-Type Property |
^{tt9}$ | AMQP Properties [1] |
^{tt10}$ | Headers [2] |
^{tt11}$ | Headers.timetamp |
^{tt12}$ | Headers.txid |
[1] | AMQP properties should be set as a JSON blob. Values set in the ^{tt9}$ field will overwrite the automatically created properties ^{tt14}$, ^{tt15}$, ^{tt16}$, ^{tt17}$, and ^{tt18}$. |
[2] | If ^{tt10}$ is specified and is a JSON key/value dictionary, it will be assigned to the ^{tt17}$ AMQP property. |
在mikkoo.sql文件中有一个方便的模式 用于在pgq中创建格式正确的mikkoo事件的存储过程。在 此外,还有一些审计功能允许创建 发送到PGQ的事件的审核日志。
AMQP消息属性
下表定义了可以在json blob中设置的可用字段 在插入事件时在ev_extra3字段中。
Property | PgSQL Type |
---|---|
^{tt14}$ | text |
^{tt23}$ | text |
^{tt15}$ | text |
^{tt16}$ | text |
^{tt26}$ | int2 |
^{tt27}$ | text |
^{tt28}$ | text |
^{tt17}$ | text/json [3] |
^{tt18}$ | int4 |
^{tt31}$ | text |
^{tt32}$ | int4 |
^{tt33}$ | text |
[3] | ^{tt17}$ should be sent to a key/value JSON blob if specified |
在提供给ev_extra3的json blob中分配的值优先于 自动分配的app_id,content_type,correlation_id, headers,和timestamp由mikkoo在处理时创建的值。
从1.0开始,mikkoo将自动添加四个amqp headers属性值。这些 值不会覆盖在ev_extra4中指定的同名值。 使用相同的名称覆盖值,即使在 sequence值是一个动态生成的id,它试图提供 模糊分布式订购信息。timestamp值是iso-8601 在添加事件时创建的ev_time字段的表示 去PGQ。txid值是ev_txid值,pgq事务id 事件。添加这些值有助于提供某种程度的确定性 命令。origin值是mikkoo正在运行的服务器的主机名 打开。
事件插入示例
下面的示例插入一个json blob消息体{"foo": "bar"} 将使用^{tt50}发布到rabbitmq中的postgresexchange$ 路由密钥。内容类型在ev_extra2和amqp^{tt31}中指定$ 消息属性在ev_extra3中指定。
#SELECTpgq.insert_event('test','test.routing-key','{"foo": "bar"}','postgres','application/json','{"type": "example"}','');insert_event-------------- 4(1row)
当rabbitmq接收到此消息时,它的消息体将为:
{"foo":"bar"}
它将具有类似于以下的消息属性:
Property | Example Value | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
^{tt14}$ | ^{tt55}$ | ||||||||||
^{tt15}$ | ^{tt57}$ | ||||||||||
^{tt16}$ | ^{tt59}$ | ||||||||||
^{tt17}$ |
| ||||||||||
timestamp | 1449600290 | ||||||||||
type | example |
Key | Description |
---|---|
^{tt76}$ | Enable/Disable RabbitMQ Publisher Confirmations. Default: ^{tt77}$ |
^{tt78}$ | Overwrite the default PgQ consumer name. Default: ^{tt55}$ |
^{tt80}$ | Maximum failures before discarding an event. Default: ^{tt81}$ |
^{tt82}$ | The url for connecting to PostgreSQL |
^{tt83}$ | The AMQP url for connecting to RabbitMQ |
^{tt84}$ | How long in seconds until PgQ emits failed events. Default: ^{tt81}$ |
^{tt86}$ | Unregister a consumer with PgQ on shutdown. Default: ^{tt77}$ |
^{tt88}$ | How long to wait before checking the queue after the last empty result. Default: ^{tt89}$ |
示例配置
以下是完整配置文件的示例:
Application:poll_interval:10sentry_dsn:[YOUR SENTRY DSN]statsd:enabled:truehost:localhostport:8125workers:test:confirm:Falseconsumer_name:my_consumermax_failures:5postgres_url:postgresql://localhost:5432/postgresrabbitmq_url:amqp://localhost:5672/%2fretry_delay:5unregister:Falsewait_duration:5Daemon:user:mikkoopidfile:/var/run/mikkooLogging:version:1formatters:verbose:format:'%(levelname)-10s%(asctime)s%(process)-6d%(processName)-20s%(name)-18s:%(message)s'datefmt:'%Y-%m-%d%H:%M:%S'handlers:console:class:logging.StreamHandlerformatter:verbosedebug_only:Trueloggers:helper:handlers:[console]level:INFOpropagate:truemikkoo:handlers:[console]level:INFOpropagate:truepika:handlers:[console]level:ERRORpropagate:truequeries:handlers:[console]level:ERRORpropagate:truetornado:handlers:[console]level:ERRORpropagate:trueroot:handlers:[console]level:CRITICALpropagate:truedisable_existing_loggers:trueincremental:false
运行Mikkoo
在为mikkoo创建一个配置文件(如上所述)之后,只需运行mikkoo应用程序,提供配置文件的路径:
mikkoo -c mikkoo.yml
除非使用-f前台cli开关,否则应用程序将尝试进行后台监控。
mikkoo的cli帮助可以用--help调用,并产生以下输出:
$ mikkoo -h usage: mikkoo [-h][-c CONFIG][-f] Mikkoo is a PgQ to RabbitMQ Relay optional arguments: -h, --help show this help message and exit -c CONFIG, --config CONFIG Path to the configuration file -f, --foreground Run the application interactively
推荐PyPI第三方库
- 热门话题
- java XStream自定义转换器,可从列表生成平面XML结构? java GridView项目文本不改变颜色 java您必须在主类中扩展JApplet吗? 如何使用java编写基于Excel的csv文件? java如何从这段代码中得到整个多边形的颜色? 具有多个表连接和sum()的java HQL,first() java如何使用一种方法将两个数组添加到一起 带鼠标+键盘的VBO Java LWJGL java如何在XDB中创建小数点为2的浮点字段 java如何从XAuthToken获得身份验证? 内存管理如何正确使用java。终结者先生? 音频如何在Java中使用PortAudio(带处理的jpab)绘制波形? java是Youtube上的视频列表,包含400个错误代码 爪哇我的巴恩斯利蕨太瘦了 java为什么Android会忽略READ_SMS权限? jpeg问题从JAVA启动MATLAB代码 java如何判断当前bash脚本是否从调用脚本调用 方法重写中的Java静态变量 java如何在peerpeer Ad hoc网络中的两个具有两跳距离的对等方之间中继RTP(实时传输协议)流 java Android:以编程方式检索资源字符串