有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java:ActiveMQ Artemis 会在 Spring Boot 客户端上定义的所有主题名称前加上前缀“jms.topic.”

我在 Spring Boot 客户端上使用 ActiveMQ Artemis 2.18.0 和 2.5.5 版的 spring-boot-starter-artemis 依赖项。在我的用例中,各客户端需要通过主题相互通信。问题是,在客户端上定义的每个主题前面都会被加上字符串 jms.topic.。例如,主题 foo.sendInfo 会变成 jms.topic.foo.sendInfo

broker.xml 文件如下所示。Spring Boot 客户端使用的 acceptor 是 61617 端口上的 netty-ssl-acceptor

<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <!-- Broker name, followed by the persistence and journal storage settings. -->
      <name>0.0.0.0</name>

      <persistence-enabled>true</persistence-enabled>

      <journal-type>NIO</journal-type>

      <!-- All four storage directories are relative paths under data/.
           NOTE(review): presumably resolved against the broker instance directory; confirm. -->
      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-device-block-size>4096</journal-device-block-size>

      <journal-file-size>10M</journal-file-size>
      
      <!--
       This value was determined through a calculation.
       Your system could perform 1.18 writes per millisecond
       on the current journal configuration.
       That translates as a sync write every 844000 nanoseconds.

       Note: If you specify 0 the system will perform writes directly to the disk.
             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
      -->
      <journal-buffer-timeout>844000</journal-buffer-timeout>

      <journal-max-io>1</journal-max-io>

      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>90</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <!-- NOTE(review): timeout and check period are presumably in milliseconds; confirm against the Artemis docs. -->
      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      
      <!-- Set to the same value as journal-buffer-timeout above (844000). -->
      <page-sync-timeout>844000</page-sync-timeout>

      <!-- Network endpoints the broker listens on, one acceptor per port. -->
      <acceptors>

         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
         <!-- amqpCredits: The number of credits sent to AMQP producers -->
         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
         <!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
                                      as duplicate detection requires applicationProperties to be parsed on the server. -->
         <!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
                                       default: 102400, -1 would mean to disable large message control -->

         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->


         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=false</acceptor>

         <!-- SSL Acceptor -->
         <!-- Both prefixes end with a dot, matching the form given in the compatibility note above
              and the one used by the "hornetq" acceptor. -->
         <!-- NOTE(review): the key store and trust store passwords are written in plain text in this URL
              and in the "mqtt+ssl" URL below; confirm this is acceptable for the deployment. -->
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>

         <!-- MQTT over SSL Acceptor. This is the only acceptor that sets needClientAuth=true. -->
         <acceptor name="mqtt+ssl">tcp://0.0.0.0:8883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;needClientAuth=true;protocols=MQTT;useEpoll=true</acceptor>

      </acceptors>

      <!-- Role permissions for the "#" match. Roles "admins" and "users" receive every
           permission listed below except "manage", which is granted to "admins" only. -->
      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="admins, users"/>
            <permission type="deleteNonDurableQueue" roles="admins, users"/>
            <permission type="createDurableQueue" roles="admins, users"/>
            <permission type="deleteDurableQueue" roles="admins, users"/>
            <permission type="createAddress" roles="admins, users"/>
            <permission type="deleteAddress" roles="admins, users"/>
            <permission type="consume" roles="admins, users"/>
            <permission type="browse" roles="admins, users"/>
            <permission type="send" roles="admins, users"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="admins"/>
         </security-setting>
      </security-settings>

      <!-- Per-address policies. The two entries below carry the same values; the catch-all entry
           additionally sets auto-delete-queues and auto-delete-addresses to false. -->
      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <!-- NOTE(review): the two auto-create-jms-* flags look like legacy counterparts of the
                 two flags above; confirm whether they are still needed on this broker version. -->
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
            <auto-delete-addresses>false</auto-delete-addresses>
         </address-setting>
      </address-settings>

      <!-- Pre-defined addresses: the dead-letter and expiry targets named in the
           address-settings, each with one anycast queue of the same name. -->
      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ"/>
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue"/>
            </anycast>
         </address>
      </addresses>
   </core>
</configuration>

Spring Boot客户端上的连接工厂配置如下所示

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;


/**
 * Spring JMS configuration for the SSL connection to the ActiveMQ Artemis broker.
 *
 * <p>Defines the Artemis connection factory plus a {@link JmsTemplate} and a
 * {@link DefaultJmsListenerContainerFactory}, both switched to the pub/sub (topic) domain.
 */
