This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c844030eeb ARTEMIS-5987: Properly handling exceptions with single
connection.
c844030eeb is described below
commit c844030eeb2fd76be417313fe61bfb8ad47968f6
Author: Emmanuel Hugonnet <[email protected]>
AuthorDate: Mon May 4 17:25:46 2026 +0200
ARTEMIS-5987: Properly handling exceptions with single connection.
If there is an issue while creating a session in single connection mode
then the connection factory can be in a bad state and the loop for
creating sessions still continues over this.
Issue: https://issues.apache.org/jira/browse/ARTEMIS-5987
Signed-off-by: Emmanuel Hugonnet <[email protected]>
---
.../artemis/ra/inflow/ActiveMQActivation.java | 13 +-
...ActiveMQMessageHandlerSingleConnectionTest.java | 226 +++++++++++++++++++++
2 files changed, 235 insertions(+), 4 deletions(-)
diff --git
a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index 3639c5ac99..26b2250659 100644
---
a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -244,7 +244,7 @@ public class ActiveMQActivation {
ClientSessionFactory cf = null;
for (int i = 0; i < spec.getMaxSession(); i++) {
- //if we are sharing the ceonnection only create 1
+ //if we are sharing the connection only create 1
if (!spec.isSingleConnection()) {
cf = null;
}
@@ -259,10 +259,9 @@ public class ActiveMQActivation {
handler.setup();
handlers.add(handler);
} catch (Exception e) {
+ logger.trace("Failed to setup session {} for activation {}", i,
spec, e);
if (cf != null) {
- if (!spec.isSingleConnection()) {
- cf.close();
- }
+ cf.close();
}
if (session != null) {
session.close();
@@ -270,6 +269,12 @@ public class ActiveMQActivation {
if (firstException == null) {
firstException = e;
}
+ if (spec.isSingleConnection()) {
+ // The shared ClientSessionFactory is in a broken state; stop
the loop
+ // all remaining sessions would fail with "ClientSession closed
while
+ // creating session", masking the real error.
+ break;
+ }
}
}
//if we have any exceptions close all the handlers and throw the first
exception.
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerSingleConnectionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerSingleConnectionTest.java
new file mode 100644
index 0000000000..45c9b3d042
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerSingleConnectionTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.ra;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
+import
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for single connection mode exception handling in ActiveMQActivation.
+ * Related to ARTEMIS-5987: Properly handling exceptions with single
connection.
+ */
+public class ActiveMQMessageHandlerSingleConnectionTest extends
ActiveMQRATestBase {
+
+ @Override
+ public boolean useSecurity() {
+ return false;
+ }
+
+ /**
+ * Test that when using single connection mode and session creation fails
because the server is
+ * down, the activation completes without hanging and leaves no active
consumers behind.
+ *
+ * Without the fix, the loop would continue after the shared
ClientSessionFactory entered a broken
+ * state, causing unnecessary retries against a dead factory instead of
breaking early.
+ */
+ @Test
+ public void testSingleConnectionFailsCleanlyWhenServerIsDown() throws
Exception {
+ server.stop();
+
+ ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+
+ ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ spec.setMaxSession(5);
+ spec.setSingleConnection(true);
+ spec.setSetupAttempts(1);
+ spec.setSetupInterval(0L);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+
+ // endpointActivation must complete without hanging even though the
server is down.
+ // Setup failures are handled asynchronously and do not propagate
through endpointActivation.
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+ // Restart the server so we can send messages and verify no consumers
are active.
+ server.start();
+
+ try (ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession()) {
+ session.start();
+ ClientProducer producer = session.createProducer(MDBQUEUEPREFIXED);
+ for (int i = 0; i < 3; i++) {
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("test-message-" + i);
+ producer.send(message);
+ }
+ }
+
+ // No handlers should be active: the activation failed when the server
was down and all
+ // reconnect attempts (setupAttempts=1) have been exhausted before the
server came back.
+ assertFalse(latch.await(2, TimeUnit.SECONDS),
+ "Messages should not be consumed — activation failed before
server came back up");
+
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ qResourceAdapter.stop();
+ }
+
+ /**
+ * Test that when using single connection mode and a mid-loop failure
occurs, the loop breaks
+ * immediately instead of continuing to create sessions on a shared factory
that is now broken.
+ *
+ * A queue with maxConsumers=2 triggers a genuine server-enforced failure
inside handler.setup()
+ * when the 3rd session tries to subscribe. The fix ensures the loop breaks
at that point,
+ * producing exactly maxConsumers+1 CreateSession packets (2 successful + 1
failing). Without
+ * the fix the loop runs all maxSession iterations, producing 5 packets and
masking the original
+ * error with secondary "factory closed" failures in the logs.
+ *
+ * setupAttempts=0 is used so setup() runs exactly once, keeping the packet
count precise.
+ */
+ @Test
+ public void testSingleConnectionMidLoopFailureBreaksEarly() throws
Exception {
+ AtomicInteger sessionCreateCount = new AtomicInteger(0);
+ Interceptor countingInterceptor = (packet, connection) -> {
+ if (packet instanceof CreateSessionMessage) {
+ sessionCreateCount.incrementAndGet();
+ }
+ return true;
+ };
+ server.getRemotingService().addIncomingInterceptor(countingInterceptor);
+
+ String limitedQueue = "limited-consumers-queue";
+ server.createQueue(QueueConfiguration.of(limitedQueue)
+ .setAddress(limitedQueue)
+ .setRoutingType(RoutingType.ANYCAST)
+ .setMaxConsumers(2)
+ .setAutoCreated(false)
+ .setAutoDelete(false));
+
+ ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+
+ ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(limitedQueue);
+ spec.setMaxSession(5);
+ spec.setSingleConnection(true);
+ spec.setSetupAttempts(0); // no retries — exactly one setup() call,
keeping the count exact
+ spec.setSetupInterval(0L);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+
server.getRemotingService().removeIncomingInterceptor(countingInterceptor);
+
+ // With the fix the loop breaks after session index 2 (the first
failure):
+ // session 0 → CreateSession #1, consumer created OK
+ // session 1 → CreateSession #2, consumer created OK
+ // session 2 → CreateSession #3, consumer rejected (maxConsumers=2) →
cf.close() + break
+ // Without the fix the loop continues through all 5 iterations, sending
5 CreateSession
+ // packets even though sessions 2-4 will all fail at consumer creation.
+ assertEquals(3, sessionCreateCount.get(),
+ "Expected 3 CreateSession packets (maxConsumers+1); without
the fix all 5 are sent");
+
+ // Verify all-or-nothing teardown: handlers 0 and 1 must also be torn
down.
+ Thread.sleep(500);
+ try (ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession verifySession = sf.createSession()) {
+ QueueQuery queueQuery =
verifySession.queueQuery(SimpleString.of(limitedQueue));
+ assertEquals(0, queueQuery.getConsumerCount(),
+ "Queue should have 0 active consumers — all handlers
torn down after mid-loop failure");
+ }
+
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ qResourceAdapter.stop();
+ server.destroyQueue(SimpleString.of(limitedQueue));
+ }
+
+ /**
+ * Test that single connection mode works correctly under normal operation
— all sessions
+ * share one underlying connection and messages are delivered.
+ */
+ @Test
+ public void testSingleConnectionNormalOperation() throws Exception {
+ ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+
+ ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+ spec.setMaxSession(3);
+ spec.setSingleConnection(true);
+
+ CountDownLatch latch = new CountDownLatch(3);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new
DummyMessageEndpointFactory(endpoint, false);
+
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+ try (ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession()) {
+ ClientProducer producer = session.createProducer(MDBQUEUEPREFIXED);
+ for (int i = 0; i < 3; i++) {
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("test-message-" + i);
+ producer.send(message);
+ }
+ }
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS), "All 3 messages should be
received within 5s");
+ assertNotNull(endpoint.lastMessage, "At least one message should have
been received");
+
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ qResourceAdapter.stop();
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]