This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new dba0741f43 ARTEMIS-5565 Allow for offered capabilities in broker
connection Open
dba0741f43 is described below
commit dba0741f436f794476938215ad7ed5c498a06c4b
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jul 3 10:15:41 2025 -0400
ARTEMIS-5565 Allow for offered capabilities in broker connection Open
When opening a broker connection to another broker instance we should
provide a means of setting the offered capabilities, as we already do for
the desired capabilities. This allows future work to send offered
capabilities that the remote side can make use of, which is not currently
possible because the broker connection cannot configure them.
---
.../amqp/broker/ProtonProtocolManager.java | 19 ++-
.../amqp/client/AMQPClientConnectionFactory.java | 13 +-
.../amqp/connect/AMQPBrokerConnection.java | 2 +-
.../amqp/proton/AMQPConnectionContext.java | 29 +++-
.../amqp/proton/handler/ProtonHandler.java | 3 +-
.../amqp/proton/AMQPConnectionContextTest.java | 1 +
.../amqp/AmqpOutboundConnectionTest.java | 180 ++++++++++++++++++++-
.../amqp/connect/AMQPFederationConnectTest.java | 24 +++
8 files changed, 252 insertions(+), 19 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index a4cedccc56..af26b5f8a5 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -237,7 +237,7 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed,
Connection remotingConnection) {
- return internalConnectionEntry(remotingConnection, false, null, null,
null);
+ return internalConnectionEntry(remotingConnection, false, null, null,
null, null);
}
/**
@@ -245,22 +245,22 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
* AMQP Bridges
*/
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection) {
- return internalConnectionEntry(remotingConnection, true, null, null,
null);
+ return internalConnectionEntry(remotingConnection, true, null, null,
null, null);
}
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory) {
- return internalConnectionEntry(remotingConnection, true, saslFactory,
null, null);
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
null, null, null);
}
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object>
connectionProperties) {
- return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties, null);
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties, null, null);
}
- public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object>
connectionProperties, Symbol[] desiredCapabilities) {
- return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties, desiredCapabilities);
+ public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object>
connectionProperties, Symbol[] offeredCapabilities, Symbol[]
desiredCapabilities) {
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties, offeredCapabilities, desiredCapabilities);
}
- private ConnectionEntry internalConnectionEntry(Connection
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory,
Map<Symbol, Object> connectionProperties, Symbol[] desiredCapabilities) {
+ private ConnectionEntry internalConnectionEntry(Connection
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory,
Map<Symbol, Object> connectionProperties, Symbol[] offeredCapabilities,
Symbol[] desiredCapabilities) {
AMQPConnectionCallback connectionCallback = new
AMQPConnectionCallback(this, remotingConnection,
server.getExecutorFactory().getExecutor(), server);
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
@@ -278,7 +278,10 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
String id = server.getNodeID().toString();
boolean useCoreSubscriptionNaming =
server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
- AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this,
connectionCallback, id, (int) ttl, getMaxFrameSize(),
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming,
server.getScheduledPool(), true, saslFactory, connectionProperties,
desiredCapabilities, outgoing);
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(
+ this, connectionCallback, id, (int) ttl, getMaxFrameSize(),
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
+ useCoreSubscriptionNaming, server.getScheduledPool(), true,
saslFactory, connectionProperties,
+ offeredCapabilities, desiredCapabilities, outgoing);
Executor executor = server.getExecutorFactory().getExecutor();
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index cce7c9c82e..7e55a46b47 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -41,13 +41,21 @@ public class AMQPClientConnectionFactory {
private final Map<Symbol, Object> connectionProperties;
private final int ttl;
private final boolean useCoreSubscriptionNaming;
+ private final Symbol[] desiredCapabilities;
+ private final Symbol[] offeredCapabilities;
public AMQPClientConnectionFactory(ActiveMQServer server, String
containerId, Map<Symbol, Object> connectionProperties, int ttl) {
+ this(server, containerId, connectionProperties, ttl, null, null);
+ }
+
+ public AMQPClientConnectionFactory(ActiveMQServer server, String
containerId, Map<Symbol, Object> connectionProperties, int ttl, Symbol[]
offeredCapabilities, Symbol[] desiredCapabilities) {
this.server = server;
this.containerId = containerId;
this.connectionProperties = connectionProperties;
this.ttl = ttl;
this.useCoreSubscriptionNaming = false;
+ this.offeredCapabilities = offeredCapabilities;
+ this.desiredCapabilities = desiredCapabilities;
}
public ActiveMQProtonRemotingConnection
createConnection(ProtonProtocolManager protocolManager, Connection connection,
Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
@@ -55,7 +63,10 @@ public class AMQPClientConnectionFactory {
Executor executor = server.getExecutorFactory().getExecutor();
- AMQPConnectionContext amqpConnection = new
AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl,
protocolManager.getMaxFrameSize(),
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming,
server.getScheduledPool(), false, clientSASLFactory, connectionProperties,
null);
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(
+ protocolManager, connectionCallback, containerId, ttl,
protocolManager.getMaxFrameSize(),
+ AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
useCoreSubscriptionNaming, server.getScheduledPool(),
+ false, clientSASLFactory, connectionProperties, offeredCapabilities,
desiredCapabilities);
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new
ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection,
executor);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index ab026ceb29..b60a1706a0 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -505,7 +505,7 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
final Symbol[] brokerDesiredCapabilities = bridgeManagers == null ?
null : new Symbol[] {SHARED_SUBS};
NettyConnectorCloseHandler connectorCloseHandler = new
NettyConnectorCloseHandler(connector, connectExecutor);
- ConnectionEntry entry =
protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory,
brokerConnectionProperties, brokerDesiredCapabilities);
+ ConnectionEntry entry =
protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory,
brokerConnectionProperties, null, brokerDesiredCapabilities);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection)
entry.connection;
protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(),
this::linkClosed);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index d5b496c0d6..0982eff748 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -121,6 +121,7 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
private final boolean isIncomingConnection;
private final ClientSASLFactory saslClientFactory;
private final Map<Symbol, Object> connectionProperties = new HashMap<>();
+ private final Symbol[] offeredCapabilities;
private final Symbol[] desiredCapabilities;
private final ScheduledExecutorService scheduledPool;
private final Map<String, LinkCloseListener> linkCloseListeners = new
ConcurrentHashMap<>();
@@ -152,8 +153,9 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
boolean isIncomingConnection,
ClientSASLFactory saslClientFactory,
Map<Symbol, Object> connectionProperties,
+ Symbol[] offeredCapabilities,
Symbol[] desiredCapabilities) {
- this(protocolManager, connectionSP, containerId, idleTimeout,
maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool,
isIncomingConnection, saslClientFactory, connectionProperties,
desiredCapabilities, false);
+ this(protocolManager, connectionSP, containerId, idleTimeout,
maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool,
isIncomingConnection, saslClientFactory, connectionProperties,
offeredCapabilities, desiredCapabilities, false);
}
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
@@ -167,6 +169,7 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
boolean isIncomingConnection,
ClientSASLFactory saslClientFactory,
Map<Symbol, Object> connectionProperties,
+ Symbol[] offeredCapabilities,
Symbol[] desiredCapabilities,
boolean brokerConnection) {
this.protocolManager = protocolManager;
@@ -184,6 +187,12 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
this.connectionProperties.putAll(connectionProperties);
}
+ if (offeredCapabilities != null && offeredCapabilities.length > 0) {
+ this.offeredCapabilities = Arrays.copyOf(offeredCapabilities,
offeredCapabilities.length);
+ } else {
+ this.offeredCapabilities = null;
+ }
+
if (desiredCapabilities != null && desiredCapabilities.length > 0) {
this.desiredCapabilities = Arrays.copyOf(desiredCapabilities,
desiredCapabilities.length);
} else {
@@ -510,8 +519,13 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
return verifyDesiredCapability(sender, FEDERATION_ADDRESS_RECEIVER);
}
- public Symbol[] getConnectionCapabilitiesOffered() {
- URI tc = connectionCallback.getFailoverList();
+ private Symbol[] getOfferedCapabilitiesForResponse() {
+ return offeredCapabilities != null ? offeredCapabilities :
ExtCapability.getCapabilities();
+ }
+
+ private Map<Symbol, Object> getConnectionPropertiesForResponse() {
+ final URI tc = connectionCallback.getFailoverList();
+
if (tc != null) {
Map<Symbol, Object> hostDetails = new HashMap<>();
hostDetails.put(NETWORK_HOST, tc.getHost());
@@ -526,11 +540,12 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
connectionProperties.put(FAILOVER_SERVER_LIST,
Arrays.asList(hostDetails));
}
- return ExtCapability.getCapabilities();
+
+ return Collections.unmodifiableMap(connectionProperties);
}
public void open() {
- handler.open(containerId, connectionProperties, desiredCapabilities);
+ handler.open(containerId, connectionProperties, offeredCapabilities,
desiredCapabilities);
}
public String getContainer() {
@@ -661,8 +676,8 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
// sent values or preventing the in process open from configuring the
values it wants.
if (isIncomingConnection) {
connection.setContainer(containerId);
- connection.setProperties(connectionProperties);
-
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ connection.setProperties(getConnectionPropertiesForResponse());
+
connection.setOfferedCapabilities(getOfferedCapabilitiesForResponse());
connection.setDesiredCapabilities(desiredCapabilities);
connection.open();
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index c77dd4534e..ea4f920728 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -630,10 +630,11 @@ public class ProtonHandler extends ProtonInitializable
implements SaslListener {
flush();
}
- public void open(String containerId, Map<Symbol, Object>
connectionProperties, Symbol[] desiredCapabilities) {
+ public void open(String containerId, Map<Symbol, Object>
connectionProperties, Symbol[] offeredCapabilities, Symbol[]
desiredCapabilities) {
this.transport.open();
this.connection.setContainer(containerId);
this.connection.setProperties(connectionProperties);
+ this.connection.setOfferedCapabilities(offeredCapabilities);
this.connection.setDesiredCapabilities(desiredCapabilities);
this.connection.open();
flush();
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
index cf1e7f1fe2..f767379ffb 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
@@ -77,6 +77,7 @@ public class AMQPConnectionContextTest {
false,
null,
null,
+ null,
null);
connectionContext.onRemoteOpen(connection);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
index 33f0e5397e..1af7fe27cb 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
@@ -20,6 +20,7 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.hamcrest.Matchers.nullValue;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -27,6 +28,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -47,6 +49,7 @@ import
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -72,7 +75,6 @@ public class AmqpOutboundConnectionTest extends
AmqpClientTestSupport {
runOutboundConnectionTest(true, true);
}
-
private void runOutboundConnectionTest(boolean withSecurity, boolean
closeFromClient) throws Exception {
final ActiveMQServer remote;
try {
@@ -156,6 +158,164 @@ public class AmqpOutboundConnectionTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void testOutboundConnectsWithOfferedAndDesiredCapabilities() throws
Exception {
+ // Tests that the underlying AMQPConnectionContext will honor the set
offered and desired capabilities
+ // and place them in the outgoing Open performative.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().withOfferedCapability("ANONYMOUS_RELAY")
+ .withDesiredCapability("SHARED_SUBS")
+ .respond();
+ peer.start();
+
+ final Map<String, Object> config = new LinkedHashMap<>();
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ config.put(TransportConstants.PORT_PROP_NAME,
String.valueOf(peer.getServerURI().getPort()));
+
+ final ClientSASLFactory clientSASLFactory = availableMechanims -> {
+ if (availableMechanims != null &&
Arrays.asList(availableMechanims).contains("ANONYMOUS")) {
+ return new AnonymousSASLMechanism();
+ } else {
+ return null;
+ }
+ };
+
+ final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+ EventHandler eventHandler = new EventHandler() {
+ @Override
+ public void onRemoteOpen(Connection connection) throws Exception {
+ connectionOpened.set(true);
+ }
+ };
+
+ final Symbol[] offeredCapabilities = new Symbol[]
{Symbol.valueOf("ANONYMOUS_RELAY")};
+ final Symbol[] desiredCapabilities = new Symbol[]
{Symbol.valueOf("SHARED_SUBS")};
+
+ AMQPClientConnectionFactory clientFactory = new
AMQPClientConnectionFactory(server, "myid",
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000,
offeredCapabilities, desiredCapabilities);
+ ProtonClientConnectionManager lifeCycleListener = new
ProtonClientConnectionManager(clientFactory, Optional.of(eventHandler),
clientSASLFactory);
+ ProtonClientProtocolManager protocolManager = new
ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+ NettyConnector connector = new NettyConnector(config,
lifeCycleListener, lifeCycleListener,
server.getExecutorFactory().getExecutor(),
server.getExecutorFactory().getExecutor(), server.getScheduledPool(),
protocolManager);
+
+ try {
+ connector.start();
+
+ assertNotNull(connector.createConnection().getID());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ Wait.assertTrue(connectionOpened::get);
+ } finally {
+ lifeCycleListener.stop();
+ }
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testOutboundTreatsEmptyOfferedAndDesiredAsNoCapabilities()
throws Exception {
+ // Tests that the underlying AMQPConnectionContext will treat empty
offered and desired capabilities
+ // arrays as nothing to send and proceed as normal with the old
default behavior of sending nothing.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().withOfferedCapabilities(nullValue())
+ .withDesiredCapabilities(nullValue())
+ .respond();
+ peer.start();
+
+ final Map<String, Object> config = new LinkedHashMap<>();
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ config.put(TransportConstants.PORT_PROP_NAME,
String.valueOf(peer.getServerURI().getPort()));
+
+ final ClientSASLFactory clientSASLFactory = availableMechanims -> {
+ if (availableMechanims != null &&
Arrays.asList(availableMechanims).contains("ANONYMOUS")) {
+ return new AnonymousSASLMechanism();
+ } else {
+ return null;
+ }
+ };
+
+ final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+ EventHandler eventHandler = new EventHandler() {
+ @Override
+ public void onRemoteOpen(Connection connection) throws Exception {
+ connectionOpened.set(true);
+ }
+ };
+
+ final Symbol[] offeredCapabilities = new Symbol[0];
+ final Symbol[] desiredCapabilities = new Symbol[0];
+
+ AMQPClientConnectionFactory clientFactory = new
AMQPClientConnectionFactory(server, "myid",
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000,
offeredCapabilities, desiredCapabilities);
+ ProtonClientConnectionManager lifeCycleListener = new
ProtonClientConnectionManager(clientFactory, Optional.of(eventHandler),
clientSASLFactory);
+ ProtonClientProtocolManager protocolManager = new
ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+ NettyConnector connector = new NettyConnector(config,
lifeCycleListener, lifeCycleListener,
server.getExecutorFactory().getExecutor(),
server.getExecutorFactory().getExecutor(), server.getScheduledPool(),
protocolManager);
+
+ try {
+ connector.start();
+
+ assertNotNull(connector.createConnection().getID());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ Wait.assertTrue(connectionOpened::get);
+ } finally {
+ lifeCycleListener.stop();
+ }
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testOutboundRemainsDefaultedToNoOfferedOrDesiredCapabilities()
throws Exception {
+ // Tests that the underlying AMQPConnectionContext retains its default
behavior of not sending
+ // offered or desired capabilities if not told to do so.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().withOfferedCapabilities(nullValue())
+ .withDesiredCapabilities(nullValue())
+ .respond();
+ peer.start();
+
+ final Map<String, Object> config = new LinkedHashMap<>();
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ config.put(TransportConstants.PORT_PROP_NAME,
String.valueOf(peer.getServerURI().getPort()));
+
+ final ClientSASLFactory clientSASLFactory = availableMechanims -> {
+ if (availableMechanims != null &&
Arrays.asList(availableMechanims).contains("ANONYMOUS")) {
+ return new AnonymousSASLMechanism();
+ } else {
+ return null;
+ }
+ };
+
+ final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+ EventHandler eventHandler = new EventHandler() {
+ @Override
+ public void onRemoteOpen(Connection connection) throws Exception {
+ connectionOpened.set(true);
+ }
+ };
+
+ AMQPClientConnectionFactory clientFactory = new
AMQPClientConnectionFactory(server, "myid",
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000);
+ ProtonClientConnectionManager lifeCycleListener = new
ProtonClientConnectionManager(clientFactory, Optional.of(eventHandler),
clientSASLFactory);
+ ProtonClientProtocolManager protocolManager = new
ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+ NettyConnector connector = new NettyConnector(config,
lifeCycleListener, lifeCycleListener,
server.getExecutorFactory().getExecutor(),
server.getExecutorFactory().getExecutor(), server.getScheduledPool(),
protocolManager);
+
+ try {
+ connector.start();
+
+ assertNotNull(connector.createConnection().getID());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ Wait.assertTrue(connectionOpened::get);
+ } finally {
+ lifeCycleListener.stop();
+ }
+ }
+ }
@Override
protected boolean isSecurityEnabled() {
return securityEnabled;
@@ -189,4 +349,22 @@ public class AmqpOutboundConnectionTest extends
AmqpClientTestSupport {
return new byte[0];
}
}
+
+ private static class AnonymousSASLMechanism implements ClientSASL {
+
+ @Override
+ public String getName() {
+ return "ANONYMOUS";
+ }
+
+ @Override
+ public byte[] getInitialResponse() {
+ return null;
+ }
+
+ @Override
+ public byte[] getResponse(byte[] challenge) {
+ return new byte[0];
+ }
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index a2b7b53920..201ca151cb 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -1341,6 +1341,30 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void testBrokerConnectionDoesNotSendOfferedCapabilities() throws
Exception {
+ // Checks that the current behavior is to not send offered capabilities; this will break if the
defaults are
+ // changed such that the connection starts sending offered capabilities
by default.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().withOfferedCapabilities(nullValue()).respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ // No user or pass given, so it will have to select ANONYMOUS even
though PLAIN is also offered
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
// Use these methods to script the initial handshake that a broker that is
establishing
// a federation connection with a remote broker instance would perform.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact