当我试图使用来自z的传出AMQP获取消息时,rabbitMQ返回队列为空

2024-04-26 18:44:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个由ESB(zato)调用的服务,该服务的作用是通过AMQP传出在rabbitMQ中发布消息,但是当我咨询rabbitMQ并生成get message时,答案是queue is空的。这个在扎托服役吗

from zato.server.service import Service

class HelloService(Service):
    def handle(self):

        # Request parameters
        msg = 'Hello AMQP broker!'
        out_name = 'My CRM connection'
        exchange = 'My exchange'
        routing_key = ''
        properties = {'app_id': 'ESB'}
        headers = {'X-Foo': 'bar'}

        # Send a message to the broker
        self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
            properties, headers)

Tags: keynameselfamqpmessageexchangemyservice
1条回答
网友
1楼 · 发布于 2024-04-26 18:44:08

从zato服务的rabbit队列中消费的完整工作示例如下:

在兔子身上

  1. 创建交换
  2. 创建队列
  3. 将队列绑定到交换机
  4. 在zato中创建连接定义
  5. 在zato中创建传出AMQP连接定义
  6. 编写要发布或使用的zato服务

前三个步骤可以通过多种方式完成,下面是一个简单的python脚本,您可以使用它(只需安装kombu,然后单击):

import click
import os
import sys
import settings
from kombu import Connection, Exchange, Queue


BROKER_URL = 'amqp://{user}:{password}@{server}:{port}/{vhost}'.format(user=settings.RABBIT_USER,
                                                                       password=settings.RABBIT_PASS,
                                                                       server=settings.RABBIT_SERVER,
                                                                       port=settings.RABBIT_PORT,
                                                                       vhost=settings.RABBIT_VHOST)


@click.command()
@click.option(' remove/ no-remove', default=False, help='Remove current Queues/Exchanges.')
@click.option(' create/ no-create', default=False, help='Create needed Queues/Exchanges')
def job(remove, create):
    exchanges = {'dead_letter': Exchange(name=settings.DEAD_LETTER_EXCHANGE,
                                         type=settings.DEAD_LETTER_EXCHANGE_TYPE,
                                         durable=settings.DEAD_LETTER_EXCHANGE_DURABLE),
                 'results': Exchange(name=settings.RESULTS_EXCHANGE_NAME,
                                     type=settings.RESULTS_EXCHANGE_TYPE,
                                     durable=settings.RESULTS_EXCHANGE_DURABLE)}

    queues = {'dead_letter': Queue(name=settings.DEAD_LETTER_QUEUE,
                                   exchange=exchanges['dead_letter'],
                                   routing_key=settings.DEAD_LETTER_ROUTING,
                                   durable=settings.DEAD_LETTER_EXCHANGE_DURABLE),
              'results': Queue(name=settings.RESULTS_QUEUE_NAME,
                               exchange=exchanges['results'],
                               routing_key=settings.RESULTS_QUEUE_ROUTING,
                               durable=settings.RESULTS_EXCHANGE_DURABLE),
              'task': Queue(name=settings.TASK_QUEUE_NAME,
                            exchange=exchanges['results'],
                            routing_key=settings.TASK_ROUTING_KEY,
                            queue_arguments={
                                "x-message-ttl": settings.TASK_QUEUE_TTL,
                                "x-dead-letter-exchange": settings.DEAD_LETTER_EXCHANGE,
                                "x-dead-letter-routing-key": settings.DEAD_LETTER_ROUTING})}

    print 'using broker: {}'.format(BROKER_URL)

    with Connection(BROKER_URL) as conn:
        channel = conn.channel()
        if remove:
            # remove exchanges
            for (key, exchange) in exchanges.items():
                print 'removing exchange: {}'.format(exchange.name)
                bound_exchange = exchange(channel)
                bound_exchange.delete()

            # remove queues
            for (key, queue) in queues.items():
                print 'removing queue {} '.format(queues[key].name)
                bound_queue = queues[key](channel)
                bound_queue.delete()

        if create:
            # create exchanges
            for (key, exchange) in exchanges.items():
                print 'creating exchange: {}'.format(exchange.name)
                bound_exchange = exchange(channel)
                bound_exchange.declare()

            # add queues
            for (key, queue) in queues.items():
                # if key in exchanges:
                print 'binding queue {} to exchange {} with routing key {}'.format(queue.name,
                                                                                   queue.exchange.name,
                                                                                   queue.routing_key)
                bound_queue = queue(channel)
                bound_queue.declare()


if __name__ == '__main__':
    job()

以及设置文件:

^{pr2}$

现在,让我们在一个新的virtualenv上使用python2.7创建运行上述脚本的队列:

$ virtualenv rabbit_test
New python executable in /home/ivan/rabbit_test/bin/python
Installing setuptools, pip, wheel...done.

$ source /home/ivan/rabbit_test/bin/activate

$ pip install kombu
Collecting kombu
...
$ pip install click
Collecting click
...

复制上面的脚本

$ mkdir ~/rabbit_test/app
$ vi ~/rabbit_test/app/create_queues.py
$ vi ~/rabbit_test/app/settings.py

然后运行create_队列.py. 在

$ cd ~/rabbit_test/app
$ python create_queues.py  create
using broker: amqp://guest:guest@localhost:5672//
creating exchange: test.service.results
creating exchange: test.service.deadletter
binding queue test.service.request to exchange test.service.results with routing key request
binding queue test.service.results to exchange test.service.results with routing key results
binding queue test.service.deadletter to exchange test.service.deadletter with routing key deadletter

您可以使用cli工具或management plugin验证交换和队列是否在rabbit上:

$ rabbitmqadmin list exchanges
+            -+    -+
|          name           |  type   |
+            -+    -+
| test.service.deadletter | direct  |
| test.service.results    | direct  |
+            -+    -+

$ rabbitmqadmin list queues
+            -+     +
|          name           | messages |
+            -+     +
| test.service.deadletter | 0        |
| test.service.request    | 0        |
| test.service.results    | 0        |
+            -+     +

$ rabbitmqadmin list bindings
+            -+            -+            -+
|         source          |       destination       |       routing_key       |
+            -+            -+            -+
|                         | test.service.deadletter | test.service.deadletter |
|                         | test.service.request    | test.service.request    |
|                         | test.service.results    | test.service.results    |
| test.service.deadletter | test.service.deadletter | deadletter              |
| test.service.results    | test.service.request    | request                 |
| test.service.results    | test.service.results    | results                 |
+            -+            -+            -+

现在zato部分(步骤4、5和6)可以使用公共api或webadmin来完成,我将向您展示如何使用公共api来完成它,但是通过UI来完成更容易,因为这只需要很少的时间。在

创建AMQP连接定义doc

$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
    "cluster_id": 1,
    "name": "SO_Test",
    "host": "127.0.0.1",
    "port": "5672",
    "vhost": "/",
    "username": "guest",
    "frame_max": 131072,
    "heartbeat": 10
}' "http://localhost:11223/zato/json/zato.definition.amqp.create"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K04DWBPMYF8A7768C7N482E75YM3"
  },
  "zato_definition_amqp_create_response": {
    "id": 2,
    "name": "SO_Test"
  }
}

为我们的AMQP连接设置密码doc

$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw=="  -d '{
    "id": 2,
    "password1": "guest",
    "password2": "guest"
}' "http://localhost:11223/zato/json/zato.definition.amqp.change-password"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K07K9YY21XZAX4QKWJB3ZFXN2ZFT"
  }
}

创建传出AMQP连接定义doc

curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
    "cluster_id": 1,
    "name": "SO Test",
    "is_active": true,
    "def_id": 2,
    "delivery_mode": 1,
    "priority": 6,
    "content_type": "application/json",
    "content_encoding": "utf-8",
    "expiration": 30000
}' "http://localhost:11223/zato/json/zato.outgoing.amqp.create"

{
  "zato_outgoing_amqp_create_response": {
    "id": 1,
    "name": "SO Test"
  },
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K05F2CR954BFNBP14KGTM26V47PC"
  }
}

最后发送消息的服务

from zato.server.service import Service

class HelloService(Service):
    def handle(self):
        # Request parameters 
        msg = 'Hello AMQP broker!'
        out_name = 'SO Test'
        exchange = 'test.service.results'
        routing_key = 'request'
        properties = {'app_id': 'ESB', 'user_id': 'guest'}
        headers = {'X-Foo': 'bar'}

        # Send a message to the broker
        info = self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
            properties, headers)
        self.logger.info(info)

如果要使用用户标识属性,它必须与连接用户标识相匹配,否则请求将失败。在

另外请注意,我在这里创建了一个死信交换,如果消息仍在test.service.request队列中,则消息将在30秒后发送到此处

最后一步是测试

为了验证消息是否传递到队列,我们可以创建http/soap通道或直接调用服务,我使用公共api执行后者。在

curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
   "name": "test.hello-service",
   "data_format": "json"
}' "http://localhost:11223/zato/json/zato.service.invoke"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K050J64QQ8FXASXHKVCAQNC4JC4N"
  },
  "zato_service_invoke_response": {
    "response": ""
  }
}

之后,我们检查队列中是否有刚刚发送的消息:

$ rabbitmqadmin get queue=test.service.request requeue=true
+      -+           +       -+          +       -+         +      -+
| routing_key |       exchange       | message_count |      payload       | payload_bytes | payload_encoding | redelivered |
+      -+           +       -+          +       -+         +      -+
| request     | test.service.results | 0             | Hello AMQP broker! | 18            | string           | False       |
+      -+           +       -+          +       -+         +      -+

记住检查rabbit和zato服务器日志,以防您仍然有任何问题。在

相关问题 更多 >