This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 7d3135c1c2 ARTEMIS-5316 XOAUTH2 SASL mechanism for broker connection
7d3135c1c2 is described below
commit 7d3135c1c2de6c59c2cd434319e84a5025d2ed97
Author: Tomasz Ćukasiewicz <[email protected]>
AuthorDate: Tue Feb 25 09:39:41 2025 +0100
ARTEMIS-5316 XOAUTH2 SASL mechanism for broker connection
---
.../amqp/connect/AMQPBrokerConnection.java | 71 ++++++++++++--
.../amqp/connect/AMQPConnectSaslTest.java | 106 +++++++++++++++++++++
2 files changed, 169 insertions(+), 8 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index b60a1706a0..ae068bace8 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -137,6 +137,7 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;
private static final NettyConnectorFactory CONNECTOR_FACTORY = new
NettyConnectorFactory().setServerConnector(true);
+ private static final String SASL_MECHANISMS_KEY = "saslMechanisms";
private final ProtonProtocolManagerFactory protonProtocolManagerFactory;
private final ReferenceIDSupplier referenceIdSupplier;
@@ -491,7 +492,8 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
senders.clear();
receivers.clear();
- ClientSASLFactory saslFactory = new SaslFactory(connection,
brokerConnectConfiguration);
+ final String[] enabledSaslMechanisms =
configuration.getExtraParams().containsKey(SASL_MECHANISMS_KEY) ?
protonProtocolManager.getSaslMechanisms() : null;
+ final ClientSASLFactory saslFactory = new SaslFactory(connection,
brokerConnectConfiguration, enabledSaslMechanisms);
final Map<String, Object> brokerConnectionInfo = new HashMap<>();
@@ -1181,6 +1183,7 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
private static final String EXTERNAL = "EXTERNAL";
private static final String PLAIN = "PLAIN";
private static final String ANONYMOUS = "ANONYMOUS";
+ private static final String XOAUTH2 = "XOAUTH2";
private static final byte[] EMPTY = new byte[0];
private static class PlainSASLMechanism implements ClientSASL {
@@ -1256,42 +1259,94 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
}
}
+ private static class XOAuth2SASLMechanism implements ClientSASL {
+
+ private final String userName;
+ private final String token;
+
+ XOAuth2SASLMechanism(String userName, String token) {
+ this.userName = userName;
+ this.token = token;
+ }
+
+ @Override
+ public String getName() {
+ return XOAUTH2;
+ }
+
+ @Override
+ public byte[] getInitialResponse() {
+ String response = String.format("user=%s\u0001auth=Bearer
%s\u0001\u0001", userName, token);
+ return response.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] getResponse(byte[] challenge) {
+ return EMPTY;
+ }
+
+ public static boolean isApplicable(final String username, final String
password) {
+ return username != null && !username.isEmpty() && password != null &&
!password.isEmpty();
+ }
+ }
+
private static final class SaslFactory implements ClientSASLFactory {
private final NettyConnection connection;
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
+ private final String[] enabledSaslMechanisms;
- SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration
brokerConnectConfiguration) {
+ SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration
brokerConnectConfiguration, String[] enabledSaslMechanisms) {
this.connection = connection;
this.brokerConnectConfiguration = brokerConnectConfiguration;
+ this.enabledSaslMechanisms = enabledSaslMechanisms;
}
@Override
- public ClientSASL chooseMechanism(String[] offeredMechanims) {
- List<String> availableMechanisms = offeredMechanims == null ?
Collections.emptyList() : Arrays.asList(offeredMechanims);
+ public ClientSASL chooseMechanism(String[] offeredMechanisms) {
+ Set<String> offeredMechanismsSet =
filterOfferedMechanisms(offeredMechanisms, enabledSaslMechanisms);
- if (availableMechanisms.contains(EXTERNAL) &&
ExternalSASLMechanism.isApplicable(connection)) {
+ if (offeredMechanismsSet.contains(EXTERNAL) &&
ExternalSASLMechanism.isApplicable(connection)) {
return new ExternalSASLMechanism();
}
+
if (SCRAMClientSASL.isApplicable(brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword())) {
for (SCRAM scram : SCRAM.values()) {
- if (availableMechanisms.contains(scram.getName())) {
+ if (offeredMechanismsSet.contains(scram.getName())) {
return new SCRAMClientSASL(scram,
brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword());
}
}
}
- if (availableMechanisms.contains(PLAIN) &&
PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword())) {
+
+ if (offeredMechanismsSet.contains(PLAIN) &&
PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword())) {
return new
PlainSASLMechanism(brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword());
}
- if (availableMechanisms.contains(ANONYMOUS)) {
+ if (offeredMechanismsSet.contains(XOAUTH2) &&
XOAuth2SASLMechanism.isApplicable(brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword())) {
+ return new
XOAuth2SASLMechanism(brokerConnectConfiguration.getUser(),
brokerConnectConfiguration.getPassword());
+ }
+
+ if (offeredMechanismsSet.contains(ANONYMOUS)) {
return new AnonymousSASLMechanism();
}
return null;
}
+
+ private Set<String> filterOfferedMechanisms(String[]
offeredSaslMechanisms, String[] enabledSaslMechanisms) {
+ Set<String> offeredSaslMechanismsSet = offeredSaslMechanisms == null
? Collections.emptySet() : new HashSet<>(Arrays.asList(offeredSaslMechanisms));
+
+ if (enabledSaslMechanisms == null || enabledSaslMechanisms.length ==
0) {
+ return offeredSaslMechanismsSet;
+ }
+
+ Set<String> enabledSaslMechanismsSet = new
HashSet<>(Arrays.asList(enabledSaslMechanisms));
+ enabledSaslMechanismsSet.retainAll(offeredSaslMechanismsSet);
+
+ return enabledSaslMechanismsSet;
+ }
}
public static boolean
isCoreMessageTunnelingEnabled(AMQPMirrorBrokerConnectionElement configuration) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
index 0e24e93b19..7bfefa2b18 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
@@ -57,6 +57,7 @@ public class AMQPConnectSaslTest extends
AmqpClientTestSupport {
private static final String ANONYMOUS = "ANONYMOUS";
private static final String EXTERNAL = "EXTERNAL";
private static final String SCRAM_SHA_512 = "SCRAM-SHA-512";
+ private static final String XOAUTH2 = "XOAUTH2";
@Override
protected ActiveMQServer createServer() throws Exception {
@@ -115,6 +116,111 @@ public class AMQPConnectSaslTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void testConnectsWithXOauth2() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLHeader().respondWithSASLHeader();
+ peer.remoteSaslMechanisms().withMechanisms(PLAIN, XOAUTH2,
ANONYMOUS).queue();
+
peer.expectSaslInit().withMechanism(XOAUTH2).withInitialResponse(peer.saslXOauth2InitialResponse(USER,
PASSWD));
+ peer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.debug("Connect test started, peer listening on: {}",
remoteURI);
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(),
"tcp://localhost:" + remoteURI.getPort() + "?saslMechanisms=" + XOAUTH2);
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.setUser(USER);
+ amqpConnection.setPassword(PASSWD);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testXOAuth2NotSelectedWhenUserAndPasswordNotProvided() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect(XOAUTH2, ANONYMOUS);
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.debug("Connect test started, peer listening on: {}",
remoteURI);
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(),
+ "tcp://localhost:" + remoteURI.getPort() + "?saslMechanisms="
+ XOAUTH2 + "," + ANONYMOUS);
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testConnectsWithPlainWhenPlainAndXOauth2IsValid() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLPlainConnect(USER, PASSWD, PLAIN, XOAUTH2, ANONYMOUS);
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.debug("Connect test started, peer listening on: {}",
remoteURI);
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(),
+ "tcp://localhost:" + remoteURI.getPort() + "?saslMechanisms="
+ XOAUTH2 + "," + PLAIN);
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.setUser(USER);
+ amqpConnection.setPassword(PASSWD);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testConnectionFailsIfFilterExcudesAnyOfferedMechanism() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLHeader().respondWithSASLHeader();
+ peer.remoteSaslMechanisms().withMechanisms(PLAIN, ANONYMOUS).queue();
+ peer.expectConnectionToDrop();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.debug("Connect test started, peer listening on: {}",
remoteURI);
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(),
+ "tcp://localhost:" + remoteURI.getPort() + "?saslMechanisms="
+ XOAUTH2);
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.setUser(USER);
+ amqpConnection.setPassword(PASSWD);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
@Test
@Timeout(20)
public void testAnonymousSelectedWhenNoCredentialsSupplied() throws
Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact