This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d3135c1c2 ARTEMIS-5316 XOAUTH2 SASL mechanism for broker connection
7d3135c1c2 is described below

commit 7d3135c1c2de6c59c2cd434319e84a5025d2ed97
Author: Tomasz Ɓukasiewicz <[email protected]>
AuthorDate: Tue Feb 25 09:39:41 2025 +0100

    ARTEMIS-5316 XOAUTH2 SASL mechanism for broker connection
---
 .../amqp/connect/AMQPBrokerConnection.java         |  71 ++++++++++++--
 .../amqp/connect/AMQPConnectSaslTest.java          | 106 +++++++++++++++++++++
 2 files changed, 169 insertions(+), 8 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index b60a1706a0..ae068bace8 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -137,6 +137,7 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
    public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;
 
    private static final NettyConnectorFactory CONNECTOR_FACTORY = new 
NettyConnectorFactory().setServerConnector(true);
+   private static final String SASL_MECHANISMS_KEY = "saslMechanisms";
 
    private final ProtonProtocolManagerFactory protonProtocolManagerFactory;
    private final ReferenceIDSupplier referenceIdSupplier;
@@ -491,7 +492,8 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
          senders.clear();
          receivers.clear();
 
-         ClientSASLFactory saslFactory = new SaslFactory(connection, 
brokerConnectConfiguration);
+         final String[] enabledSaslMechanisms = 
configuration.getExtraParams().containsKey(SASL_MECHANISMS_KEY) ? 
protonProtocolManager.getSaslMechanisms() : null;
+         final ClientSASLFactory saslFactory = new SaslFactory(connection, 
brokerConnectConfiguration, enabledSaslMechanisms);
 
          final Map<String, Object> brokerConnectionInfo = new HashMap<>();
 
@@ -1181,6 +1183,7 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
    private static final String EXTERNAL = "EXTERNAL";
    private static final String PLAIN = "PLAIN";
    private static final String ANONYMOUS = "ANONYMOUS";
+   private static final String XOAUTH2 = "XOAUTH2";
    private static final byte[] EMPTY = new byte[0];
 
    private static class PlainSASLMechanism implements ClientSASL {
@@ -1256,42 +1259,94 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
       }
    }
 
