This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 75f51f0198 Check auto-create for OpenWire JMS topic sub
75f51f0198 is described below
commit 75f51f0198221355e694c4db31e74a268c963cd6
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Mar 16 10:05:18 2026 -0500
Check auto-create for OpenWire JMS topic sub
Some tests needed to be updated to deal with the new behavior.
---
.../core/protocol/openwire/OpenWireConnection.java | 2 +-
.../core/protocol/openwire/amq/AMQSession.java | 57 +++++++------
.../core/protocol/openwire/util/OpenWireUtil.java | 3 +
.../integration/amqp/AmqpMessageRoutingTest.java | 2 +-
.../integration/amqp/JMSClientTestSupport.java | 10 ++-
.../JMSMismatchedRoutingTypeTest.java | 4 +
.../JMSTopicSubscriberWithSecurityTest.java | 95 ++++++++++++++++++++++
.../MultiprotocolJMSClientTestSupport.java | 13 ++-
.../openwire/OpenWireFlowControlFailTest.java | 5 +-
.../paging/MessagesExpiredPagingTest.java | 17 ++--
10 files changed, 165 insertions(+), 43 deletions(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 15415c68b3..6cb4cae0e6 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1756,7 +1756,7 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
} catch (Exception e) {
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
- } else if (e instanceof ActiveMQNonExistentQueueException &&
producerInfo.getDestination() == null) {
+ } else if (e instanceof InvalidDestinationException &&
producerInfo.getDestination() == null) {
//Send exception for non transacted anonymous producers using
an incorrect destination
sendException(e);
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 4b35b2e656..ad035b0303 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -38,7 +38,6 @@ import
org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConvert
import
org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.security.CheckType;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -180,14 +179,17 @@ public class AMQSession implements SessionCallback {
}
isInternalAddress =
connection.isSuppressInternalManagementObjects();
}
+
if (openWireDest.isQueue()) {
openWireDest =
protocolManager.virtualTopicConsumerToFQQN(openWireDest);
- SimpleString queueName =
SimpleString.of(convertWildcard(openWireDest));
-
- if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary(),
OpenWireUtil.extractFilterStringOrNull(info, openWireDest))) {
- throw new InvalidDestinationException("Destination doesn't
exist: " + queueName);
- }
}
+
+ SimpleString queueName =
SimpleString.of(convertWildcard(openWireDest));
+
+ boolean temporary = openWireDest.isTemporary() ||
AdvisorySupport.isAdvisoryTopic(openWireDest);
+
+ checkAutoCreate(queueName, temporary,
OpenWireUtil.extractFilterStringOrNull(info, openWireDest),
openWireDest.isQueue());
+
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info,
scheduledPool, isInternalAddress);
long nativeID = consumerIDGenerator.generateID();
@@ -198,7 +200,7 @@ public class AMQSession implements SessionCallback {
return consumersList;
}
- private boolean checkCachedExistingQueues(final SimpleString address,
+ private void checkCachedExistingQueues(final SimpleString address,
final String physicalName,
final boolean isTemporary) throws
Exception {
String[] existingQueuesCache = this.existingQueuesCache;
@@ -216,26 +218,19 @@ public class AMQSession implements SessionCallback {
final String existingQueue = existingQueuesCache[index];
if (existingQueue != null && existingQueue.equals(physicalName)) {
//if the information is stale (ie no longer valid) it will fail later
- return true;
+ return;
}
- final boolean hasQueue = checkAutoCreateQueue(address, isTemporary);
- if (hasQueue) {
- existingQueuesCache[index] = physicalName;
- }
- return hasQueue;
- }
-
- private boolean checkAutoCreateQueue(SimpleString queueName, boolean
isTemporary) throws Exception {
- return checkAutoCreateQueue(queueName, isTemporary, null);
+ checkAutoCreate(address, isTemporary, null, true);
+ existingQueuesCache[index] = physicalName;
}
- private boolean checkAutoCreateQueue(SimpleString queueName, boolean
isTemporary, String filter) throws Exception {
-
- boolean hasQueue = true;
- if (!connection.containsKnownDestination(queueName)) {
- RoutingType routingTypeToUse = RoutingType.ANYCAST;
- if (CompositeAddress.isFullyQualified(queueName.toString())) {
- SimpleString addressToUse =
CompositeAddress.extractAddressName(queueName);
+ private void checkAutoCreate(SimpleString destinationName, boolean
isTemporary, String filter, boolean isQueue) throws Exception {
+ if (isQueue && connection.containsKnownDestination(destinationName)) {
+ return;
+ } else {
+ RoutingType routingTypeToUse;
+ if (CompositeAddress.isFullyQualified(destinationName.toString())) {
+ SimpleString addressToUse =
CompositeAddress.extractAddressName(destinationName);
AddressInfo addressInfo = server.getAddressInfo(addressToUse);
if (addressInfo != null) {
routingTypeToUse = addressInfo.getRoutingType();
@@ -243,18 +238,22 @@ public class AMQSession implements SessionCallback {
AddressSettings as =
server.getAddressSettingsRepository().getMatch(addressToUse.toString());
routingTypeToUse = as.getDefaultAddressRoutingType();
}
+ } else if (isQueue) {
+ routingTypeToUse = RoutingType.ANYCAST;
+ } else {
+ routingTypeToUse = RoutingType.MULTICAST;
}
- AutoCreateResult autoCreateResult =
coreSession.checkAutoCreate(QueueConfiguration.of(queueName)
-
.setAddress(queueName)
+ AutoCreateResult autoCreateResult =
coreSession.checkAutoCreate(QueueConfiguration.of(destinationName)
+
.setAddress(destinationName)
.setRoutingType(routingTypeToUse)
.setTemporary(isTemporary)
.setFilterString(filter));
if (autoCreateResult == AutoCreateResult.NOT_FOUND) {
- throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
+ throw new InvalidDestinationException("Destination '" +
destinationName + "' doesn't exist and auto-creation is disabled.");
+ } else if (isQueue) {
+ connection.addKnownDestination(destinationName);
}
- connection.addKnownDestination(queueName);
}
- return hasQueue;
}
public void start() {
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 074e729dd0..0250c5ca79 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -33,6 +33,9 @@ public class OpenWireUtil {
public static final String SELECTOR_AWARE_OPTION = "selectorAware";
public static String extractFilterStringOrNull(final ConsumerInfo info,
final ActiveMQDestination openWireDest) {
+ if (openWireDest.isTopic()) {
+ return info.getSelector();
+ }
if (info.getSelector() != null) {
if (openWireDest.getOptions() != null) {
if
(Boolean.valueOf(openWireDest.getOptions().get(SELECTOR_AWARE_OPTION))) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
index 302e7d1e40..ebdaf9f8d1 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
@@ -181,7 +181,7 @@ public class AmqpMessageRoutingTest extends
JMSClientTestSupport {
@Test
@Timeout(60)
public void testAMQPRouteMessageToJMSOpenWire() throws Throwable {
- testAMQPRouteMessageToJMS(createOpenWireConnection());
+
testAMQPRouteMessageToJMS(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(),
null, null, null, true, false));
}
@Test
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index 12c8d7ce44..f37e86f464 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -259,9 +259,17 @@ public abstract class JMSClientTestSupport extends
AmqpClientTestSupport {
return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(),
username, password, null, true);
}
- private Connection createOpenWireConnection(String connectionString, String
username, String password, String clientId, boolean start) throws JMSException {
+ protected Connection createOpenWireConnection(String connectionString,
String username, String password, String clientId, boolean start) throws
JMSException {
+ return createOpenWireConnection(connectionString, username, password,
clientId, start, true);
+ }
+
+ protected Connection createOpenWireConnection(String connectionString,
String username, String password, String clientId, boolean start, boolean
watchAdvisories) throws JMSException {
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(connectionString);
+ if (!watchAdvisories) {
+ factory.setWatchTopicAdvisories(false);
+ }
+
Connection connection =
trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(exception ->
exception.printStackTrace());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java
index 68cd847bb0..6a65a14114 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java
@@ -44,6 +44,10 @@ public class JMSMismatchedRoutingTypeTest extends
MultiprotocolJMSClientTestSupp
protected final String ANYCAST_ADDRESS = RandomUtil.randomUUIDString();
protected final String MULTICAST_ADDRESS = RandomUtil.randomUUIDString();
+ protected ConnectionSupplier OpenWireConnection = () -> {
+ return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(),
null, null, null, true, false);
+ };
+
@Override
protected boolean isAutoCreateAddresses() {
return false;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberWithSecurityTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberWithSecurityTest.java
new file mode 100644
index 0000000000..e07bf27333
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberWithSecurityTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class JMSTopicSubscriberWithSecurityTest extends
MultiprotocolJMSClientTestSupport {
+
+ private SecureConnectionSupplier connectionSupplier;
+ private static final String USER = "USER";
+ private static final String PASS = "PASS";
+ private static final String ROLE = "ROLE";
+
+ @Parameters(name = "protocol={0}")
+ public static Collection getParameters() {
+ return Arrays.asList(new Object[][]{
+ {Protocol.AMQP},
+ {Protocol.CORE},
+ {Protocol.OPENWIRE}
+ });
+ }
+
+ public JMSTopicSubscriberWithSecurityTest(Protocol protocol) {
+ switch (protocol) {
+ case AMQP -> this.connectionSupplier = (username, password) ->
createConnection(username, password);
+ case CORE -> this.connectionSupplier = (username, password) ->
createCoreConnection(username, password);
+ case OPENWIRE -> this.connectionSupplier = (username, password) ->
createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), username,
password, null, true, false);
+ }
+ }
+
+ @Override
+ protected boolean isAutoCreateAddresses() {
+ return false;
+ }
+
+ @Override
+ protected boolean isSecurityEnabled() {
+ return true;
+ }
+
+ @Override
+ protected void enableSecurity(ActiveMQServer server, String...
securityMatches) {
+ super.enableSecurity(server);
+
+ // add a new user/role who can only create non-durable queues (i.e.,
non-durable JMS subscriptions) and consume from them
+ ActiveMQJAASSecurityManager securityManager =
(ActiveMQJAASSecurityManager) server.getSecurityManager();
+ securityManager.getConfiguration().addUser(USER, PASS);
+ securityManager.getConfiguration().addRole(USER, ROLE);
+ HierarchicalRepository<Set<Role>> securityRepository =
server.getSecurityRepository();
+ Set<Role> value = securityRepository.getMatch(getTopicName());
+ value.add(new Role(ROLE, false, true, false, false, true, false, false,
false, false, false));
+ }
+
+ @TestTemplate
+ public void testCreateConsumer() throws Throwable {
+ Connection connection = connectionSupplier.createConnection(USER, PASS);
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ assertThrows(JMSException.class, () ->
session.createConsumer(session.createTopic(getTopicName())));
+ assertNull(server.getAddressInfo(SimpleString.of(getTopicName())));
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
index 0c315c469f..812bc247e3 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
@@ -477,8 +477,19 @@ public abstract class MultiprotocolJMSClientTestSupport
extends ActiveMQTestBase
String password,
String clientId,
boolean start) throws
JMSException {
- ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(connectionString);
+ return createOpenWireConnection(connectionString, username, password,
clientId, start, true);
+ }
+ protected Connection createOpenWireConnection(String connectionString,
+ String username,
+ String password,
+ String clientId,
+ boolean start,
+ boolean watchAdvisories)
throws JMSException {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(connectionString);
+ if (!watchAdvisories) {
+ factory.setWatchTopicAdvisories(false);
+ }
Connection connection =
trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(exception ->
exception.printStackTrace());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
index 7125009c38..2c48e24f20 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
@@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -63,7 +62,8 @@ public class OpenWireFlowControlFailTest extends
OpenWireTestBase {
server.createQueue(QueueConfiguration.of(addressInfo.getName()).setRoutingType(RoutingType.ANYCAST));
String textBody = " ".repeat(10);
- ConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(urlString);
+ factory.setWatchTopicAdvisories(false);
int numberOfMessage = 0;
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -93,6 +93,7 @@ public class OpenWireFlowControlFailTest extends
OpenWireTestBase {
}
factory = new ActiveMQConnectionFactory(urlString);
+ factory.setWatchTopicAdvisories(false);
try (Connection connection2 = factory.createConnection()) {
Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue =
session2.createQueue(addressInfo.getName().toString());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java
index 8f803cbc52..7b0b8b8827 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -125,32 +126,32 @@ public class MessagesExpiredPagingTest extends
ActiveMQTestBase {
@Test
public void testSendReceiveCORELarge() throws Exception {
- testSendReceive("CORE", 50, 20, 10, 500 * 1024);
+ testSendReceive(CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616"), 50, 20, 10, 500 * 1024);
}
@Test
public void testSendReceiveCORE() throws Exception {
- testSendReceive("CORE", 5000, 1000, 100, 0);
+ testSendReceive(CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616"), 5000, 1000, 100, 0);
}
@Test
public void testSendReceiveAMQP() throws Exception {
- testSendReceive("AMQP", 5000, 1000, 100, 0);
+ testSendReceive(CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:61616"), 5000, 1000, 100, 0);
}
@Test
public void testSendReceiveAMQPLarge() throws Exception {
- testSendReceive("AMQP", 50, 20, 10, 500 * 1024);
+ testSendReceive(CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:61616"), 50, 20, 10, 500 * 1024);
}
@Test
public void testSendReceiveOpenWire() throws Exception {
- testSendReceive("OPENWIRE", 5000, 1000, 100, 0);
+ ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory)
CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616");
+ cf.setWatchTopicAdvisories(false);
+ testSendReceive(cf, 5000, 1000, 100, 0);
}
- public void testSendReceive(String protocol, int numberOfMessages, int
numberOfMessageSecondWave, int pagingInterval, int bodySize) throws Exception {
- ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
-
+ public void testSendReceive(ConnectionFactory factory, int
numberOfMessages, int numberOfMessageSecondWave, int pagingInterval, int
bodySize) throws Exception {
String extraBody = "*".repeat(bodySize);
Consumer[] consumers = new Consumer[NUMBER_OF_QUEUES];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]