Mikkoo是一个PGQ到RabbitMQ的中继

mikkoo的Python项目详细描述


一个PgQRabbitMQ继电器。Mikkoo是PGQ的消费者 发布给RabbitMQ的。此外,它还包括一个内置的审计 可用于确认所有PGQ事件均由 拉比马克。

在“聪明的兔子和大象”的寓言中,米谷是以兔子命名的。

VersionDownloadsStatus

安装

Mikkoo在Python Package Index上有售 可通过pip安装:

pip install mikkoo

一旦你设置好了Skytools你可能想 在mikkoo.sql中安装可选的包含实用程序函数以使用 更容易的。

您可以结合使用curlpsql

curl -L https://github.com/gmr/mikkoo/blob/master/mikkoo.sql | psql

这将在mikkoo模式中安装多个存储过程和一个审计表。 看一下ddl,了解每个函数是什么,以及它是如何实现的。 被利用。

PGQ设置

  1. pgq安装到数据库中并创建队列:

    #CREATEEXTENSIONpgq;CREATEEXTENSION#SELECTpgq.create_queue('test');create_queue--------------
    1(1row)
  2. 确保pgqd 正在运行。

PGQ事件到AMQP映射

将事件插入pgq队列时,pgq.insert_event/7函数 应与以下字段映射一起使用:

PgQ EventAMQP
^{tt5}$Routing Key
^{tt6}$Message body
^{tt7}$Exchange
^{tt8}$Content-Type Property
^{tt9}$AMQP Properties [1]
^{tt10}$Headers [2]
^{tt11}$Headers.timetamp
^{tt12}$Headers.txid
[1]AMQP properties should be set as a JSON blob. Values set in the ^{tt9}$ field will overwrite the automatically created properties ^{tt14}$, ^{tt15}$, ^{tt16}$, ^{tt17}$, and ^{tt18}$.
[2]If ^{tt10}$ is specified and is a JSON key/value dictionary, it will be assigned to the ^{tt17}$ AMQP property.

mikkoo.sql文件中有一个方便的模式 用于在pgq中创建格式正确的mikkoo事件的存储过程。在 此外,还有一些审计功能允许创建 发送到PGQ的事件的审核日志。

AMQP消息属性

下表定义了可以在json blob中设置的可用字段 在插入事件时在ev_extra3字段中。

PropertyPgSQL Type
^{tt14}$text
^{tt23}$text
^{tt15}$text
^{tt16}$text
^{tt26}$int2
^{tt27}$text
^{tt28}$text
^{tt17}$text/json [3]
^{tt18}$int4
^{tt31}$text
^{tt32}$int4
^{tt33}$text
[3]^{tt17}$ should be sent to a key/value JSON blob if specified

在提供给ev_extra3的json blob中分配的值优先于 自动分配的app_idcontent_typecorrelation_idheaders,和timestamp由mikkoo在处理时创建的值。

从1.0开始,mikkoo将自动添加四个amqp headers属性值。这些 值不会覆盖在ev_extra4中指定的同名值。 使用相同的名称覆盖值,即使在 sequence值是一个动态生成的id,它试图提供 模糊分布式订购信息。timestamp值是iso-8601 在添加事件时创建的ev_time字段的表示 去PGQ。txid值是ev_txid值,pgq事务id 事件。添加这些值有助于提供某种程度的确定性 命令。origin值是mikkoo正在运行的服务器的主机名 打开。

事件插入示例

下面的示例插入一个json blob消息体{"foo": "bar"} 将使用^{tt50}发布到rabbitmq中的postgresexchange$ 路由密钥。内容类型在ev_extra2和amqp^{tt31}中指定$ 消息属性在ev_extra3中指定。

#SELECTpgq.insert_event('test','test.routing-key','{"foo": "bar"}','postgres','application/json','{"type": "example"}','');insert_event--------------
4(1row)

当rabbitmq接收到此消息时,它的消息体将为:

{"foo":"bar"}

它将具有类似于以下的消息属性:

配置

mikkoo配置文件使用YAML进行标记,并允许 一个或多个要处理的PGQ队列。

如果您有哨兵或哨兵帐户,则Application/sentry_dsn设置 如果 raven已安装客户端库。

队列按名称在Application/workers节下配置。这个 下面的示例为处理名为 invoices。每个工作进程都连接到本地postgresql和rabbitmq 使用默认凭据的实例。

Application:workers:invoices:postgres_url:postgresql://localhost:5432/postgresrabbitmq_url:amqp://localhost:5672/%2fconfirm:False

队列配置选项

下表详细列出了每个队列可用的配置选项:

PropertyExample Value
^{tt14}$^{tt55}$
^{tt15}$^{tt57}$
^{tt16}$^{tt59}$
^{tt17}$
KeyExample Value
^{tt47}$^{tt62}$
^{tt42}$^{tt64}$
^{tt18}$^{tt66}$
^{tt45}$^{tt68}$
timestamp1449600290
typeexample
KeyDescription
^{tt76}$Enable/Disable RabbitMQ Publisher Confirmations. Default: ^{tt77}$
^{tt78}$Overwrite the default PgQ consumer name. Default: ^{tt55}$
^{tt80}$Maximum failures before discarding an event. Default: ^{tt81}$
^{tt82}$The url for connecting to PostgreSQL
^{tt83}$The AMQP url for connecting to RabbitMQ
^{tt84}$How long in seconds until PgQ emits failed events. Default: ^{tt81}$
^{tt86}$Unregister a consumer with PgQ on shutdown. Default: ^{tt77}$
^{tt88}$How long to wait before checking the queue after the last empty result. Default: ^{tt89}$

示例配置

以下是完整配置文件的示例:

Application:poll_interval:10sentry_dsn:[YOUR SENTRY DSN]statsd:enabled:truehost:localhostport:8125workers:test:confirm:Falseconsumer_name:my_consumermax_failures:5postgres_url:postgresql://localhost:5432/postgresrabbitmq_url:amqp://localhost:5672/%2fretry_delay:5unregister:Falsewait_duration:5Daemon:user:mikkoopidfile:/var/run/mikkooLogging:version:1formatters:verbose:format:'%(levelname)-10s%(asctime)s%(process)-6d%(processName)-20s%(name)-18s:%(message)s'datefmt:'%Y-%m-%d%H:%M:%S'handlers:console:class:logging.StreamHandlerformatter:verbosedebug_only:Trueloggers:helper:handlers:[console]level:INFOpropagate:truemikkoo:handlers:[console]level:INFOpropagate:truepika:handlers:[console]level:ERRORpropagate:truequeries:handlers:[console]level:ERRORpropagate:truetornado:handlers:[console]level:ERRORpropagate:trueroot:handlers:[console]level:CRITICALpropagate:truedisable_existing_loggers:trueincremental:false

运行Mikkoo

在为mikkoo创建一个配置文件(如上所述)之后,只需运行mikkoo应用程序,提供配置文件的路径:

mikkoo -c mikkoo.yml

除非使用-f前台cli开关,否则应用程序将尝试进行后台监控。

mikkoo的cli帮助可以用--help调用,并产生以下输出:

$ mikkoo -h
usage: mikkoo [-h][-c CONFIG][-f]

Mikkoo is a PgQ to RabbitMQ Relay

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        Path to the configuration file
  -f, --foreground      Run the application interactively

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java XStream自定义转换器,可从列表生成平面XML结构?   java GridView项目文本不改变颜色   java您必须在主类中扩展JApplet吗?   如何使用java编写基于Excel的csv文件?   java如何从这段代码中得到整个多边形的颜色?   具有多个表连接和sum()的java HQL,first()   java如何使用一种方法将两个数组添加到一起   带鼠标+键盘的VBO Java LWJGL   java如何在XDB中创建小数点为2的浮点字段   java如何从XAuthToken获得身份验证?   内存管理如何正确使用java。终结者先生?   音频如何在Java中使用PortAudio(带处理的jpab)绘制波形?   java是Youtube上的视频列表,包含400个错误代码   爪哇我的巴恩斯利蕨太瘦了   java为什么Android会忽略READ_SMS权限?   jpeg问题从JAVA启动MATLAB代码   java如何判断当前bash脚本是否从调用脚本调用   方法重写中的Java静态变量   java如何在peerpeer Ad hoc网络中的两个具有两跳距离的对等方之间中继RTP(实时传输协议)流   java Android:以编程方式检索资源字符串