有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java在Spring引导时为Spring AMQP和RabbitMQ动态设置主机

我有一个问题,我不知道如何动态设置主机并在不同的主机上执行RPC操作

情况是这样的

我在不同的服务器和网络上运行了多个RabbitMQ(即192.168.1.0/24、192.168.2.0/24)

行为将是我有一个IP地址列表,我将使用它执行RPC。 因此,对于ip地址列表中的每个条目,我希望执行convertSendAndReceive并处理回复,依此类推

在文档中尝试了一些代码,但它似乎不起作用,即使有效的RabbitMQ(例如,运行在192.168.1.1上的)接收到无效地址(没有运行有效RabbitMQ的地址,或者网络上不存在事件的地址,例如1.1.1)

注意:我可以在正确的地址上成功执行RPC调用,但是,我也可以在无效的地址上成功执行RPC调用,我不认为无效的地址是

有人知道这件事吗

这是我的消息来源

任务计划配置。java

@Configuration
@EnableScheduling
public class TaskSchedulerConfiguration {
    @Autowired
    private IpAddressRepo ipAddressRepo;

    @Autowired
    private RemoteProcedureService remote;

    @Scheduled(fixedDelayString  = "5000", initialDelay = 2000)
    public void scheduledTask() {
        ipAddressRepo.findAll().stream()
              .forEach(ipaddress -> {
                boolean status = false;
                try {
                    remote.setIpAddress(ipaddress);
                    remote.doSomeRPC();                 
                } catch (Exception e) {
                    logger.debug("Unable to Connect to licenser server: {}", license.getIpaddress());
                    logger.debug(e.getMessage(), e);
                } 
              });

    }

}

远程过程保留。java

@Service
public class RemoteProcedureService {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange exchange;


    public boolean doSomeRPC() throws JsonProcessingException {

        //I passed this.factory.getHost() so that i will know if only the valid ip address will be received by the other side
        //at this point, other side receives invalid ipaddress which supposedly will not be receive by the oher side
        boolean response = (Boolean) template.convertSendAndReceive(exchange.getName(), "rpc", this.factory.getHost());
        return response;
    }

    public void setIpAddress(String host) {
        factory.setHost(host);
        factory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
        factory.setPort(prop.getRabbitMQPort());
        factory.setUsername(prop.getRabbitMQUsername());
        factory.setPassword(prop.getRabbitMQPassword());
        template.setConnectionFactory(factory);

    }

}

AMQP配置。爪哇

@Configuration
public class AmqpConfiguration {
    public static final String topicExchangeName = "testExchange";

    public static final String queueName = "rpc";

    @Autowired
    private LicenseVisualizationProperties prop;

//Commented this out since this will only be assigne once
//i need to achieve to set it dynamically in order to send to different hosts
//so put it in RemoteProcedureService.java, but it never worked
//    @Bean
//    public ConnectionFactory connectionFactory() {
//        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
//        connectionFactory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
//        connectionFactory.setPort(prop.getRabbitMQPort());
//        connectionFactory.setUsername(prop.getRabbitMQUsername());
//        connectionFactory.setPassword(prop.getRabbitMQPassword());
//        return connectionFactory;
//    }

    @Bean
    public DirectExchange exhange() {
        return new DirectExchange(topicExchangeName);
    }

}

更新1

似乎在循环期间,当在CachingConnectionFactory后续ip寻址循环中设置有效ip时,CachingConnectionFactory中的第一个有效ip集收到有效或无效的消息

更新2

我发现,一旦它能够成功建立连接,它就不会创建新连接。如何强制RabbitTemplate建立新连接


共 (1) 个答案

  1. # 1 楼答案

    这是一个相当奇怪的用例,不会很好地执行;最好有一个连接工厂和模板池

    但是,要回答您的问题:

    调用resetConnection()关闭连接