Copilot commented on code in PR #25538:
URL: https://github.com/apache/pulsar/pull/25538#discussion_r3115626813


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlowTest.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.testng.Assert.assertEquals;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import org.testng.annotations.Test;
+
+public class TlsClientAuthFlowTest {
+
+    @Test
+    public void testFromParametersWithoutClientId() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("tlsCertFile", "/path/to/cert.pem");
+        params.put("tlsKeyFile", "/path/to/key.pem");
+        params.put("issuerUrl", "http://localhost");
+        params.put("scope", "http://localhost");
+        OAuth2MockHttpClient.withMockedSslFactory(() -> {
+            TlsClientAuthFlow flow = TlsClientAuthFlow.fromParameters(params);
+            Field clientIdField = flow.getClass().getDeclaredField("clientId");
+            clientIdField.setAccessible(true);
+            assertEquals((String) clientIdField.get(flow), "pulsar-client");
+            flow.close();

Review Comment:
   This test reaches into private state via reflection to assert the default 
`clientId`. That’s brittle (renames/refactors break the test without changing 
behavior). Prefer asserting observable behavior (e.g., the token request uses 
the default client_id) or exposing a package-private accessor/default constant 
for tests.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.io.IOException;
+import java.net.URL;
+import java.time.Duration;
+import java.util.Map;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+
+/**
+ * Implementation of OAuth 2.0 Client TLS Authentication flow.
+ *
+ * @see <a href="https://datatracker.ietf.org/doc/html/rfc8705">RFC 8705 - OAuth 2.0 Mutual-TLS Client Authentication</a>
+ */
+@Slf4j
+class TlsClientAuthFlow extends FlowBase {
+    public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
+    public static final String CONFIG_PARAM_CLIENT_ID = "clientId";
+    public static final String CONFIG_PARAM_AUDIENCE = "audience";
+    public static final String CONFIG_PARAM_SCOPE = "scope";
+    public static final String CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION =
+            FlowBase.CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION;
+
+    private static final String DEFAULT_CLIENT_ID = "pulsar-client";
+
+    private static final long serialVersionUID = 1L;
+
+    private final String clientId;
+    private final String audience;
+    private final String scope;
+
+    private transient TokenClient exchanger;
+
+    private boolean initialized = false;
+
+    @Builder
+    public TlsClientAuthFlow(URL issuerUrl, String clientId, String certFile, String keyFile, String audience,
+                             String scope, Duration connectTimeout, Duration readTimeout, String trustCertsFilePath,
+                             String wellKnownMetadataPath, Duration autoCertRefreshDuration) {
+        super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath, certFile, keyFile, autoCertRefreshDuration,
+                wellKnownMetadataPath);
+        this.clientId = clientId == null ? DEFAULT_CLIENT_ID : clientId;
+        this.audience = audience;

Review Comment:
   `clientId` defaults only when it is `null`. If a caller passes an 
empty/blank string (possible via JSON config), the flow will send an empty 
`client_id`, which is unlikely to be intended. Consider defaulting when 
`clientId` is blank (e.g., `StringUtils.defaultIfBlank(clientId, 
DEFAULT_CLIENT_ID)`).
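A minimal sketch of the blank-aware default, written in plain Java so it runs without Apache Commons Lang. `ClientIdDefaults` and `resolveClientId` are hypothetical names used only for illustration; the only thing taken from the diff is the `DEFAULT_CLIENT_ID` value. `String.isBlank()` assumes Java 11 or later.

```java
final class ClientIdDefaults {
    // Same value as DEFAULT_CLIENT_ID in the diff above.
    static final String DEFAULT_CLIENT_ID = "pulsar-client";

    // Hypothetical helper: falls back to the default for null, empty,
    // or whitespace-only input instead of only for null.
    static String resolveClientId(String clientId) {
        return (clientId == null || clientId.isBlank()) ? DEFAULT_CLIENT_ID : clientId;
    }

    public static void main(String[] args) {
        System.out.println(resolveClientId(null));
        System.out.println(resolveClientId("   "));
        System.out.println(resolveClientId("my-client"));
    }
}
```

In the constructor this would presumably replace the `clientId == null ? DEFAULT_CLIENT_ID : clientId` expression.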



##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java:
##########
@@ -79,6 +80,71 @@ public void exchangeClientCredentialsSuccessWithoutOptionalClientCredentialsTest
         ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
                 .clientId("test-client-id")
                 .clientSecret("test-client-secret")
+                .authMethod(TokenEndpointAuthMethod.CLIENT_SECRET_POST)
+                .build();
+        String body = tokenClient.buildClientCredentialsBody(request);
+        BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
+        Response response = mock(Response.class);
+        ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
+        when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Content-Type",
+                "application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+        when(listenableFuture.get()).thenReturn(response);
+        when(response.getStatusCode()).thenReturn(200);
+        TokenResult tokenResult = new TokenResult();
+        tokenResult.setAccessToken("test-access-token");
+        tokenResult.setIdToken("test-id");
+        when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes());
+        TokenResult tr = tokenClient.exchangeClientCredentials(request);
+        assertNotNull(tr);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void exchangeTlsClientAuthSuccessTest() throws
+            IOException, TokenExchangeException, ExecutionException, InterruptedException {
+        DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
+        URL url = new URL("http://localhost");
+        TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+        ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
+                .clientId("test-client-id")
+                .audience("test-audience")
+                .scope("test-scope")
+                .authMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+                .build();
+        String body = tokenClient.buildClientCredentialsBody(request);

Review Comment:
   This TLS-client-auth test currently only asserts that the exchange returns a 
non-null token; it doesn’t verify the behavior change that `client_secret` must 
*not* be included for `tls_client_auth`. Consider asserting the generated 
request body does not contain `client_secret=` (and that it still contains 
`client_id`).
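The suggested assertion can be sketched against a stand-in body builder. `TlsBodySketch`, its `AuthMethod` enum, and `buildBody` are hypothetical and only mimic the `TreeMap`-based logic visible in the `TokenClient` hunk of this PR; they are not the real Pulsar classes.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class TlsBodySketch {
    // Stand-in for TokenEndpointAuthMethod.
    enum AuthMethod { CLIENT_SECRET_POST, TLS_CLIENT_AUTH }

    // Stand-in for buildClientCredentialsBody: client_secret is added
    // only for client_secret_post, then the map is form-encoded.
    static String buildBody(String clientId, String clientSecret, AuthMethod method) {
        Map<String, String> bodyMap = new TreeMap<>();
        bodyMap.put("grant_type", "client_credentials");
        bodyMap.put("client_id", clientId);
        if (method == AuthMethod.CLIENT_SECRET_POST) {
            bodyMap.put("client_secret", clientSecret);
        }
        return bodyMap.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        String body = buildBody("test-client-id", null, AuthMethod.TLS_CLIENT_AUTH);
        // The two checks the review comment asks for.
        if (body.contains("client_secret=")) {
            throw new AssertionError("client_secret must not be sent for tls_client_auth");
        }
        if (!body.contains("client_id=test-client-id")) {
            throw new AssertionError("client_id must still be sent");
        }
        System.out.println(body);
    }
}
```

In the actual TestNG test the same checks would likely be `assertFalse(body.contains("client_secret="))` and `assertTrue(body.contains("client_id="))` on the `body` string the test already builds.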



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java:
##########
@@ -59,7 +59,9 @@ String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) {
         Map<String, String> bodyMap = new TreeMap<>();
         bodyMap.put("grant_type", "client_credentials");
         bodyMap.put("client_id", req.getClientId());
-        bodyMap.put("client_secret", req.getClientSecret());
+        if (req.getAuthMethod() == TokenEndpointAuthMethod.CLIENT_SECRET_POST) {
+            bodyMap.put("client_secret", req.getClientSecret());
+        }
         // Only set audience and scope if they are non-empty.

Review Comment:
   `buildClientCredentialsBody` assumes `req.getAuthMethod()` is non-null. If 
it’s null, `client_secret` won’t be sent (breaking client_secret_post) and 
`URLEncoder.encode(e.getValue(), …)` can throw if any value is null. Consider 
treating a null auth method as the default (client_secret_post) and/or 
validating required fields before encoding.
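One way to read this suggestion, as a sketch only: normalize a null auth method to the legacy default and fail fast on missing required values before anything reaches `URLEncoder`. `AuthMethodDefaults`, `effectiveMethod`, and `requireField` are hypothetical names, and the enum is a stand-in for `TokenEndpointAuthMethod`.

```java
import java.util.Objects;

final class AuthMethodDefaults {
    // Stand-in for TokenEndpointAuthMethod.
    enum AuthMethod { CLIENT_SECRET_POST, TLS_CLIENT_AUTH }

    // A null method is treated as client_secret_post, the legacy behavior.
    static AuthMethod effectiveMethod(AuthMethod requested) {
        return requested == null ? AuthMethod.CLIENT_SECRET_POST : requested;
    }

    // Rejects a missing required value with a message naming the field,
    // rather than letting a bare NullPointerException surface during encoding.
    static String requireField(String name, String value) {
        return Objects.requireNonNull(value, name + " must not be null");
    }

    public static void main(String[] args) {
        System.out.println(effectiveMethod(null));
        System.out.println(requireField("client_id", "test-client-id"));
    }
}
```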



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java:
##########
@@ -92,18 +94,35 @@ public static class ClientCredentialsBuilder {
 
         private URL issuerUrl;
         private URL credentialsUrl;
+        private TokenEndpointAuthMethod tokenEndpointAuthMethod = TokenEndpointAuthMethod.CLIENT_SECRET_POST;
+        private String clientId;
+        private String tlsCertFile;
+        private String tlsKeyFile;
         private String audience;
         private String scope;
         private Duration connectTimeout;
         private Duration readTimeout;
         private String trustCertsFilePath;
         private String wellKnownMetadataPath;
+        private Duration autoCertRefreshDuration;
         private double earlyTokenRefreshPercent = AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT;
         private ScheduledExecutorService scheduler;
 
         private ClientCredentialsBuilder() {
         }
 
+        /**
+         * Optional token endpoint auth method.
+         * Defaults to {@code client_secret_post}.
+         *
+         * @param tokenEndpointAuthMethod the token endpoint auth method
+         * @return the builder
+         */
+        public ClientCredentialsBuilder tokenEndpointAuthMethod(TokenEndpointAuthMethod tokenEndpointAuthMethod) {
+            this.tokenEndpointAuthMethod = tokenEndpointAuthMethod;

Review Comment:
   `tokenEndpointAuthMethod(...)` accepts null; if a caller passes null, 
`build()` will treat it as “not CLIENT_SECRET_POST” and attempt the TLS flow, 
failing later with a confusing missing-cert error. Consider rejecting null here 
(or defaulting it to `CLIENT_SECRET_POST`) to keep builder behavior 
deterministic.
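A sketch of the fail-fast variant, assuming rejection is preferred over silently defaulting. `BuilderNullCheckSketch` and its nested `Builder` are hypothetical stand-ins for `ClientCredentialsBuilder`; only the field name and its default mirror the diff.

```java
import java.util.Objects;

final class BuilderNullCheckSketch {
    // Stand-in for TokenEndpointAuthMethod.
    enum AuthMethod { CLIENT_SECRET_POST, TLS_CLIENT_AUTH }

    static final class Builder {
        private AuthMethod tokenEndpointAuthMethod = AuthMethod.CLIENT_SECRET_POST;

        // Rejects null at the call site, so build() never has to guess which flow was meant.
        Builder tokenEndpointAuthMethod(AuthMethod method) {
            this.tokenEndpointAuthMethod =
                    Objects.requireNonNull(method, "tokenEndpointAuthMethod must not be null");
            return this;
        }

        AuthMethod method() {
            return tokenEndpointAuthMethod;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Builder().method());
        System.out.println(new Builder().tokenEndpointAuthMethod(AuthMethod.TLS_CLIENT_AUTH).method());
    }
}
```

Rejecting null surfaces the mistake where it is made; defaulting to `CLIENT_SECRET_POST` would instead keep existing callers working unchanged.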
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java:
##########
@@ -47,28 +55,38 @@ abstract class FlowBase implements Flow {
     public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout";
     public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout";
     public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath";
+    public static final String CONFIG_PARAM_CERT_FILE = "tlsCertFile";
+    public static final String CONFIG_PARAM_TLS_KEY_FILE = "tlsKeyFile";
+    public static final String CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION = "autoCertRefreshDuration";
     public static final String CONFIG_PARAM_WELL_KNOWN_METADATA_PATH = "wellKnownMetadataPath";
 
     protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10);
     protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30);
+    protected static final Duration DEFAULT_AUTO_CERT_REFRESH_DURATION = Duration.ofSeconds(300);
 
     private static final long serialVersionUID = 1L;
 
     protected final URL issuerUrl;
     protected final AsyncHttpClient httpClient;
     protected final String wellKnownMetadataPath;
 
+    protected transient PulsarSslFactory sslFactory;
+    protected transient ScheduledExecutorService sslRefreshScheduler;
     protected transient Metadata metadata;
 
     protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration readTimeout, String trustCertsFilePath,
+                       String certFile, String keyFile, Duration autoCertRefreshDuration,
                        String wellKnownMetadataPath) {
         this.issuerUrl = issuerUrl;
-        this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath);
+        this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath, certFile, keyFile);
+        long autoCertRefreshSeconds = getParameterDurationToSeconds(CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION,
+                autoCertRefreshDuration, DEFAULT_AUTO_CERT_REFRESH_DURATION);
+        scheduleSslContextRefreshIfEnabled(autoCertRefreshSeconds);
         this.wellKnownMetadataPath = wellKnownMetadataPath;

Review Comment:
   The PR description mentions using a different HTTP client for metadata vs 
token exchange, but `FlowBase` creates a single `httpClient` (potentially with 
client certs) that is also used by `createMetadataResolver()`. Either update 
the PR description or split the clients so metadata retrieval doesn’t inherit 
mTLS/token-specific settings if that’s the intent.



##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java:
##########
@@ -110,6 +111,42 @@ public void testConfigureWithoutOptionalParams() throws Exception {
         assertNotNull(this.auth.flow);
     }
 
+    @Test
+    public void testConfigureWithTlsClientAuth() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put(AuthenticationOAuth2.CONFIG_PARAM_TOKEN_ENDPOINT_AUTH_METHOD,
+                TokenEndpointAuthMethod.TLS_CLIENT_AUTH.value());
+        params.put("clientId", "test-client");
+        params.put("tlsCertFile", "/path/to/cert.pem");
+        params.put("tlsKeyFile", "/path/to/key.pem");
+        params.put("issuerUrl", "http://localhost");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        OAuth2MockHttpClient.withMockedSslFactory(() -> {
+            this.auth.configure(authParams);
+            assertNotNull(this.auth.flow);
+            assertEquals(this.auth.flow.getClass(), TlsClientAuthFlow.class);
+        });
+    }
+
+    @Test
+    public void testConfigureCredentialsWithTlsValues() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put("privateKey", "data:base64,e30=");
+        params.put("tlsCertFile", "/path/to/cert.pem");
+        params.put("tlsKeyFile", "/path/to/key.pem");
+        params.put("issuerUrl", "http://localhost");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        OAuth2MockHttpClient.withMockedSslFactory(() -> {
+            this.auth.configure(authParams);
+            assertNotNull(this.auth.flow);
+            assertEquals(this.auth.flow.getClass(), ClientCredentialsFlow.class);
+        });

Review Comment:
   This test configures `ClientCredentialsFlow` with TLS cert/key values 
(creating an AsyncHttpClient and possibly the refresh scheduler) but doesn’t 
close it. Please close `auth`/`flow` after the assertions to prevent leaking 
threads/resources across the test suite.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java:
##########
@@ -91,7 +133,40 @@ private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connect
         return new DefaultAsyncHttpClient(confBuilder.build());
     }
 
+    private void scheduleSslContextRefreshIfEnabled(long refreshSeconds) {
+        if (sslFactory == null || refreshSeconds <= 0 || sslRefreshScheduler != null) {
+            return;
+        }
+        sslRefreshScheduler = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("oauth2-tls-cert-refresher"));
+        sslRefreshScheduler.scheduleAtFixedRate(this::refreshSslContext,

Review Comment:
   `scheduleAtFixedRate` can cause refresh runs to bunch up back-to-back if 
`refreshSslContext` ever takes longer than the interval. Consider using 
`scheduleWithFixedDelay` (as done in `HttpClient`) so the delay is measured 
after each refresh completes.
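A small sketch of the fixed-delay variant using only `java.util.concurrent`. `FixedDelayRefreshSketch`, `scheduleRefresh`, and `runsAtLeast` are hypothetical helpers; the refresh task here is a placeholder, not the PR's `refreshSslContext`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class FixedDelayRefreshSketch {
    // With scheduleWithFixedDelay the next run starts delayMillis after the
    // previous run finishes, so a slow refresh cannot queue up extra runs.
    static ScheduledFuture<?> scheduleRefresh(ScheduledExecutorService scheduler,
                                              Runnable refresh, long delayMillis) {
        return scheduler.scheduleWithFixedDelay(refresh, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Demo helper: reports whether the placeholder task ran at least `runs`
    // times within five seconds, then stops the scheduler thread.
    static boolean runsAtLeast(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            scheduleRefresh(scheduler, latch::countDown, 10);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("refresh ran repeatedly: " + runsAtLeast(2));
    }
}
```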
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java:
##########
@@ -42,4 +42,7 @@ public class ClientCredentialsExchangeRequest {
 
     @JsonProperty("scope")
     private String scope;
+
+    @JsonProperty("token_endpoint_auth_method")
+    private TokenEndpointAuthMethod authMethod;

Review Comment:
   `authMethod` is nullable on the Lombok builder; if a caller constructs a 
request without setting it, `TokenClient.buildClientCredentialsBody()` will 
omit `client_secret` and change behavior silently. Consider defaulting 
`authMethod` to `CLIENT_SECRET_POST` via `@Builder.Default` (and/or validating 
non-null) to keep legacy behavior predictable.
   



##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java:
##########
@@ -110,6 +111,42 @@ public void testConfigureWithoutOptionalParams() throws Exception {
         assertNotNull(this.auth.flow);
     }
 
+    @Test
+    public void testConfigureWithTlsClientAuth() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put(AuthenticationOAuth2.CONFIG_PARAM_TOKEN_ENDPOINT_AUTH_METHOD,
+                TokenEndpointAuthMethod.TLS_CLIENT_AUTH.value());
+        params.put("clientId", "test-client");
+        params.put("tlsCertFile", "/path/to/cert.pem");
+        params.put("tlsKeyFile", "/path/to/key.pem");
+        params.put("issuerUrl", "http://localhost");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        OAuth2MockHttpClient.withMockedSslFactory(() -> {
+            this.auth.configure(authParams);
+            assertNotNull(this.auth.flow);
+            assertEquals(this.auth.flow.getClass(), TlsClientAuthFlow.class);

Review Comment:
   This test configures a TLS-based flow (which now creates an AsyncHttpClient 
and potentially a cert-refresh scheduler thread) but never closes 
`auth`/`flow`. To avoid thread/resource leaks between tests, explicitly close 
the created flow (or `AuthenticationOAuth2`) in a `finally` block inside 
`withMockedSslFactory`.
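The cleanup pattern can be sketched with a stand-in resource. `CloseInFinallySketch`, `FakeFlow`, and `withFlow` are hypothetical; `FakeFlow` only models something that owns an HTTP client and a scheduler thread.

```java
final class CloseInFinallySketch {
    // Stand-in for a flow that holds an HTTP client and a refresh scheduler.
    static final class FakeFlow implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Runs the assertions and always closes the flow, even when an
    // assertion fails, so no thread outlives the test.
    static void withFlow(FakeFlow flow, Runnable assertions) {
        try {
            assertions.run();
        } finally {
            flow.close();
        }
    }

    public static void main(String[] args) {
        FakeFlow flow = new FakeFlow();
        withFlow(flow, () -> { });
        System.out.println("closed: " + flow.closed);
    }
}
```

In the test this would amount to wrapping the two assertions in `try { ... } finally { this.auth.flow.close(); }` inside the `withMockedSslFactory` lambda, assuming the flow's `close()` is callable there as it is in `TlsClientAuthFlowTest`.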
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java:
##########
@@ -47,28 +55,38 @@ abstract class FlowBase implements Flow {
     public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout";
     public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout";
     public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath";
+    public static final String CONFIG_PARAM_CERT_FILE = "tlsCertFile";
+    public static final String CONFIG_PARAM_TLS_KEY_FILE = "tlsKeyFile";
+    public static final String CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION = "autoCertRefreshDuration";
     public static final String CONFIG_PARAM_WELL_KNOWN_METADATA_PATH = "wellKnownMetadataPath";
 
     protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10);
     protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30);
+    protected static final Duration DEFAULT_AUTO_CERT_REFRESH_DURATION = Duration.ofSeconds(300);

Review Comment:
   `DEFAULT_AUTO_CERT_REFRESH_DURATION` enables periodic certificate refresh by 
default (5 minutes) whenever a cert/key is configured, which also creates a 
dedicated scheduler thread. This is a behavioral/operational change; consider 
defaulting to disabled (e.g., 0) and only scheduling refresh when the user 
explicitly configures `autoCertRefreshDuration`.
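A sketch of the opt-in behavior, assuming a zero default means "disabled". `OptInRefreshSketch` and its methods are hypothetical and do not reproduce `getParameterDurationToSeconds`; they only show how a zero default would interact with the `refreshSeconds <= 0` guard already present in `scheduleSslContextRefreshIfEnabled`.

```java
import java.time.Duration;

final class OptInRefreshSketch {
    // Assumed default: zero, so no scheduler thread is created unless configured.
    static final Duration DEFAULT_AUTO_CERT_REFRESH_DURATION = Duration.ZERO;

    // Resolves the configured duration to seconds, falling back to the default.
    static long refreshSeconds(Duration configured) {
        Duration effective = configured == null ? DEFAULT_AUTO_CERT_REFRESH_DURATION : configured;
        return effective.getSeconds();
    }

    // Refresh is scheduled only for a strictly positive interval.
    static boolean refreshEnabled(Duration configured) {
        return refreshSeconds(configured) > 0;
    }

    public static void main(String[] args) {
        System.out.println(refreshEnabled(null));
        System.out.println(refreshEnabled(Duration.ofSeconds(300)));
    }
}
```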
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