@Configuration
@EnableJms
public class MQTTConfig {

    /** Path of the trust store, appended to the broker URL as {@code trustStorePath}. */
    @Value("${JMS_BROKER_TRUSTSTORE}")
    private String pathToTrustStore;

    /** Injected but not used by the connection factory defined in this class. */
    @Value("${JMS_BROKER_KEYSTORE}")
    private String pathToKeystore;

    /** Trust store password, appended to the broker URL as {@code trustStorePassword}. */
    @Value("${JMS_BROKER_TRUSTSTORE_PASSWORD}")
    private String truststorePassword;

    /** Injected but not used by the connection factory defined in this class. */
    @Value("${JMS_BROKER_KEYSTORE_PASSWORD}")
    private String keystorePassword;

    /**
     * Creates the connection factory for the broker's SSL acceptor on {@code localhost:61617}.
     *
     * <p>The trust store password comes from {@code JMS_BROKER_TRUSTSTORE_PASSWORD} rather than a
     * literal in the URL, so the property that was already required at startup is actually honoured.
     *
     * @return connection factory configured with the SSL URL and the user/password credentials
     */
    @Bean
    public ActiveMQConnectionFactory artemisSSLConnectionFactory() {
        String brokerUrl = "tcp://localhost:61617?sslEnabled=true"
                + "&trustStorePath=" + pathToTrustStore
                + "&trustStorePassword=" + truststorePassword;
        ActiveMQConnectionFactory artemisConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        artemisConnectionFactory.setUser("user");
        artemisConnectionFactory.setPassword("password");
        return artemisConnectionFactory;
    }

    /**
     * Initialise {@link JmsTemplate} as required
     *
     * @return template bound to {@link #artemisSSLConnectionFactory()} with explicit QoS enabled
     */
    @Bean
    public JmsTemplate jmsTemplate() throws JMSException {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(artemisSSLConnectionFactory());
        jmsTemplate.setExplicitQosEnabled(true);

        // setting pubSubDomain to true configures JmsTemplate to work with topics instead of queues
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }

    /**
     * Initialise {@link DefaultJmsListenerContainerFactory} as required
     *
     * @return listener container factory bound to {@link #artemisSSLConnectionFactory()}
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws JMSException {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(artemisSSLConnectionFactory());
        // setting pubSubDomain to true configures the DefaultJmsListenerContainerFactory to work with topics instead of queues
        factory.setPubSubDomain(true);
        return factory;
    }
}

下面是POM文件,只有相关的依赖项

<!-- NOTE(review): the parent is pinned to 1.4.1.RELEASE while the starter below is pinned to 2.5.5;
     confirm that combining these two Spring Boot versions is intended. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
    <version>2.5.5</version>
</dependency>

下面的代码片段显示了向主题 server.weatherForecast 发布消息的生产者,以及订阅同一主题的消费者。消息在生产者和消费者之间可以毫无问题地交换,因为在 Spring Boot 客户端上定义的每个主题前面都带有 jms.topic.。但是,当我使用外部工具订阅 MQTT 消息时,除非把订阅的主题从 server.weatherForecast 改为 jms.topic.server.weatherForecast,否则该工具收不到这个主题上的任何消息

import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public void samplePC() {

    @Autowired
    private JMSTemplate jmsTemplate;

    //producer that is called by a cron job
    public void tester() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("serialNumber", "105");
        jmsTemplate.convertAndSend("server/forecast", jsonObject.toString().toCharArray());
    }

    //consumer (a message from the producer should be received here, but nothing arrives)
    @JmsListener(destination = "server/forecast")
    private void consumeWeatherForecastRequest(char[] incomingMessage) {
        //some logic
        jmsTemplate.convertAndSend("someTopic", "someMessage");
    }
}

在为RemotingConnectionImpl启用TRACE日志记录时,我看到在CreateSessionResponseMessage中,serverVersion属性的值为131,在CreateSessionMessage中,version属性的值为127

如何确保jms.topic.不是主题名称的前缀

可以从这个 GitHub 存储库下载一个最小的可复现示例。我试图在代码中把前缀记录到日志里,但没有找到任何方法,所有日志都只显示主题名,没有前缀。但是,从外部客户端订阅要发布到的主题时,必须带上前缀:分别订阅 topicName 和 jms.topic.topicName 时,可以明显看到消息被传递给了后者。我注意到有些客户端会把“.”解析成“/”,所以在名称中含有“.”的情况下我也尝试了其他写法,但都没有用


共 (1) 个答案

  1. # 1 楼答案

    我使用了你的复现示例,并成功重现了你看到的问题,即客户端使用的是 jms.topic.test.topic。然而,一旦我在 broker.xml 中名为 "artemis" 的 acceptor 上添加了 multicastPrefix=jms.topic.,问题就消失了。代理现在会去掉客户端发来的前缀,改为使用 test.topic

    您确实在 "netty-ssl-acceptor" 这个 acceptor 上设置了 multicastPrefix=jms.topic.,但您的客户端实际上并没有使用该 acceptor

    我还运行了mvn dependency:tree来了解应用程序为什么使用ActiveMQ Artemis 1.3.0客户端。这是它的输出(部分):

    [INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
    [INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
    [INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
    [INFO]    \- org.apache.activemq:artemis-jms-client:jar:1.3.0:compile
    [INFO]       +- org.apache.activemq:artemis-core-client:jar:1.3.0:compile
    [INFO]       |  +- org.jgroups:jgroups:jar:3.6.9.Final:compile
    [INFO]       |  +- org.apache.activemq:artemis-commons:jar:1.3.0:compile
    [INFO]       |  |  +- commons-beanutils:commons-beanutils:jar:1.9.2:compile
    [INFO]       |  |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
    [INFO]       |  |  \- com.google.guava:guava:jar:18.0:compile
    [INFO]       |  \- io.netty:netty-all:jar:4.0.32.Final:compile
    [INFO]       +- org.apache.activemq:artemis-selector:jar:1.3.0:compile
    [INFO]       \- javax.inject:javax.inject:jar:1:compile
    

    因此,对 org.apache.activemq:artemis-jms-client:jar:1.3.0 的依赖似乎直接来自 org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5,这非常奇怪,因为它明确定义的依赖是 org.apache.activemq:artemis-jms-client:jar:2.17.0。但是,如果我将 <parent> 改为使用 2.5.5 而不是 1.4.1.RELEASE,问题就会消失,例如:

        <!-- Parent version aligned with the 2.5.5 starter. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.5.5</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    

    这就是mvn dependency:tree现在的输出(部分):

    [INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
    [INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
    [INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
    [INFO]    \- org.apache.activemq:artemis-jms-client:jar:2.17.0:compile
    [INFO]       +- org.apache.activemq:artemis-core-client:jar:2.17.0:compile
    [INFO]       |  +- org.jgroups:jgroups:jar:3.6.13.Final:compile
    [INFO]       |  +- org.apache.johnzon:johnzon-core:jar:1.2.14:compile
    [INFO]       |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.68.Final:compile
    [INFO]       |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.68.Final:compile
    [INFO]       |  +- io.netty:netty-transport-native-kqueue:jar:osx-x86_64:4.1.68.Final:compile
    [INFO]       |  +- io.netty:netty-codec-http:jar:4.1.68.Final:compile
    [INFO]       |  +- io.netty:netty-buffer:jar:4.1.68.Final:compile
    [INFO]       |  +- io.netty:netty-transport:jar:4.1.68.Final:compile
    [INFO]       |  |  \- io.netty:netty-resolver:jar:4.1.68.Final:compile
    [INFO]       |  +- io.netty:netty-handler:jar:4.1.68.Final:compile
    [INFO]       |  +- io.netty:netty-handler-proxy:jar:4.1.68.Final:compile
    [INFO]       |  +- io.netty:netty-codec:jar:4.1.68.Final:compile
    [INFO]       |  +- io.netty:netty-codec-socks:jar:4.1.68.Final:compile
    [INFO]       |  \- io.netty:netty-common:jar:4.1.68.Final:compile
    [INFO]       +- org.apache.activemq:artemis-commons:jar:2.17.0:compile
    [INFO]       |  +- org.jboss.logging:jboss-logging:jar:3.4.2.Final:compile
    [INFO]       |  \- commons-beanutils:commons-beanutils:jar:1.9.4:compile
    [INFO]       |     \- commons-collections:commons-collections:jar:3.2.2:compile
    [INFO]       \- org.apache.activemq:artemis-selector:jar:2.17.0:compile