This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new dba0741f43 ARTEMIS-5565 Allow for offered capabilities in broker 
connection Open
dba0741f43 is described below

commit dba0741f436f794476938215ad7ed5c498a06c4b
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jul 3 10:15:41 2025 -0400

    ARTEMIS-5565 Allow for offered capabilities in broker connection Open
    
    When opening a broker connection to another broker instance we should 
provide a
    means of setting the offered capabilities as we do the desired capabilites 
so
    that future work could send offered capabilities that the remote side can 
make
    use of which it could not currently as the broker connection can't 
configure them.
---
 .../amqp/broker/ProtonProtocolManager.java         |  19 ++-
 .../amqp/client/AMQPClientConnectionFactory.java   |  13 +-
 .../amqp/connect/AMQPBrokerConnection.java         |   2 +-
 .../amqp/proton/AMQPConnectionContext.java         |  29 +++-
 .../amqp/proton/handler/ProtonHandler.java         |   3 +-
 .../amqp/proton/AMQPConnectionContextTest.java     |   1 +
 .../amqp/AmqpOutboundConnectionTest.java           | 180 ++++++++++++++++++++-
 .../amqp/connect/AMQPFederationConnectTest.java    |  24 +++
 8 files changed, 252 insertions(+), 19 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index a4cedccc56..af26b5f8a5 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -237,7 +237,7 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, 
Connection remotingConnection) {
-      return internalConnectionEntry(remotingConnection, false, null, null, 
null);
+      return internalConnectionEntry(remotingConnection, false, null, null, 
null, null);
    }
 
    /**
@@ -245,22 +245,22 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
     * AMQP Bridges
     */
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection) {
-      return internalConnectionEntry(remotingConnection, true, null, null, 
null);
+      return internalConnectionEntry(remotingConnection, true, null, null, 
null, null);
    }
 
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory) {
-      return internalConnectionEntry(remotingConnection, true, saslFactory, 
null, null);
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
null, null, null);
    }
 
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> 
connectionProperties) {
-      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties, null);
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties, null, null);
    }
 
-   public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> 
connectionProperties, Symbol[] desiredCapabilities) {
-      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties, desiredCapabilities);
+   public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> 
connectionProperties, Symbol[] offeredCapabilities, Symbol[] 
desiredCapabilities) {
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties, offeredCapabilities, desiredCapabilities);
    }
 
