This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 70c38182b5 ARTEMIS-5961 Ensure control link temporary address cleaned
up
70c38182b5 is described below
commit 70c38182b55dc1e8e827fbbf1f40792b88452769
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Mar 19 18:25:58 2026 -0400
ARTEMIS-5961 Ensure control link temporary address cleaned up
Add a more proactive cleanup stage in the connection interrupted handling to
remove the federation control link temporary address and binding to avoid
some
logging in tests that warn it couldn't be removed because of a binding still
remaining.
---
.../connect/federation/AMQPFederationSource.java | 16 +++-
.../amqp/connect/AMQPFederationConnectTest.java | 96 ++++++++++++++++++++++
.../connect/AMQPFederationServerToServerTest.java | 7 +-
3 files changed, 114 insertions(+), 5 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
index 14e4caa7e9..9b34b08879 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
@@ -91,6 +91,7 @@ public class AMQPFederationSource extends AMQPFederation {
private volatile AMQPFederationConfiguration configuration;
private volatile AMQPFederationCapabilities capabilities;
+ private volatile AMQPFederationCommandDispatcher controlLink;
/**
* Creates a new AMQP Federation instance that will manage the state of a
single AMQP broker federation instance
@@ -242,6 +243,14 @@ public class AMQPFederationSource extends AMQPFederation {
eventProcessor = null;
}
+ try {
+ controlLink.close(false);
+ } catch (Exception ex) {
+ errorCaught.compareAndExchange(null, ex);
+ } finally {
+ controlLink = null;
+ }
+
connection = null;
session = null;
@@ -678,9 +687,10 @@ public class AMQPFederationSource extends AMQPFederation {
throw new ActiveMQAMQPInternalErrorException("Error while
configuring interal session metadata");
}
- final AMQPFederationCommandDispatcher commandLink = new
AMQPFederationCommandDispatcher(sender, getServer(), session.getSessionSPI());
+ controlLink = new AMQPFederationCommandDispatcher(sender,
getServer(), session.getSessionSPI());
+
final ProtonServerSenderContext senderContext =
- new ProtonServerSenderContext(connection, sender,
session, session.getSessionSPI(), commandLink);
+ new ProtonServerSenderContext(connection, sender,
session, session.getSessionSPI(), controlLink);
session.addSender(sender, senderContext);
@@ -689,7 +699,7 @@ public class AMQPFederationSource extends AMQPFederation {
// Setup events sender link to the target if there are any
remote policies and
// then send those polices to start remote federation.
- asyncCreateTargetEventsSender(commandLink);
+ asyncCreateTargetEventsSender(controlLink);
// Setup events receiver link from the target if there are
any local policies
// and then start the policy managers to begin tracking
local demand.
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index ad5207323b..4b6c83c429 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -374,6 +374,102 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void testFederationCleanUpControlLinkWhenBrokerConnectionDrops()
throws Exception {
+ final String controlLinkAddress = "test-control-address";
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respond()
+ .withTarget().withAddress(controlLinkAddress)
+ .and()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V1)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(new
AMQPFederatedBrokerConnectionElement(getTestName()));
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final SimpleString controlLinkName =
SimpleString.of(FEDERATION_BASE_VALIDATION_ADDRESS +
+ "." +
FEDERATION_CONTROL_LINK_PREFIX +
+ "." +
controlLinkAddress);
+
+ Wait.assertNotNull(() -> server.locateQueue(controlLinkName), 2_000,
50);
+
+ peer.close();
+
+ Wait.assertNull(() -> server.locateQueue(controlLinkName), 2_000, 50);
+ Wait.assertFalse(() ->
server.addressQuery(controlLinkName).isExists(), 2_000, 50);
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testFederationCleanUpControlLinkWhenBrokerConnectionStopped()
throws Exception {
+ final String controlLinkAddress = "test-control-address";
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respond()
+ .withTarget().withAddress(controlLinkAddress)
+ .and()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(new
AMQPFederatedBrokerConnectionElement(getTestName()));
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final SimpleString controlLinkName =
SimpleString.of(FEDERATION_BASE_VALIDATION_ADDRESS +
+ "." +
FEDERATION_CONTROL_LINK_PREFIX +
+ "." +
controlLinkAddress);
+
+ Wait.assertNotNull(() -> server.locateQueue(controlLinkName), 2_000,
50);
+
+ server.getBrokerConnections().forEach((connection) -> {
+ try {
+ connection.stop();
+ } catch (Exception e) {
+ fail("Broker connection shutdown should not have thrown an
exception");
+ }
+ });
+
+ Wait.assertNull(() -> server.locateQueue(controlLinkName), 2_000, 50);
+ Wait.assertFalse(() ->
server.addressQuery(controlLinkName).isExists(), 2_000, 50);
+
+ peer.close();
+ }
+ }
+
@Test
@Timeout(20)
public void
testFederationCreatesControlLinkAndClosesConnectionIfCapabilityIsAbsent()
throws Exception {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index e5c7a378c5..82b48761d0 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -2106,7 +2106,7 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT);
- amqpConnection.setReconnectAttempts(10); // Limit reconnects
+ amqpConnection.setReconnectAttempts(20); // Limit reconnects
amqpConnection.setRetryInterval(50);
amqpConnection.addElement(element);
@@ -2143,6 +2143,8 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
assertNotNull(received);
received.reject(); // Terminal outcome should be DLQ'd
+ Wait.assertEquals(1L, () ->
server.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(),
5_000, 10);
+
final ConnectionFactory factoryRemote = new JmsConnectionFactory(
"amqp://localhost:" + SERVER_PORT_REMOTE +
"?jms.prefetchPolicy.all=0");
@@ -2155,8 +2157,9 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
final MessageConsumer consumer = sessionR.createConsumer(queue);
Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(), 1_000);
+ Wait.assertEquals(1, () ->
server.bindingQuery(SimpleString.of(getDeadLetterAddress())).getQueueNames().size(),
5_000);
Wait.assertTrue(() ->
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).isExists(),
1_000);
- Wait.assertEquals(1L, () ->
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(),
5_000, 10);
+ Wait.assertEquals(1L, () ->
remoteServer.queueQuery(SimpleString.of(getDeadLetterAddress())).getMessageCount(),
5_000);
assertNotNull(consumer.receiveNoWait());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]