This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b9b4dfdeaf ARTEMIS-5956 Connection.close may leave leaked temporary
destinations
b9b4dfdeaf is described below
commit b9b4dfdeafccc9af724cbad31f8309e787f3899f
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 18 14:46:12 2026 -0400
ARTEMIS-5956 Connection.close may leave leaked temporary destinations
co-author: Done in collaboration with Tim Bish
---
...va => ActiveMQAddressHasBindingsException.java} | 14 ++---
.../api/core/ActiveMQDeleteAddressException.java | 2 +-
.../artemis/core/server/ActiveMQMessageBundle.java | 4 +-
.../artemis/core/server/ActiveMQServer.java | 5 ++
.../core/server/impl/ActiveMQServerImpl.java | 9 +++
.../core/server/impl/ServerSessionImpl.java | 36 +++++++++++-
.../server/impl/TransientQueueManagerImpl.java | 3 +-
.../amqp/JMSTemporaryDestinationTest.java | 67 ++++++++++++++++++++++
8 files changed, 126 insertions(+), 14 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java
similarity index 70%
copy from
artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
copy to
artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java
index 9c8030649c..a023b68c3a 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressHasBindingsException.java
@@ -14,18 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.activemq.artemis.api.core;
-/**
- * An operation failed because an address exists on the server.
- */
-public final class ActiveMQDeleteAddressException extends ActiveMQException {
+public class ActiveMQAddressHasBindingsException extends
ActiveMQDeleteAddressException {
- public ActiveMQDeleteAddressException() {
- super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR);
+ public ActiveMQAddressHasBindingsException() {
+ super();
}
- public ActiveMQDeleteAddressException(String msg) {
- super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR, msg);
+ public ActiveMQAddressHasBindingsException(String msg) {
+ super(msg);
}
}
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
index 9c8030649c..c3dfcfccbb 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.api.core;
/**
* An operation failed because an address exists on the server.
*/
-public final class ActiveMQDeleteAddressException extends ActiveMQException {
+public class ActiveMQDeleteAddressException extends ActiveMQException {
public ActiveMQDeleteAddressException() {
super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index cbe1417f9d..02b532341d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -21,9 +21,9 @@ import java.util.Set;
import
org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
+import
org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException;
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
import
org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
-import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import
org.apache.activemq.artemis.api.core.ActiveMQDivertDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
@@ -375,7 +375,7 @@ public interface ActiveMQMessageBundle {
ActiveMQAddressExistsException addressAlreadyExists(SimpleString address);
@Message(id = 229205, value = "Address {} has bindings")
- ActiveMQDeleteAddressException addressHasBindings(SimpleString address);
+ ActiveMQAddressHasBindingsException addressHasBindings(SimpleString
address);
@Message(id = 229206, value = "Queue {} has invalid max consumer setting:
{}")
IllegalArgumentException invalidMaxConsumers(String queueName, int value);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 3158483591..0cf0bff50c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -162,6 +162,11 @@ public interface ActiveMQServer extends ServiceComponent {
StorageManager getStorageManager();
+ /**
+ * The executor responsible to remove temporary destinations.
+ * */
+ Executor getTransientQueueExecutor();
+
PagingManager getPagingManager();
PagingManager createPagingManager() throws Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index cf56456fc3..875597cd36 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -290,6 +290,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected volatile ExecutorFactory executorFactory;
+ protected volatile Executor transientQueueExecutor;
+
private volatile ExecutorService ioExecutorPool;
private ReplayManager replayManager;
@@ -1776,6 +1778,11 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
return storageManager;
}
+ @Override
+ public Executor getTransientQueueExecutor() {
+ return transientQueueExecutor;
+ }
+
@Override
public ActiveMQSecurityManager getSecurityManager() {
return securityManager;
@@ -3271,6 +3278,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
}
this.executorFactory = new OrderedExecutorFactory(threadPool);
+ this.transientQueueExecutor = executorFactory.getExecutor();
+
if (serviceRegistry.getIOExecutorService() == null) {
this.ioExecutorPool = new ActiveMQThreadPoolExecutor(0, maxIoThreads,
THREAD_POOL_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, getThreadFactory("io"));
} else {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 6931d08773..4ec39ed1bf 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import
org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
+import
org.apache.activemq.artemis.api.core.ActiveMQAddressHasBindingsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -814,7 +815,7 @@ public class ServerSessionImpl extends
CriticalComponentImpl implements ServerSe
// not mean it will get deleted automatically when the session is
closed. It is up to the user to delete the
// resource when finished with it
- TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server,
name);
+ TempResourceCleanerUpper cleaner = new TempResourceCleanerUpper(server,
name, sessionExecutor);
if (remotingConnection instanceof TempResourceObserver observer) {
cleaner.setObserver(observer);
}
@@ -1163,15 +1164,22 @@ public class ServerSessionImpl extends
CriticalComponentImpl implements ServerSe
public static class TempResourceCleanerUpper implements CloseListener,
FailureListener {
+ private int retry = 0;
+
+ private final int MAX_RETRY = 5;
+
private final SimpleString resourceName;
private final ActiveMQServer server;
private TempResourceObserver observer;
- public TempResourceCleanerUpper(final ActiveMQServer server, final
SimpleString resourceName) {
+ private Executor sessionExecutor;
+
+ public TempResourceCleanerUpper(final ActiveMQServer server, final
SimpleString resourceName, Executor sessionExecutor) {
this.server = server;
this.resourceName = resourceName;
+ this.sessionExecutor = sessionExecutor;
}
public void setObserver(TempResourceObserver observer) {
@@ -1179,10 +1187,20 @@ public class ServerSessionImpl extends
CriticalComponentImpl implements ServerSe
}
private void run() {
+ sessionExecutor.execute(() -> {
+ // this needs to use the same executor as TransientQueueManagerImpl
+ // even though we retry failed executions
+ // we still use the same executor as the TransientQueueManagerImpl
to minimize the number of retries
+ server.getTransientQueueExecutor().execute(this::done);
+ });
+ }
+
+ private void done() {
try {
logger.debug("deleting temporary resource {}", resourceName);
try {
Queue q = server.locateQueue(resourceName);
+ logger.debug("deleting queue {}", resourceName);
if (q != null && q.isTemporary()) {
AddressInfo a = server.getAddressInfo(q.getAddress());
server.destroyQueue(resourceName, null, false, false, a ==
null || a.isTemporary());
@@ -1196,12 +1214,26 @@ public class ServerSessionImpl extends
CriticalComponentImpl implements ServerSe
}
try {
AddressInfo a = server.getAddressInfo(resourceName);
+ logger.debug("deleting address with resource={}, address={}",
resourceName, a);
if (a != null && a.isTemporary()) {
server.removeAddressInfo(resourceName, null);
if (observer != null) {
observer.tempAddressDeleted(resourceName);
}
}
+ } catch (ActiveMQAddressHasBindingsException e) {
+ // in a scenario where the consumer on a temporary and
connection is being closed as part of the same event
+ // we could get on a situation where the remove of the queue is
already scheduled in the executors
+ // but have not yet reached.
+ // It is not possible to serialize the calls on
org.apache.activemq.artemis.core.server.impl.TransientQueueManagerImpl
+ // as that could lead to starvations and deadlocks.
+ // for that reason we can only retry in the executor's line
+ if (retry++ < MAX_RETRY) {
+ logger.debug("retrying deleteResource {}, retry={}",
resourceName, retry);
+ TempResourceCleanerUpper.this.run();
+ } else {
+ logger.warn(e.getMessage(), e);
+ }
} catch (ActiveMQException e) {
// that's fine.. it can happen due to resource already been
deleted
logger.debug(e.getMessage(), e);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
index 1d8118311f..df480c2e55 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
@@ -52,7 +52,8 @@ public class TransientQueueManagerImpl extends
ReferenceCounterUtil implements T
}
public TransientQueueManagerImpl(ActiveMQServer server, SimpleString
queueName) {
- super(server.getExecutorFactory().getExecutor());
+ // We have to use the same executor between here and
ServerSessionImpl::TempResourceCleanerUpper
+ super(server.getTransientQueueExecutor());
this.server = server;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
index 362d6fc32a..06bfc6d100 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -24,6 +25,7 @@ import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -137,4 +139,69 @@ public class JMSTemporaryDestinationTest extends
JMSClientTestSupport {
connection.close();
}
}
+
+ @Test
+ @Timeout(20)
+ public void testTemporaryTopicDeletedOnConnectionClosed() throws Exception {
+ doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, true);
+ }
+
+ @Test
+ @Timeout(20)
+ public void testTemporaryQueueDeletedOnConnectionClosed() throws Exception {
+ doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, true);
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testTemporaryTopicDeletedOnConnectionClosedWithoutExplicitConsumerClose()
throws Exception {
+ doTestTemporaryDestinationIsDeletedOnConnectionClosed(true, false);
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testTemporaryQueueDeletedOnConnectionClosedWithoutExplicitConsumerClose()
throws Exception {
+ doTestTemporaryDestinationIsDeletedOnConnectionClosed(false, false);
+ }
+
+ private void doTestTemporaryDestinationIsDeletedOnConnectionClosed(boolean
topic, boolean closeConsumer) throws Exception {
+ final String addressName;
+ try (Connection connection = createConnection()) {
+ final Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ final Destination destination;
+
+ if (topic) {
+ destination = session.createTemporaryTopic();
+
+ assertNotNull(destination);
+ assertTrue(destination instanceof TemporaryTopic);
+
+ addressName = ((TemporaryTopic) destination).getTopicName();
+ } else {
+ destination = session.createTemporaryQueue();
+
+ assertNotNull(destination);
+ assertTrue(destination instanceof TemporaryQueue);
+
+ addressName = ((TemporaryQueue) destination).getQueueName();
+ }
+
+ logger.debug("Address being used is {}", addressName);
+
+ final MessageConsumer consumer = session.createConsumer(destination);
+
+ final AddressInfo addressView = getProxyToAddress(addressName);
+ assertNotNull(addressView);
+
+ assertEquals(1,
server.bindingQuery(addressView.getName()).getQueueNames().size());
+
+ if (closeConsumer) {
+ consumer.close();
+ }
+
+ }
+ Wait.assertNull(() -> getProxyToAddress(addressName),
TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50));
+ }
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]