This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 75f51f0198 Check auto-create for OpenWire JMS topic sub
75f51f0198 is described below

commit 75f51f0198221355e694c4db31e74a268c963cd6
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Mar 16 10:05:18 2026 -0500

    Check auto-create for OpenWire JMS topic sub
    
    Some tests needed to be updated to deal with the new behavior.
---
 .../core/protocol/openwire/OpenWireConnection.java |  2 +-
 .../core/protocol/openwire/amq/AMQSession.java     | 57 +++++++------
 .../core/protocol/openwire/util/OpenWireUtil.java  |  3 +
 .../integration/amqp/AmqpMessageRoutingTest.java   |  2 +-
 .../integration/amqp/JMSClientTestSupport.java     | 10 ++-
 .../JMSMismatchedRoutingTypeTest.java              |  4 +
 .../JMSTopicSubscriberWithSecurityTest.java        | 95 ++++++++++++++++++++++
 .../MultiprotocolJMSClientTestSupport.java         | 13 ++-
 .../openwire/OpenWireFlowControlFailTest.java      |  5 +-
 .../paging/MessagesExpiredPagingTest.java          | 17 ++--
 10 files changed, 165 insertions(+), 43 deletions(-)

diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 15415c68b3..6cb4cae0e6 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1756,7 +1756,7 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
          } catch (Exception e) {
             if (tx != null) {
                tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
-            } else if (e instanceof ActiveMQNonExistentQueueException && 
producerInfo.getDestination() == null) {
+            } else if (e instanceof InvalidDestinationException && 
producerInfo.getDestination() == null) {
                //Send exception for non transacted anonymous producers using 
an incorrect destination
                sendException(e);
             }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 4b35b2e656..ad035b0303 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -38,7 +38,6 @@ import 
org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConvert
 import 
org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.security.CheckType;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -180,14 +179,17 @@ public class AMQSession implements SessionCallback {
             }
             isInternalAddress = 
connection.isSuppressInternalManagementObjects();
          }
+
          if (openWireDest.isQueue()) {
             openWireDest = 
protocolManager.virtualTopicConsumerToFQQN(openWireDest);
-            SimpleString queueName = 
SimpleString.of(convertWildcard(openWireDest));
-
-            if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary(), 
OpenWireUtil.extractFilterStringOrNull(info, openWireDest))) {
-               throw new InvalidDestinationException("Destination doesn't 
exist: " + queueName);
-            }
          }
+
+         SimpleString queueName = 
SimpleString.of(convertWildcard(openWireDest));
+
+         boolean temporary = openWireDest.isTemporary() || 
AdvisorySupport.isAdvisoryTopic(openWireDest);
+
+         checkAutoCreate(queueName, temporary, 
OpenWireUtil.extractFilterStringOrNull(info, openWireDest), 
openWireDest.isQueue());
+
          AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, 
scheduledPool, isInternalAddress);
 
          long nativeID = consumerIDGenerator.generateID();
@@ -198,7 +200,7 @@ public class AMQSession implements SessionCallback {
       return consumersList;
    }
 
