从Java与Django/Celery互操作

14 投票
1 回答
3172 浏览
提问于 2025-04-16 22:50

我们公司有一个基于Python的网站,还有一些基于Python的工作节点,它们通过Django/Celery和RabbitMQ进行通信。我有一个基于Java的应用程序,需要向Celery的工作节点提交任务。我可以从Java顺利地将任务发送到RabbitMQ,但Celery的工作节点却从来没有接收到这些任务。从我查看的两种任务提交的数据包捕获来看,它们之间有一些不同之处,但我搞不清楚这些差异的原因,因为很多内容是二进制的,我找不到相关的解码文档。这里有没有人有关于Java/RabbitMQ和Celery一起工作的参考资料或经验?

1 个回答

12

我找到了答案。RabbitMQ的Java库提到了交换机、队列和路由键。在Celery中,队列的名字实际上是和Java库中提到的交换机对应的。默认情况下,Celery的队列名字就是“celery”。如果你的Django设置里定义了一个叫“myqueue”的队列,使用的语法如下:

CELERY_ROUTES = {
    'mypackage.myclass.runworker'      : {'queue':'myqueue'},
}

那么基于Java的代码需要做的事情大概是这样的:

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null ;
        try {
            connection = factory.newConnection(mqHost, mqPort);
        } catch (IOException ioe) {
            log.error("Unable to create new MQ connection from factory.", ioe) ;
        }

        Channel channel = null ;
        try {
            channel = connection.createChannel();
        } catch (IOException ioe) {
            log.error("Unable to create new channel for MQ connection.", ioe) ;
        }

        try {
            channel.queueDeclare("celery", false, false, false, true, null);
        } catch (IOException ioe) {
            log.error("Unable to declare queue for MQ channel.", ioe) ;
        }

        try {
            channel.exchangeDeclare("myqueue", "direct") ;
        } catch (IOException ioe) {
            log.error("Unable to declare exchange for MQ channel.", ioe) ;
        }

        try {
            channel.queueBind("celery", "myqueue", "myqueue") ;
        } catch (IOException ioe) {
            log.error("Unable to bind queue for channel.", ioe) ;
        }

            // Generate the message body as a string here.

        try {
            channel.basicPublish(mqExchange, mqRouteKey, 
                new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
                messageBody.getBytes("ASCII"));
        } catch (IOException ioe) {
            log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
        }

其实这只是术语上的差别。

撰写回答