java在Spring引导时为Spring AMQP和RabbitMQ动态设置主机
我有一个问题,我不知道如何动态设置主机并在不同的主机上执行RPC操作
情况是这样的
我在不同的服务器和网络上运行了多个RabbitMQ(即192.168.1.0/24、192.168.2.0/24)
行为将是我有一个IP地址列表,我将使用它执行RPC。
因此,对于ip地址列表中的每个条目,我希望执行convertSendAndReceive
并处理回复,依此类推
在文档中尝试了一些代码,但它似乎不起作用,即使有效的RabbitMQ(例如,运行在192.168.1.1上的)接收到无效地址(没有运行有效RabbitMQ的地址,或者网络上不存在事件的地址,例如1.1.1)
注意:我可以在正确的地址上成功执行RPC调用,但是,我也可以在无效的地址上成功执行RPC调用,我不认为无效的地址是
有人知道这件事吗
这是我的消息来源
任务计划配置。java
@Configuration
@EnableScheduling
public class TaskSchedulerConfiguration {
@Autowired
private IpAddressRepo ipAddressRepo;
@Autowired
private RemoteProcedureService remote;
@Scheduled(fixedDelayString = "5000", initialDelay = 2000)
public void scheduledTask() {
ipAddressRepo.findAll().stream()
.forEach(ipaddress -> {
boolean status = false;
try {
remote.setIpAddress(ipaddress);
remote.doSomeRPC();
} catch (Exception e) {
logger.debug("Unable to Connect to licenser server: {}", license.getIpaddress());
logger.debug(e.getMessage(), e);
}
});
}
}
远程过程保留。java
@Service
public class RemoteProcedureService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange exchange;
public boolean doSomeRPC() throws JsonProcessingException {
//I passed this.factory.getHost() so that i will know if only the valid ip address will be received by the other side
//at this point, other side receives invalid ipaddress which supposedly will not be receive by the oher side
boolean response = (Boolean) template.convertSendAndReceive(exchange.getName(), "rpc", this.factory.getHost());
return response;
}
public void setIpAddress(String host) {
factory.setHost(host);
factory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
factory.setPort(prop.getRabbitMQPort());
factory.setUsername(prop.getRabbitMQUsername());
factory.setPassword(prop.getRabbitMQPassword());
template.setConnectionFactory(factory);
}
}
AMQP配置。爪哇
@Configuration
public class AmqpConfiguration {
public static final String topicExchangeName = "testExchange";
public static final String queueName = "rpc";
@Autowired
private LicenseVisualizationProperties prop;
//Commented this out since this will only be assigne once
//i need to achieve to set it dynamically in order to send to different hosts
//so put it in RemoteProcedureService.java, but it never worked
// @Bean
// public ConnectionFactory connectionFactory() {
// CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// connectionFactory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
// connectionFactory.setPort(prop.getRabbitMQPort());
// connectionFactory.setUsername(prop.getRabbitMQUsername());
// connectionFactory.setPassword(prop.getRabbitMQPassword());
// return connectionFactory;
// }
@Bean
public DirectExchange exhange() {
return new DirectExchange(topicExchangeName);
}
}
更新1
似乎在循环期间,当在CachingConnectionFactory
后续ip寻址循环中设置有效ip时,CachingConnectionFactory
中的第一个有效ip集收到有效或无效的消息
更新2
我发现,一旦它能够成功建立连接,它就不会创建新连接。如何强制RabbitTemplate
建立新连接
# 1 楼答案
这是一个相当奇怪的用例,不会很好地执行;最好有一个连接工厂和模板池
但是,要回答您的问题:
调用
resetConnection()
关闭连接