Amazon EC2中pika-rabbitmq的合适心跳间隔是多少

2 投票
1 回答
3450 浏览
提问于 2025-04-17 16:43

我正在使用最新的 pika 库(0.9.9+)来处理 rabbitmq。以下是我使用 rabbitmq 和 pika 的情况:

  1. 我有一些长时间运行的任务(大约 5 分钟),这些任务作为工作者来执行。它们的请求来自 rabbitmq,而这些请求的到来非常不频繁,也就是说请求之间有很长的空闲时间。
  2. 我之前遇到的问题是与空闲连接有关(因为空闲连接而导致的连接关闭)。所以,我在 pika 中启用了心跳功能。
  3. 现在,心跳的选择成了一个问题。pika 似乎是一个单线程的库,心跳的接收和确认是在请求之间的时间段内进行的。
  4. 因此,如果心跳间隔设置得小于回调函数进行长时间计算所需的时间,服务器就收不到任何心跳确认,从而关闭连接。
  5. 所以,我认为最小的心跳间隔应该是回调函数在阻塞连接中进行计算的最大时间。

对于亚马逊 EC2,什么样的心跳值比较好,以防止关闭空闲连接呢?

还有一些人建议使用 rabbitmq 的 keepalive(或 libkeepalive)来保持 TCP 连接。我觉得在 TCP 层管理心跳会更好,因为应用程序就不需要去管理它们。这是真的吗?与 RMQ 的心跳相比,keepalive 是一种好的方法吗?

我看到有些人建议使用多个线程和队列来处理长时间运行的任务。但这是否是处理长时间运行任务的唯一选择?必须为这种情况使用另一个队列让我感到有些失望。

谢谢你们的帮助。我觉得我已经详细说明了问题。如果需要更多细节,请告诉我。

1 个回答

4

如果你不一定要使用pika的话,这个讨论串对我实现你想做的事情很有帮助,它是用kombu来做的:

#!/usr/bin/env python
import time, logging, weakref, eventlet
from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging
from kombu.common import eventloop
from eventlet import spawn_after

eventlet.monkey_patch()

log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger('job_worker')
logger.setLevel(logging.INFO)


def long_running_function(body):
    time.sleep(300)

def job_worker(body, message):
    long_running_function(body)
    message.ack()

def monitor_heartbeats(connection, rate=2):
    """Function to send heartbeat checks to RabbitMQ. This keeps the
       connection alive over long-running processes."""
    if not connection.heartbeat:
        logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
        return
    interval = connection.heartbeat
    cref = weakref.ref(connection)
    logger.info("Starting heartbeat monitor.")

    def heartbeat_check():
        conn = cref()
        if conn is not None and conn.connected:
            conn.heartbeat_check(rate=rate)
            logger.info("Ran heartbeat check.")
            spawn_after(interval, heartbeat_check)
    return spawn_after(interval, heartbeat_check)

def main():
    setup_logging(loglevel='INFO')

    # process for heartbeat monitor
    p = None

    try:
        with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
            conn.ensure_connection()
            monitor_heartbeats(conn)
            queue = Queue('job_queue',
                          Exchange('job_queue', type='direct'),
                          routing_key='job_queue')
            logger.info("Starting worker.")
            with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
                consumer.qos(prefetch_count=1)
                for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                    pass
    except KeyboardInterrupt:
        logger.info("Worker was shut down.")

if __name__ == "__main__":
    main()

我把我特定领域的代码去掉了,但基本上这就是我使用的框架。

撰写回答