This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b9b4dfdeaf ARTEMIS-5956 Connection.close may leave leaked temporary 
destinations
b9b4dfdeaf is described below

commit b9b4dfdeafccc9af724cbad31f8309e787f3899f
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 18 14:46:12 2026 -0400

    ARTEMIS-5956 Connection.close may leave leaked temporary destinations
    
    co-author: Done in collaboration with Tim Bish
---
 ...va => ActiveMQAddressHasBindingsException.java} | 14 ++---
 .../api/core/ActiveMQDeleteAddressException.java   |  2 +-
 .../artemis/core/server/ActiveMQMessageBundle.java |  4 +-
 .../artemis/core/server/ActiveMQServer.java        |  5 ++
 .../core/server/impl/ActiveMQServerImpl.java       |  9 +++
 .../core/server/impl/ServerSessionImpl.java        | 36 +++++++++++-
 .../server/impl/TransientQueueManagerImpl.java     |  3 +-
 .../amqp/JMSTemporaryDestinationTest.java          | 67 ++++++++++++++++++++++
 8 files changed, 126 insertions(+), 14 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java
similarity index 70%
copy from 
artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
copy to 
artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java
index 9c8030649c..a023b68c3a 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java
@@ -14,18 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.activemq.artemis.api.core;
 
-/**
- * An operation failed because an address exists on the server.
- */
-public final class ActiveMQDeleteAddressException extends ActiveMQException {
+public class ActiveMQAddressHasBindingsException extends 
ActiveMQDeleteAddressException {
 
-   public ActiveMQDeleteAddressException() {
-      super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR);
+   public ActiveMQAddressHasBindingsException() {
+      super();
    }
 
-   public ActiveMQDeleteAddressException(String msg) {
-      super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR, msg);
+   public ActiveMQAddressHasBindingsException(String msg) {
+      super(msg);
    }
 }
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
index 9c8030649c..c3dfcfccbb 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.api.core;
 /**
  * An operation failed because an address exists on the server.
  */
-public final class ActiveMQDeleteAddressException extends ActiveMQException {
+public  class ActiveMQDeleteAddressException extends ActiveMQException {
 
    public ActiveMQDeleteAddressException() {
       super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index cbe1417f9d..02b532341d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -21,9 +21,9 @@ import java.util.Set;
 import 
org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
+import 
org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException;
 import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
-import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
 import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQDivertDoesNotExistException;
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
@@ -375,7 +375,7 @@ public interface ActiveMQMessageBundle {
    ActiveMQAddressExistsException addressAlreadyExists(SimpleString address);
 
    @Message(id = 229205, value = "Address {} has bindings")
-   ActiveMQDeleteAddressException addressHasBindings(SimpleString address);
+   ActiveMQAddressHasBindingsException addressHasBindings(SimpleString 
address);
 
    @Message(id = 229206, value = "Queue {} has invalid max consumer setting: 
{}")
    IllegalArgumentException invalidMaxConsumers(String queueName, int value);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 3158483591..0cf0bff50c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -162,6 +162,11 @@ public interface ActiveMQServer extends ServiceComponent {
 
    StorageManager getStorageManager();
 
+   /**
+    * The executor responsible for removing temporary destinations.
+    */
+   Executor getTransientQueueExecutor();
+
    PagingManager getPagingManager();
 
    PagingManager createPagingManager() throws Exception;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index cf56456fc3..875597cd36 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -290,6 +290,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    protected volatile ExecutorFactory executorFactory;
 
+   protected volatile Executor transientQueueExecutor;
+
    private volatile ExecutorService ioExecutorPool;
 
    private ReplayManager replayManager;
@@ -1776,6 +1778,11 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
       return storageManager;
    }
 
+   @Override
+   public Executor getTransientQueueExecutor() {
+      return transientQueueExecutor;
+   }
+
    @Override
    public ActiveMQSecurityManager getSecurityManager() {
       return securityManager;
@@ -3271,6 +3278,8 @@ public class ActiveMQServerImpl implements ActiveMQServer 
{
       }
       this.executorFactory = new OrderedExecutorFactory(threadPool);
 
+      this.transientQueueExecutor = executorFactory.getExecutor();
+
       if (serviceRegistry.getIOExecutorService() == null) {
          this.ioExecutorPool = new ActiveMQThreadPoolExecutor(0, maxIoThreads, 
THREAD_POOL_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, getThreadFactory("io"));
       } else {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 6931d08773..4ec39ed1bf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import 
org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
+import 
org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -814,7 +815,7 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
       // not mean it will get deleted automatically when the session is 
closed. It is up to the user to delete the
       // resource when finished with it
 
-      TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, 
name);
+      TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server, 
name, sessionExecutor);
       if (remotingConnection instanceof TempResourceObserver observer) {
          cleaner.setObserver(observer);
       }
@@ -1163,15 +1164,22 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
 
    public static class TempResourceCleanerUpper implements CloseListener, 
FailureListener {
 
+      private int retry = 0;
+
+      private final int MAX_RETRY = 5;
+
       private final SimpleString resourceName;
 
       private final ActiveMQServer server;
 
       private TempResourceObserver observer;
 
-      public TempResourceCleanerUpper(final ActiveMQServer server, final 
SimpleString resourceName) {
+      private Executor sessionExecutor;
+
+      public TempResourceCleanerUpper(final ActiveMQServer server, final 
SimpleString resourceName, Executor sessionExecutor) {
          this.server = server;
          this.resourceName = resourceName;
+         this.sessionExecutor = sessionExecutor;
       }
 
       public void setObserver(TempResourceObserver observer) {
@@ -1179,10 +1187,20 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
       }
 
       private void run() {
+         sessionExecutor.execute(() -> {
+            // This needs to use the same executor as TransientQueueManagerImpl.
+            // Even though failed executions are retried, sharing the executor
+            // with TransientQueueManagerImpl minimizes the number of retries.
+            server.getTransientQueueExecutor().execute(this::done);
+         });
+      }
+
+      private void done() {
          try {
             logger.debug("deleting temporary resource {}", resourceName);
             try {
                Queue q = server.locateQueue(resourceName);
+               logger.debug("deleting queue {}", resourceName);
                if (q != null && q.isTemporary()) {
                   AddressInfo a = server.getAddressInfo(q.getAddress());
                   server.destroyQueue(resourceName, null, false, false, a == 
null || a.isTemporary());
@@ -1196,12 +1214,26 @@ public class ServerSessionImpl extends 
CriticalComponentImpl implements ServerSe
             }
             try {
                AddressInfo a = server.getAddressInfo(resourceName);
+               logger.debug("deleting address with resource={}, address={}", 
resourceName, a);
                if (a != null && a.isTemporary()) {
                   server.removeAddressInfo(resourceName, null);
                   if (observer != null) {
                      observer.tempAddressDeleted(resourceName);
                   }
                }
+            } catch (ActiveMQAddressHasBindingsException e) {
+               // When the consumer on a temporary destination and the connection are being closed as part of the same event,
+               // we could get into a situation where the removal of the queue is already scheduled in the executors
+               // but has not run yet.
+               // It is not possible to serialize the calls on org.apache.activemq.artemis.core.server.impl.TransientQueueManagerImpl,
+               // as that could lead to starvation and deadlocks.
+               // For that reason we can only retry by going through the executors again.
+               if (retry++ < MAX_RETRY) {
+                  logger.debug("retrying deleteResource {}, retry={}", 
resourceName, retry);
+                  TempResourceCleanerUpper.this.run();
+               } else {
+                  logger.warn(e.getMessage(), e);
+               }
             } catch (ActiveMQException e) {
                // that's fine.. it can happen due to resource already been 
deleted
                logger.debug(e.getMessage(), e);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
index 1d8118311f..df480c2e55 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
@@ -52,7 +52,8 @@ public class TransientQueueManagerImpl extends 
ReferenceCounterUtil implements T
    }
 
    public TransientQueueManagerImpl(ActiveMQServer server, SimpleString 
queueName) {
-      super(server.getExecutorFactory().getExecutor());
+      // We have to use the same executor between here and 
ServerSessionImpl::TempResourceCleanerUpper
+      super(server.getTransientQueueExecutor());
 
       this.server = server;
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
index 362d6fc32a..06bfc6d100 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -24,6 +25,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -137,4 +139,69 @@ public class JMSTemporaryDestinationTest extends 
JMSClientTestSupport {
          connection.close();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void testTemporaryTopicDeletedOnConnectionClosed() throws Exception {
+      doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testTemporaryQueueDeletedOnConnectionClosed() throws Exception {
+      doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testTemporaryTopicDeletedOnConnectionClosedWithoutExplicitConsumerClose() 
throws Exception {
+      doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testTemporaryQueueDeletedOnConnectionClosedWithoutExplicitConsumerClose() 
throws Exception {
+      doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, false);
+   }
+
+   private void doTestTemporaryDestinationIsDeletedOnConnectionClosed(boolean 
topic, boolean closeConsumer) throws Exception {
+      final String addressName;
+      try (Connection connection = createConnection()) {
+         final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         final Destination destination;
+
+         if (topic) {
+            destination = session.createTemporaryTopic();
+
+            assertNotNull(destination);
+            assertTrue(destination instanceof TemporaryTopic);
+
+            addressName = ((TemporaryTopic) destination).getTopicName();
+         } else {
+            destination = session.createTemporaryQueue();
+
+            assertNotNull(destination);
+            assertTrue(destination instanceof TemporaryQueue);
+
+            addressName = ((TemporaryQueue) destination).getQueueName();
+         }
+
+         logger.debug("Address being used is {}", addressName);
+
+         final MessageConsumer consumer = session.createConsumer(destination);
+
+         final AddressInfo addressView = getProxyToAddress(addressName);
+         assertNotNull(addressView);
+
+         assertEquals(1, 
server.bindingQuery(addressView.getName()).getQueueNames().size());
+
+         if (closeConsumer) {
+            consumer.close();
+         }
+
+      }
+      Wait.assertNull(() -> getProxyToAddress(addressName), 
TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50));
+   }
+
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to