-   private boolean checkCachedExistingQueues(final SimpleString address,
+   private void checkCachedExistingQueues(final SimpleString address,
                                              final String physicalName,
                                              final boolean isTemporary) throws 
Exception {
       String[] existingQueuesCache = this.existingQueuesCache;
@@ -216,26 +218,19 @@ public class AMQSession implements SessionCallback {
       final String existingQueue = existingQueuesCache[index];
       if (existingQueue != null && existingQueue.equals(physicalName)) {
          //if the information is stale (ie no longer valid) it will fail later
-         return true;
+         return;
       }
-      final boolean hasQueue = checkAutoCreateQueue(address, isTemporary);
-      if (hasQueue) {
-         existingQueuesCache[index] = physicalName;
-      }
-      return hasQueue;
-   }
-
-   private boolean checkAutoCreateQueue(SimpleString queueName, boolean 
isTemporary) throws Exception {
-      return checkAutoCreateQueue(queueName, isTemporary, null);
+      checkAutoCreate(address, isTemporary, null, true);
+      existingQueuesCache[index] = physicalName;
    }
 
-   private boolean checkAutoCreateQueue(SimpleString queueName, boolean 
isTemporary, String filter) throws Exception {
-
-      boolean hasQueue = true;
-      if (!connection.containsKnownDestination(queueName)) {
-         RoutingType routingTypeToUse = RoutingType.ANYCAST;
-         if (CompositeAddress.isFullyQualified(queueName.toString())) {
-            SimpleString addressToUse = 
CompositeAddress.extractAddressName(queueName);
+   private void checkAutoCreate(SimpleString destinationName, boolean 
isTemporary, String filter, boolean isQueue) throws Exception {
+      if (isQueue && connection.containsKnownDestination(destinationName)) {
+         return;
+      } else {
+         RoutingType routingTypeToUse;
+         if (CompositeAddress.isFullyQualified(destinationName.toString())) {
+            SimpleString addressToUse = 
CompositeAddress.extractAddressName(destinationName);
             AddressInfo addressInfo = server.getAddressInfo(addressToUse);
             if (addressInfo != null) {
                routingTypeToUse = addressInfo.getRoutingType();
@@ -243,18 +238,22 @@ public class AMQSession implements SessionCallback {
                AddressSettings as = 
server.getAddressSettingsRepository().getMatch(addressToUse.toString());
                routingTypeToUse = as.getDefaultAddressRoutingType();
             }
+         } else if (isQueue) {
+            routingTypeToUse = RoutingType.ANYCAST;
+         } else {
+            routingTypeToUse = RoutingType.MULTICAST;
          }
-         AutoCreateResult autoCreateResult = 
coreSession.checkAutoCreate(QueueConfiguration.of(queueName)
-                                                                            
.setAddress(queueName)
+         AutoCreateResult autoCreateResult = 
coreSession.checkAutoCreate(QueueConfiguration.of(destinationName)
+                                                                            
.setAddress(destinationName)
                                                                             
.setRoutingType(routingTypeToUse)
                                                                             
.setTemporary(isTemporary)
                                                                             
.setFilterString(filter));
          if (autoCreateResult == AutoCreateResult.NOT_FOUND) {
-            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
+            throw new InvalidDestinationException("Destination '" + 
destinationName + "' doesn't exist and auto-creation is disabled.");
+         } else if (isQueue) {
+            connection.addKnownDestination(destinationName);
          }
-         connection.addKnownDestination(queueName);
       }
-      return hasQueue;
    }
 
    public void start() {
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 074e729dd0..0250c5ca79 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -33,6 +33,9 @@ public class OpenWireUtil {
    public static final String SELECTOR_AWARE_OPTION = "selectorAware";
 
    public static String extractFilterStringOrNull(final ConsumerInfo info, 
final ActiveMQDestination openWireDest) {
+      if (openWireDest.isTopic()) {
+         return info.getSelector();
+      }
       if (info.getSelector() != null) {
          if (openWireDest.getOptions()  != null) {
             if 
(Boolean.valueOf(openWireDest.getOptions().get(SELECTOR_AWARE_OPTION))) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
index 302e7d1e40..ebdaf9f8d1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
@@ -181,7 +181,7 @@ public class AmqpMessageRoutingTest extends 
JMSClientTestSupport {
    @Test
    @Timeout(60)
    public void testAMQPRouteMessageToJMSOpenWire() throws Throwable {
-      testAMQPRouteMessageToJMS(createOpenWireConnection());
+      
testAMQPRouteMessageToJMS(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(),
 null, null, null, true, false));
    }
 
    @Test
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index 12c8d7ce44..f37e86f464 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -259,9 +259,17 @@ public abstract class JMSClientTestSupport extends 
AmqpClientTestSupport {
       return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), 
username, password, null, true);
    }
 
-   private Connection createOpenWireConnection(String connectionString, String 
username, String password, String clientId, boolean start) throws JMSException {
+   protected Connection createOpenWireConnection(String connectionString, 
String username, String password, String clientId, boolean start) throws 
JMSException {
+      return createOpenWireConnection(connectionString, username, password, 
clientId, start, true);
+   }
+
+   protected Connection createOpenWireConnection(String connectionString, 
String username, String password, String clientId, boolean start, boolean 
watchAdvisories) throws JMSException {
       ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionString);
 
+      if (!watchAdvisories) {
+         factory.setWatchTopicAdvisories(false);
+      }
+
       Connection connection = 
trackJMSConnection(factory.createConnection(username, password));
 
       connection.setExceptionListener(exception -> 
exception.printStackTrace());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java
index 68cd847bb0..6a65a14114 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java
@@ -44,6 +44,10 @@ public class JMSMismatchedRoutingTypeTest extends 
MultiprotocolJMSClientTestSupp
    protected final String ANYCAST_ADDRESS = RandomUtil.randomUUIDString();
    protected final String MULTICAST_ADDRESS = RandomUtil.randomUUIDString();
 
+   protected ConnectionSupplier OpenWireConnection = () -> {
+      return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), 
null, null, null, true, false);
+   };
+
    @Override
    protected boolean isAutoCreateAddresses() {
       return false;
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberWithSecurityTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberWithSecurityTest.java
new file mode 100644
index 0000000000..e07bf27333
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberWithSecurityTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import 
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class JMSTopicSubscriberWithSecurityTest extends 
MultiprotocolJMSClientTestSupport {
+
+   private SecureConnectionSupplier connectionSupplier;
+   private static final String USER = "USER";
+   private static final String PASS = "PASS";
+   private static final String ROLE = "ROLE";
+
+   @Parameters(name = "protocol={0}")
+   public static Collection getParameters() {
+      return Arrays.asList(new Object[][]{
+         {Protocol.AMQP},
+         {Protocol.CORE},
+         {Protocol.OPENWIRE}
+      });
+   }
+
+   public JMSTopicSubscriberWithSecurityTest(Protocol protocol) {
+      switch (protocol) {
+         case AMQP -> this.connectionSupplier = (username, password) -> 
createConnection(username, password);
+         case CORE -> this.connectionSupplier = (username, password) -> 
createCoreConnection(username, password);
+         case OPENWIRE -> this.connectionSupplier = (username, password) -> 
createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), username, 
password, null, true, false);
+      }
+   }
+
+   @Override
+   protected boolean isAutoCreateAddresses() {
+      return false;
+   }
+
+   @Override
+   protected boolean isSecurityEnabled() {
+      return true;
+   }
+
+   @Override
+   protected void enableSecurity(ActiveMQServer server, String... 
securityMatches) {
+      super.enableSecurity(server);
+
+      // add a new user/role who can only create non-durable queues (i.e., 
non-durable JMS subscriptions) and consume from them
+      ActiveMQJAASSecurityManager securityManager = 
(ActiveMQJAASSecurityManager) server.getSecurityManager();
+      securityManager.getConfiguration().addUser(USER, PASS);
+      securityManager.getConfiguration().addRole(USER, ROLE);
+      HierarchicalRepository<Set<Role>> securityRepository = 
server.getSecurityRepository();
+      Set<Role> value = securityRepository.getMatch(getTopicName());
+      value.add(new Role(ROLE, false, true, false, false, true, false, false, 
false, false, false));
+   }
+
+   @TestTemplate
+   public void testCreateConsumer() throws Throwable {
+      Connection connection = connectionSupplier.createConnection(USER, PASS);
+
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      assertThrows(JMSException.class, () -> 
session.createConsumer(session.createTopic(getTopicName())));
+      assertNull(server.getAddressInfo(SimpleString.of(getTopicName())));
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
index 0c315c469f..812bc247e3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
@@ -477,8 +477,19 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
                                                  String password,
                                                  String clientId,
                                                  boolean start) throws 
JMSException {
-      ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionString);
+      return createOpenWireConnection(connectionString, username, password, 
clientId, start, true);
+   }
 
+   protected Connection createOpenWireConnection(String connectionString,
+                                                 String username,
+                                                 String password,
+                                                 String clientId,
+                                                 boolean start,
+                                                 boolean watchAdvisories) 
throws JMSException {
+      ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionString);
+      if (!watchAdvisories) {
+         factory.setWatchTopicAdvisories(false);
+      }
       Connection connection = 
trackJMSConnection(factory.createConnection(username, password));
 
       connection.setExceptionListener(exception -> 
exception.printStackTrace());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
index 7125009c38..2c48e24f20 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
@@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -63,7 +62,8 @@ public class OpenWireFlowControlFailTest extends 
OpenWireTestBase {
       
server.createQueue(QueueConfiguration.of(addressInfo.getName()).setRoutingType(RoutingType.ANYCAST));
 
       String textBody = " ".repeat(10);
-      ConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+      ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(urlString);
+      factory.setWatchTopicAdvisories(false);
       int numberOfMessage = 0;
       try (Connection connection = factory.createConnection()) {
          Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -93,6 +93,7 @@ public class OpenWireFlowControlFailTest extends 
OpenWireTestBase {
       }
 
       factory = new ActiveMQConnectionFactory(urlString);
+      factory.setWatchTopicAdvisories(false);
       try (Connection connection2 = factory.createConnection()) {
          Session session2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
          javax.jms.Queue queue = 
session2.createQueue(addressInfo.getName().toString());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java
index 8f803cbc52..7b0b8b8827 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -125,32 +126,32 @@ public class MessagesExpiredPagingTest extends 
ActiveMQTestBase {
 
    @Test
    public void testSendReceiveCORELarge() throws Exception {
-      testSendReceive("CORE", 50, 20, 10, 500 * 1024);
+      testSendReceive(CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616"), 50, 20, 10, 500 * 1024);
    }
 
    @Test
    public void testSendReceiveCORE() throws Exception {
-      testSendReceive("CORE", 5000, 1000, 100, 0);
+      testSendReceive(CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616"), 5000, 1000, 100, 0);
    }
 
    @Test
    public void testSendReceiveAMQP() throws Exception {
-      testSendReceive("AMQP", 5000, 1000, 100, 0);
+      testSendReceive(CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:61616"), 5000, 1000, 100, 0);
    }
 
    @Test
    public void testSendReceiveAMQPLarge() throws Exception {
-      testSendReceive("AMQP", 50, 20, 10, 500 * 1024);
+      testSendReceive(CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:61616"), 50, 20, 10, 500 * 1024);
    }
 
    @Test
    public void testSendReceiveOpenWire() throws Exception {
-      testSendReceive("OPENWIRE", 5000, 1000, 100, 0);
+      ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory) 
CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616");
+      cf.setWatchTopicAdvisories(false);
+      testSendReceive(cf, 5000, 1000, 100, 0);
    }
 
-   public void testSendReceive(String protocol, int numberOfMessages, int 
numberOfMessageSecondWave, int pagingInterval, int bodySize) throws Exception {
-      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
-
+   public void testSendReceive(ConnectionFactory factory, int 
numberOfMessages, int numberOfMessageSecondWave, int pagingInterval, int 
bodySize) throws Exception {
       String extraBody = "*".repeat(bodySize);
 
       Consumer[] consumers = new Consumer[NUMBER_OF_QUEUES];


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to