+   private static class XOAuth2SASLMechanism implements ClientSASL {
+
+      private final String userName;
+      private final String token;
+
+      XOAuth2SASLMechanism(String userName, String token) {
+         this.userName = userName;
+         this.token = token;
+      }
+
+      @Override
+      public String getName() {
+         return XOAUTH2;
+      }
+
+      @Override
+      public byte[] getInitialResponse() {
+         String response = String.format("user=%s\u0001auth=Bearer 
%s\u0001\u0001", userName, token);
+         return response.getBytes(StandardCharsets.UTF_8);
+      }
+
+      @Override
+      public byte[] getResponse(byte[] challenge) {
+         return EMPTY;
+      }
+
+      public static boolean isApplicable(final String username, final String 
password) {
+         return username != null && !username.isEmpty() && password != null && 
!password.isEmpty();
+      }
+   }
+
    private static final class SaslFactory implements ClientSASLFactory {
 
       private final NettyConnection connection;
       private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
+      private final String[] enabledSaslMechanisms;
 
-      SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration 
brokerConnectConfiguration) {
+      SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration 
brokerConnectConfiguration, String[] enabledSaslMechanisms) {
          this.connection = connection;
          this.brokerConnectConfiguration = brokerConnectConfiguration;
+         this.enabledSaslMechanisms = enabledSaslMechanisms;
       }
 
       @Override
-      public ClientSASL chooseMechanism(String[] offeredMechanims) {
-         List<String> availableMechanisms = offeredMechanims == null ? 
Collections.emptyList() : Arrays.asList(offeredMechanims);
+      public ClientSASL chooseMechanism(String[] offeredMechanisms) {
+         Set<String> offeredMechanismsSet = 
filterOfferedMechanisms(offeredMechanisms, enabledSaslMechanisms);
 
-         if (availableMechanisms.contains(EXTERNAL) && 
ExternalSASLMechanism.isApplicable(connection)) {
+         if (offeredMechanismsSet.contains(EXTERNAL) && 
ExternalSASLMechanism.isApplicable(connection)) {
             return new ExternalSASLMechanism();
          }
+
          if (SCRAMClientSASL.isApplicable(brokerConnectConfiguration.getUser(),
                                           
brokerConnectConfiguration.getPassword())) {
             for (SCRAM scram : SCRAM.values()) {
-               if (availableMechanisms.contains(scram.getName())) {
+               if (offeredMechanismsSet.contains(scram.getName())) {
                   return new SCRAMClientSASL(scram, 
brokerConnectConfiguration.getUser(),
                                              
brokerConnectConfiguration.getPassword());
                }
             }
          }
-         if (availableMechanisms.contains(PLAIN) && 
PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(), 
brokerConnectConfiguration.getPassword())) {
+
+         if (offeredMechanismsSet.contains(PLAIN) && 
PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(), 
brokerConnectConfiguration.getPassword())) {
             return new 
PlainSASLMechanism(brokerConnectConfiguration.getUser(), 
brokerConnectConfiguration.getPassword());
          }
 
-         if (availableMechanisms.contains(ANONYMOUS)) {
+         if (offeredMechanismsSet.contains(XOAUTH2) && 
XOAuth2SASLMechanism.isApplicable(brokerConnectConfiguration.getUser(), 
brokerConnectConfiguration.getPassword())) {
+            return new 
XOAuth2SASLMechanism(brokerConnectConfiguration.getUser(), 
brokerConnectConfiguration.getPassword());
+         }
+
+         if (offeredMechanismsSet.contains(ANONYMOUS)) {
             return new AnonymousSASLMechanism();
          }
 
          return null;
       }
+
+      private Set<String> filterOfferedMechanisms(String[] 
offeredSaslMechanisms, String[] enabledSaslMechanisms) {
+         Set<String> offeredSaslMechanismsSet = offeredSaslMechanisms == null 
? Collections.emptySet() : new HashSet<>(Arrays.asList(offeredSaslMechanisms));
+
+         if (enabledSaslMechanisms == null || enabledSaslMechanisms.length == 
0) {
+            return offeredSaslMechanismsSet;
+         }
+
+         Set<String> enabledSaslMechanismsSet = new 
HashSet<>(Arrays.asList(enabledSaslMechanisms));
+         enabledSaslMechanismsSet.retainAll(offeredSaslMechanismsSet);
+
+         return enabledSaslMechanismsSet;
+      }
    }
 
    public static boolean 
isCoreMessageTunnelingEnabled(AMQPMirrorBrokerConnectionElement configuration) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
index 0e24e93b19..7bfefa2b18 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
@@ -57,6 +57,7 @@ public class AMQPConnectSaslTest extends 
AmqpClientTestSupport {
    private static final String ANONYMOUS = "ANONYMOUS";
    private static final String EXTERNAL = "EXTERNAL";
    private static final String SCRAM_SHA_512 = "SCRAM-SHA-512";
+   private static final String XOAUTH2 = "XOAUTH2";
 
    @Override
    protected ActiveMQServer createServer() throws Exception {
@@ -115,6 +116,111 @@ public class AMQPConnectSaslTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void testConnectsWithXOauth2() throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLHeader().respondWithSASLHeader();
+         peer.remoteSaslMechanisms().withMechanisms(PLAIN, XOAUTH2, 
ANONYMOUS).queue();
+         
peer.expectSaslInit().withMechanism(XOAUTH2).withInitialResponse(peer.saslXOauth2InitialResponse(USER,
 PASSWD));
+         peer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
+         peer.expectAMQPHeader().respondWithAMQPHeader();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.debug("Connect test started, peer listening on: {}", 
remoteURI);
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+             new AMQPBrokerConnectConfiguration(getTestName(), 
"tcp://localhost:" + remoteURI.getPort() + "?saslMechanisms=" + XOAUTH2);
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.setUser(USER);
+         amqpConnection.setPassword(PASSWD);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testXOAuth2NotSelectedWhenUserAndPasswordNotProvided() throws 
Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect(XOAUTH2, ANONYMOUS);
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.debug("Connect test started, peer listening on: {}", 
remoteURI);
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+             new AMQPBrokerConnectConfiguration(getTestName(),
+                 "tcp://localhost:" + remoteURI.getPort() + "?saslMechanisms=" 
+ XOAUTH2 + "," + ANONYMOUS);
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testConnectsWithPlainWhenPlainAndXOauth2IsValid() throws 
Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLPlainConnect(USER, PASSWD, PLAIN, XOAUTH2, ANONYMOUS);
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.debug("Connect test started, peer listening on: {}", 
remoteURI);
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+             new AMQPBrokerConnectConfiguration(getTestName(),
+                 "tcp://localhost:" + remoteURI.getPort() + "?saslMechanisms=" 
+ XOAUTH2 + "," + PLAIN);
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.setUser(USER);
+         amqpConnection.setPassword(PASSWD);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testConnectionFailsIfFilterExcudesAnyOfferedMechanism() throws 
Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLHeader().respondWithSASLHeader();
+         peer.remoteSaslMechanisms().withMechanisms(PLAIN, ANONYMOUS).queue();
+         peer.expectConnectionToDrop();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.debug("Connect test started, peer listening on: {}", 
remoteURI);
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+             new AMQPBrokerConnectConfiguration(getTestName(),
+                 "tcp://localhost:" + remoteURI.getPort() + "?saslMechanisms=" 
+ XOAUTH2);
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.setUser(USER);
+         amqpConnection.setPassword(PASSWD);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
    @Test
    @Timeout(20)
    public void testAnonymousSelectedWhenNoCredentialsSupplied() throws 
Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to