-   private ConnectionEntry internalConnectionEntry(Connection 
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory, 
Map<Symbol, Object> connectionProperties, Symbol[] desiredCapabilities) {
+   private ConnectionEntry internalConnectionEntry(Connection 
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory, 
Map<Symbol, Object> connectionProperties, Symbol[] offeredCapabilities, 
Symbol[] desiredCapabilities) {
       AMQPConnectionCallback connectionCallback = new 
AMQPConnectionCallback(this, remotingConnection, 
server.getExecutorFactory().getExecutor(), server);
       long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
 
@@ -278,7 +278,10 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
       String id = server.getNodeID().toString();
       boolean useCoreSubscriptionNaming = 
server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, 
connectionCallback, id, (int) ttl, getMaxFrameSize(), 
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, 
server.getScheduledPool(), true, saslFactory, connectionProperties, 
desiredCapabilities, outgoing);
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(
+         this, connectionCallback, id, (int) ttl, getMaxFrameSize(), 
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
+         useCoreSubscriptionNaming, server.getScheduledPool(), true, 
saslFactory, connectionProperties,
+         offeredCapabilities, desiredCapabilities, outgoing);
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index cce7c9c82e..7e55a46b47 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -41,13 +41,21 @@ public class AMQPClientConnectionFactory {
    private final Map<Symbol, Object> connectionProperties;
    private final int ttl;
    private final boolean useCoreSubscriptionNaming;
+   private final Symbol[] desiredCapabilities;
+   private final Symbol[] offeredCapabilities;
 
    public AMQPClientConnectionFactory(ActiveMQServer server, String 
containerId, Map<Symbol, Object> connectionProperties, int ttl) {
+      this(server, containerId, connectionProperties, ttl, null, null);
+   }
+
+   public AMQPClientConnectionFactory(ActiveMQServer server, String 
containerId, Map<Symbol, Object> connectionProperties, int ttl, Symbol[] 
offeredCapabilities, Symbol[] desiredCapabilities) {
       this.server = server;
       this.containerId = containerId;
       this.connectionProperties = connectionProperties;
       this.ttl = ttl;
       this.useCoreSubscriptionNaming = false;
+      this.offeredCapabilities = offeredCapabilities;
+      this.desiredCapabilities = desiredCapabilities;
    }
 
    public ActiveMQProtonRemotingConnection 
createConnection(ProtonProtocolManager protocolManager, Connection connection, 
Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
@@ -55,7 +63,10 @@ public class AMQPClientConnectionFactory {
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
-      AMQPConnectionContext amqpConnection = new 
AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, 
protocolManager.getMaxFrameSize(), 
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, 
server.getScheduledPool(), false, clientSASLFactory, connectionProperties, 
null);
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(
+         protocolManager, connectionCallback, containerId, ttl, 
protocolManager.getMaxFrameSize(),
+         AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, 
useCoreSubscriptionNaming, server.getScheduledPool(),
+         false, clientSASLFactory, connectionProperties, offeredCapabilities, 
desiredCapabilities);
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new 
ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, 
executor);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index ab026ceb29..b60a1706a0 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -505,7 +505,7 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
          final Symbol[] brokerDesiredCapabilities = bridgeManagers == null ? 
null : new Symbol[] {SHARED_SUBS};
 
          NettyConnectorCloseHandler connectorCloseHandler = new 
NettyConnectorCloseHandler(connector, connectExecutor);
-         ConnectionEntry entry = 
protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory, 
brokerConnectionProperties, brokerDesiredCapabilities);
+         ConnectionEntry entry = 
protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory, 
brokerConnectionProperties, null, brokerDesiredCapabilities);
          server.getRemotingService().addConnectionEntry(connection, entry);
          protonRemotingConnection = (ActiveMQProtonRemotingConnection) 
entry.connection;
          
protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(),
 this::linkClosed);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index d5b496c0d6..0982eff748 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -121,6 +121,7 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
    private final boolean isIncomingConnection;
    private final ClientSASLFactory saslClientFactory;
    private final Map<Symbol, Object> connectionProperties = new HashMap<>();
+   private final Symbol[] offeredCapabilities;
    private final Symbol[] desiredCapabilities;
    private final ScheduledExecutorService scheduledPool;
    private final Map<String, LinkCloseListener> linkCloseListeners = new 
ConcurrentHashMap<>();
@@ -152,8 +153,9 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
                                 boolean isIncomingConnection,
                                 ClientSASLFactory saslClientFactory,
                                 Map<Symbol, Object> connectionProperties,
+                                Symbol[] offeredCapabilities,
                                 Symbol[] desiredCapabilities) {
-      this(protocolManager, connectionSP, containerId, idleTimeout, 
maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool, 
isIncomingConnection, saslClientFactory, connectionProperties, 
desiredCapabilities, false);
+      this(protocolManager, connectionSP, containerId, idleTimeout, 
maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool, 
isIncomingConnection, saslClientFactory, connectionProperties, 
offeredCapabilities, desiredCapabilities, false);
    }
 
    public AMQPConnectionContext(ProtonProtocolManager protocolManager,
@@ -167,6 +169,7 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
                                 boolean isIncomingConnection,
                                 ClientSASLFactory saslClientFactory,
                                 Map<Symbol, Object> connectionProperties,
+                                Symbol[] offeredCapabilities,
                                 Symbol[] desiredCapabilities,
                                 boolean brokerConnection) {
       this.protocolManager = protocolManager;
@@ -184,6 +187,12 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
          this.connectionProperties.putAll(connectionProperties);
       }
 
