This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 70c38182b5 ARTEMIS-5961 Ensure control link temporary address cleaned 
up
70c38182b5 is described below

commit 70c38182b55dc1e8e827fbbf1f40792b88452769
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Mar 19 18:25:58 2026 -0400

    ARTEMIS-5961 Ensure control link temporary address cleaned up
    
    Add a more proactive cleanup stage in the connection interrupted handling to
    remove the federation control link temporary address and binding to avoid 
some
    logging in tests that warn it couldn't be removed because of a binding still
    remaining.
---
 .../connect/federation/AMQPFederationSource.java   | 16 +++-
 .../amqp/connect/AMQPFederationConnectTest.java    | 96 ++++++++++++++++++++++
 .../connect/AMQPFederationServerToServerTest.java  |  7 +-
 3 files changed, 114 insertions(+), 5 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
index 14e4caa7e9..9b34b08879 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
@@ -91,6 +91,7 @@ public class AMQPFederationSource extends AMQPFederation {
 
    private volatile AMQPFederationConfiguration configuration;
    private volatile AMQPFederationCapabilities capabilities;
+   private volatile AMQPFederationCommandDispatcher controlLink;
 
    /**
     * Creates a new AMQP Federation instance that will manage the state of a 
single AMQP broker federation instance
@@ -242,6 +243,14 @@ public class AMQPFederationSource extends AMQPFederation {
          eventProcessor = null;
       }
 
+      try {
+         controlLink.close(false);
+      } catch (Exception ex) {
+         errorCaught.compareAndExchange(null, ex);
+      } finally {
+         controlLink = null;
+      }
+
       connection = null;
       session = null;
 
@@ -678,9 +687,10 @@ public class AMQPFederationSource extends AMQPFederation {
                      throw new ActiveMQAMQPInternalErrorException("Error while 
configuring interal session metadata");
                   }
 
-                  final AMQPFederationCommandDispatcher commandLink = new 
AMQPFederationCommandDispatcher(sender, getServer(), session.getSessionSPI());
+                  controlLink = new AMQPFederationCommandDispatcher(sender, 
getServer(), session.getSessionSPI());
+
                   final ProtonServerSenderContext senderContext =
-                     new ProtonServerSenderContext(connection, sender, 
session, session.getSessionSPI(), commandLink);
+                     new ProtonServerSenderContext(connection, sender, 
session, session.getSessionSPI(), controlLink);
 
                   session.addSender(sender, senderContext);
 
@@ -689,7 +699,7 @@ public class AMQPFederationSource extends AMQPFederation {
 
                   // Setup events sender link to the target if there are any 
remote policies and
                   // then send those polices to start remote federation.
-                  asyncCreateTargetEventsSender(commandLink);
+                  asyncCreateTargetEventsSender(controlLink);
 
                   // Setup events receiver link from the target if there are 
any local policies
                   // and then start the policy managers to begin tracking 
local demand.
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index ad5207323b..4b6c83c429 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -374,6 +374,102 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void testFederationCleanUpControlLinkWhenBrokerConnectionDrops() 
throws Exception {
+      final String controlLinkAddress = "test-control-address";
+
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            .respond()
+                            .withTarget().withAddress(controlLinkAddress)
+                            .and()
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V1)
+                            
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+               new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(new 
AMQPFederatedBrokerConnectionElement(getTestName()));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final SimpleString controlLinkName = 
SimpleString.of(FEDERATION_BASE_VALIDATION_ADDRESS +
+                                                              "." + 
FEDERATION_CONTROL_LINK_PREFIX +
+                                                              "." + 
controlLinkAddress);
+
+         Wait.assertNotNull(() -> server.locateQueue(controlLinkName), 2_000, 
50);
+
+         peer.close();
+
+         Wait.assertNull(() -> server.locateQueue(controlLinkName), 2_000, 50);
+         Wait.assertFalse(() -> 
server.addressQuery(controlLinkName).isExists(), 2_000, 50);
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testFederationCleanUpControlLinkWhenBrokerConnectionStopped() 
throws Exception {
+      final String controlLinkAddress = "test-control-address";
+
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            .respond()
+                            .withTarget().withAddress(controlLinkAddress)
+                            .and()
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+               new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(new 
AMQPFederatedBrokerConnectionElement(getTestName()));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final SimpleString controlLinkName = 
SimpleString.of(FEDERATION_BASE_VALIDATION_ADDRESS +
+                                                              "." + 
FEDERATION_CONTROL_LINK_PREFIX +
+                                                              "." + 
controlLinkAddress);
+
+         Wait.assertNotNull(() -> server.locateQueue(controlLinkName), 2_000, 
50);
+
+         server.getBrokerConnections().forEach((connection) -> {
+            try {
+               connection.stop();
+            } catch (Exception e) {
+               fail("Broker connection shutdown should not have thrown an 
exception");
+            }
+         });
+
+         Wait.assertNull(() -> server.locateQueue(controlLinkName), 2_000, 50);
+         Wait.assertFalse(() -> 
server.addressQuery(controlLinkName).isExists(), 2_000, 50);
+
+         peer.close();
+      }
+   }
+
    @Test
    @Timeout(20)
    public void 
testFederationCreatesControlLinkAndClosesConnectionIfCapabilityIsAbsent() 
throws Exception {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index e5c7a378c5..82b48761d0 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -2106,7 +2106,7 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
 
       final AMQPBrokerConnectConfiguration amqpConnection =
          new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT);
-      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setReconnectAttempts(20); // Limit reconnects
       amqpConnection.setRetryInterval(50);
       amqpConnection.addElement(element);
 
@@ -2143,6 +2143,8 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
       assertNotNull(received);
       received.reject(); // Terminal outcome should be DLQ'd
 
+      Wait.assertEquals(1L, () -> 
server.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(), 
5_000, 10);
+
       final ConnectionFactory factoryRemote = new JmsConnectionFactory(
          "amqp://localhost:" + SERVER_PORT_REMOTE + 
"?jms.prefetchPolicy.all=0");
 
@@ -2155,8 +2157,9 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
          final MessageConsumer consumer = sessionR.createConsumer(queue);
 
          Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 1_000);
+         Wait.assertEquals(1, () -> 
server.bindingQuery(SimpleString.of(getDeadLetterAddress())).getQueueNames().size(),
 5_000);
          Wait.assertTrue(() -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 
1_000);
-         Wait.assertEquals(1L, () -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(),
 5_000, 10);
+         Wait.assertEquals(1L, () -> 
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(),
 5_000);
 
          assertNotNull(consumer.receiveNoWait());
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to