+      if (offeredCapabilities != null && offeredCapabilities.length > 0) {
+         this.offeredCapabilities = Arrays.copyOf(offeredCapabilities, 
offeredCapabilities.length);
+      } else {
+         this.offeredCapabilities = null;
+      }
+
       if (desiredCapabilities != null && desiredCapabilities.length > 0) {
          this.desiredCapabilities = Arrays.copyOf(desiredCapabilities, 
desiredCapabilities.length);
       } else {
@@ -510,8 +519,13 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
       return verifyDesiredCapability(sender, FEDERATION_ADDRESS_RECEIVER);
    }
 
-   public Symbol[] getConnectionCapabilitiesOffered() {
-      URI tc = connectionCallback.getFailoverList();
+   private Symbol[] getOfferedCapabilitiesForResponse() {
+      return offeredCapabilities != null ? offeredCapabilities : 
ExtCapability.getCapabilities();
+   }
+
+   private Map<Symbol, Object> getConnectionPropertiesForResponse() {
+      final URI tc = connectionCallback.getFailoverList();
+
       if (tc != null) {
          Map<Symbol, Object> hostDetails = new HashMap<>();
          hostDetails.put(NETWORK_HOST, tc.getHost());
@@ -526,11 +540,12 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
 
          connectionProperties.put(FAILOVER_SERVER_LIST, 
Arrays.asList(hostDetails));
       }
-      return ExtCapability.getCapabilities();
+
+      return Collections.unmodifiableMap(connectionProperties);
    }
 
    public void open() {
-      handler.open(containerId, connectionProperties, desiredCapabilities);
+      handler.open(containerId, connectionProperties, offeredCapabilities, 
desiredCapabilities);
    }
 
    public String getContainer() {
@@ -661,8 +676,8 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
          // sent values or preventing the in process open from configuring the 
values it wants.
          if (isIncomingConnection) {
             connection.setContainer(containerId);
-            connection.setProperties(connectionProperties);
-            
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+            connection.setProperties(getConnectionPropertiesForResponse());
+            
connection.setOfferedCapabilities(getOfferedCapabilitiesForResponse());
             connection.setDesiredCapabilities(desiredCapabilities);
             connection.open();
          }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index c77dd4534e..ea4f920728 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -630,10 +630,11 @@ public class ProtonHandler extends ProtonInitializable 
implements SaslListener {
       flush();
    }
 
-   public void open(String containerId, Map<Symbol, Object> 
connectionProperties, Symbol[] desiredCapabilities) {
+   public void open(String containerId, Map<Symbol, Object> 
connectionProperties, Symbol[] offeredCapabilities, Symbol[] 
desiredCapabilities) {
       this.transport.open();
       this.connection.setContainer(containerId);
       this.connection.setProperties(connectionProperties);
+      this.connection.setOfferedCapabilities(offeredCapabilities);
       this.connection.setDesiredCapabilities(desiredCapabilities);
       this.connection.open();
       flush();
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
index cf1e7f1fe2..f767379ffb 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
@@ -77,6 +77,7 @@ public class AMQPConnectionContextTest {
          false,
          null,
          null,
+         null,
          null);
 
       connectionContext.onRemoteOpen(connection);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
index 33f0e5397e..1af7fe27cb 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
@@ -20,6 +20,7 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.hamcrest.Matchers.nullValue;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -27,6 +28,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -47,6 +49,7 @@ import 
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -72,7 +75,6 @@ public class AmqpOutboundConnectionTest extends 
AmqpClientTestSupport {
       runOutboundConnectionTest(true, true);
    }
 
-
    private void runOutboundConnectionTest(boolean withSecurity, boolean 
closeFromClient) throws Exception {
       final ActiveMQServer remote;
       try {
@@ -156,6 +158,164 @@ public class AmqpOutboundConnectionTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void testOutboundConnectsWithOfferedAndDesiredCapabilities() throws 
Exception {
+      // Tests that the underlying AMQPConnectionContext will honor the set 
offered and desired capabilities
+      // and place them in the outgoing Open performative.
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+         peer.expectOpen().withOfferedCapability("ANONYMOUS_RELAY")
+                          .withDesiredCapability("SHARED_SUBS")
+                          .respond();
+         peer.start();
+
+         final Map<String, Object> config = new LinkedHashMap<>(); 
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+         config.put(TransportConstants.PORT_PROP_NAME, 
String.valueOf(peer.getServerURI().getPort()));
+
+         final ClientSASLFactory clientSASLFactory = availableMechanims -> {
+            if (availableMechanims != null && 
Arrays.asList(availableMechanims).contains("ANONYMOUS")) {
+               return new AnonymousSASLMechanism();
+            } else {
+               return null;
+            }
+         };
+
+         final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+         EventHandler eventHandler = new EventHandler() {
+            @Override
+            public void onRemoteOpen(Connection connection) throws Exception {
+               connectionOpened.set(true);
+            }
+         };
+
+         final Symbol[] offeredCapabilities = new Symbol[] 
{Symbol.valueOf("ANONYMOUS_RELAY")};
+         final Symbol[] desiredCapabilities = new Symbol[] 
{Symbol.valueOf("SHARED_SUBS")};
+
+         AMQPClientConnectionFactory clientFactory = new 
AMQPClientConnectionFactory(server, "myid", 
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000, 
offeredCapabilities, desiredCapabilities);
+         ProtonClientConnectionManager lifeCycleListener = new 
ProtonClientConnectionManager(clientFactory, Optional.of(eventHandler), 
clientSASLFactory);
+         ProtonClientProtocolManager protocolManager = new 
ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+         NettyConnector connector = new NettyConnector(config, 
lifeCycleListener, lifeCycleListener, 
server.getExecutorFactory().getExecutor(), 
server.getExecutorFactory().getExecutor(), server.getScheduledPool(), 
protocolManager);
+
+         try {
+            connector.start();
+
+            assertNotNull(connector.createConnection().getID());
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            Wait.assertTrue(connectionOpened::get);
+         } finally {
+            lifeCycleListener.stop();
+         }
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testOutboundTreatsEmptyOfferedAndDesiredAsNoCapabilities() 
throws Exception {
+      // Tests that the underlying AMQPConnectionContext will treat empty 
offered and desired capabilities
+      // arrays as being nothing to send and proceeding as normal with old 
default behavior of sending nothing.
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+         peer.expectOpen().withOfferedCapabilities(nullValue())
+                          .withDesiredCapabilities(nullValue())
+                          .respond();
+         peer.start();
+
+         final Map<String, Object> config = new LinkedHashMap<>(); 
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+         config.put(TransportConstants.PORT_PROP_NAME, 
String.valueOf(peer.getServerURI().getPort()));
+
+         final ClientSASLFactory clientSASLFactory = availableMechanims -> {
+            if (availableMechanims != null && 
Arrays.asList(availableMechanims).contains("ANONYMOUS")) {
+               return new AnonymousSASLMechanism();
+            } else {
+               return null;
+            }
+         };
+
+         final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+         EventHandler eventHandler = new EventHandler() {
+            @Override
+            public void onRemoteOpen(Connection connection) throws Exception {
+               connectionOpened.set(true);
+            }
+         };
+
+         final Symbol[] offeredCapabilities = new Symbol[0];
+         final Symbol[] desiredCapabilities = new Symbol[0];
+
+         AMQPClientConnectionFactory clientFactory = new 
AMQPClientConnectionFactory(server, "myid", 
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000, 
offeredCapabilities, desiredCapabilities);
+         ProtonClientConnectionManager lifeCycleListener = new 
ProtonClientConnectionManager(clientFactory, Optional.of(eventHandler), 
clientSASLFactory);
+         ProtonClientProtocolManager protocolManager = new 
ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+         NettyConnector connector = new NettyConnector(config, 
lifeCycleListener, lifeCycleListener, 
server.getExecutorFactory().getExecutor(), 
server.getExecutorFactory().getExecutor(), server.getScheduledPool(), 
protocolManager);
+
+         try {
+            connector.start();
+
+            assertNotNull(connector.createConnection().getID());
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            Wait.assertTrue(connectionOpened::get);
+         } finally {
+            lifeCycleListener.stop();
+         }
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testOutboundRemainsDefaultedToNoOfferedOrDesiredCapabilities() 
throws Exception {
+      // Tests that the underlying AMQPConnectionContext retains its default 
behavior of not sending
+      // offered or desired capabilities if not told to do so.
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+         peer.expectOpen().withOfferedCapabilities(nullValue())
+                          .withDesiredCapabilities(nullValue())
+                          .respond();
+         peer.start();
+
+         final Map<String, Object> config = new LinkedHashMap<>(); 
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+         config.put(TransportConstants.PORT_PROP_NAME, 
String.valueOf(peer.getServerURI().getPort()));
+
+         final ClientSASLFactory clientSASLFactory = availableMechanims -> {
+            if (availableMechanims != null && 
Arrays.asList(availableMechanims).contains("ANONYMOUS")) {
+               return new AnonymousSASLMechanism();
+            } else {
+               return null;
+            }
+         };
+
+         final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+         EventHandler eventHandler = new EventHandler() {
+            @Override
+            public void onRemoteOpen(Connection connection) throws Exception {
+               connectionOpened.set(true);
+            }
+         };
+
+         AMQPClientConnectionFactory clientFactory = new 
AMQPClientConnectionFactory(server, "myid", 
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000);
+         ProtonClientConnectionManager lifeCycleListener = new 
ProtonClientConnectionManager(clientFactory, Optional.of(eventHandler), 
clientSASLFactory);
+         ProtonClientProtocolManager protocolManager = new 
ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+         NettyConnector connector = new NettyConnector(config, 
lifeCycleListener, lifeCycleListener, 
server.getExecutorFactory().getExecutor(), 
server.getExecutorFactory().getExecutor(), server.getScheduledPool(), 
protocolManager);
+
+         try {
+            connector.start();
+
+            assertNotNull(connector.createConnection().getID());
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            Wait.assertTrue(connectionOpened::get);
+         } finally {
+            lifeCycleListener.stop();
+         }
+      }
+   }
    @Override
    protected boolean isSecurityEnabled() {
       return securityEnabled;
@@ -189,4 +349,22 @@ public class AmqpOutboundConnectionTest extends 
AmqpClientTestSupport {
          return new byte[0];
       }
    }
+
+   private static class AnonymousSASLMechanism implements ClientSASL {
+
+      @Override
+      public String getName() {
+         return "ANONYMOUS";
+      }
+
+      @Override
+      public byte[] getInitialResponse() {
+         return null;
+      }
+
+      @Override
+      public byte[] getResponse(byte[] challenge) {
+         return new byte[0];
+      }
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index a2b7b53920..201ca151cb 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -1341,6 +1341,30 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void testBrokerConnectionDoesNotSendOfferedCapabilities() throws 
Exception {
+      // Checks that current behavior is to not send, this will break if the 
defaults are
+      // changed such that the connection starts sending offered capabilities 
by default.
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+         peer.expectOpen().withOfferedCapabilities(nullValue()).respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         // No user or pass given, it will have to select ANONYMOUS even 
though PLAIN also offered
+         AMQPBrokerConnectConfiguration amqpConnection =
+               new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
 
    // Use these methods to script the initial handshake that a broker that is 
establishing
    // a federation connection with a remote broker instance would perform.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to