Copilot commented on code in PR #25538:
URL: https://github.com/apache/pulsar/pull/25538#discussion_r3111593091
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java:
##########
@@ -91,7 +127,40 @@ private AsyncHttpClient defaultHttpClient(Duration
readTimeout, Duration connect
return new DefaultAsyncHttpClient(confBuilder.build());
}
+ protected void scheduleSslContextRefreshIfEnabled(long refreshSeconds) {
+ if (sslFactory == null || refreshSeconds <= 0 || sslRefreshScheduler
!= null) {
+ return;
+ }
+ sslRefreshScheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("oauth2-tls-cert-refresher"));
Review Comment:
FlowBase schedules TLS context refresh using
`Executors.newSingleThreadScheduledExecutor(new
ExecutorProvider.ExtendedThreadFactory("oauth2-tls-cert-refresher"))`.
`ExtendedThreadFactory` defaults to non-daemon threads, which can prevent JVM
shutdown if flows aren’t closed promptly. Consider creating daemon threads
(pass `true`) and/or reusing a shared scheduler like AuthenticationOAuth2 does
for token refresh to avoid one thread per client instance.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java:
##########
@@ -155,6 +223,9 @@ static Duration parseParameterDuration(Map<String, String>
params, String name)
@Override
public void close() throws Exception {
+ if (sslRefreshScheduler != null) {
+ sslRefreshScheduler.shutdownNow();
+ }
httpClient.close();
Review Comment:
FlowBase.close() shuts down the refresh scheduler and closes the
AsyncHttpClient, but it never closes `sslFactory` (PulsarSslFactory is
AutoCloseable). This can leak file watchers/keystore resources and leave
background refresh running concurrently with shutdown. Please close
`sslFactory` (and consider awaiting scheduler termination) when it was created.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java:
##########
@@ -59,7 +59,9 @@ String
buildClientCredentialsBody(ClientCredentialsExchangeRequest req) {
Map<String, String> bodyMap = new TreeMap<>();
bodyMap.put("grant_type", "client_credentials");
bodyMap.put("client_id", req.getClientId());
- bodyMap.put("client_secret", req.getClientSecret());
+ if (req.getAuthMethod() == TokenEndpointAuthMethod.CLIENT_SECRET_POST)
{
Review Comment:
TokenClient.buildClientCredentialsBody() now includes `client_secret` only
when `req.getAuthMethod() == CLIENT_SECRET_POST`. If `authMethod` is left unset
(null) by a caller, the request will be built without a client_secret and fail
authentication (this is a behavior change vs. previous versions). Consider
defaulting null to CLIENT_SECRET_POST (or validating that authMethod is
provided) to keep backward-compatible behavior for existing callers of this
public class.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java:
##########
@@ -47,28 +55,38 @@ abstract class FlowBase implements Flow {
public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout";
public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout";
public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH =
"trustCertsFilePath";
+ public static final String CONFIG_PARAM_CERT_FILE = "tlsCertFile";
+ public static final String CONFIG_PARAM_KEY_FILE = "tlsKeyFile";
+ public static final String CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION =
"autoCertRefreshDuration";
public static final String CONFIG_PARAM_WELL_KNOWN_METADATA_PATH =
"wellKnownMetadataPath";
protected static final Duration DEFAULT_CONNECT_TIMEOUT =
Duration.ofSeconds(10);
protected static final Duration DEFAULT_READ_TIMEOUT =
Duration.ofSeconds(30);
+ protected static final Duration DEFAULT_AUTO_CERT_REFRESH_DURATION =
Duration.ofSeconds(300);
private static final long serialVersionUID = 1L;
protected final URL issuerUrl;
protected final AsyncHttpClient httpClient;
protected final String wellKnownMetadataPath;
+ protected transient PulsarSslFactory sslFactory;
+ protected transient ScheduledExecutorService sslRefreshScheduler;
protected transient Metadata metadata;
protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration
readTimeout, String trustCertsFilePath,
+ String certFile, String keyFile, Duration
autoCertRefreshDuration,
String wellKnownMetadataPath) {
this.issuerUrl = issuerUrl;
- this.httpClient = defaultHttpClient(readTimeout, connectTimeout,
trustCertsFilePath);
+ this.httpClient = defaultHttpClient(readTimeout, connectTimeout,
trustCertsFilePath, certFile, keyFile);
+ long autoCertRefreshSeconds =
getParameterDurationToSeconds(CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION,
+ autoCertRefreshDuration, DEFAULT_AUTO_CERT_REFRESH_DURATION);
+ scheduleSslContextRefreshIfEnabled(autoCertRefreshSeconds);
Review Comment:
FlowBase schedules TLS certificate refresh using a default refresh interval
even when the user didn’t specify `autoCertRefreshDuration` (since a default is
applied before calling scheduleSslContextRefreshIfEnabled). This introduces a
background thread + periodic work for any mTLS-configured client. Consider
making refresh opt-in (default disabled) or using a shared scheduler to reduce
per-client overhead.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java:
##########
@@ -81,12 +84,15 @@ public ClientCredentialsFlow(URL issuerUrl, String
audience, String privateKey,
public static ClientCredentialsFlow fromParameters(Map<String, String>
params) {
URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
String privateKeyUrl = parseParameterString(params,
CONFIG_PARAM_KEY_FILE);
- // These are optional parameters, so we only perform a get
+ // These are optional parameters, so we allow null values
String scope = params.get(CONFIG_PARAM_SCOPE);
String audience = params.get(CONFIG_PARAM_AUDIENCE);
Duration connectTimeout = parseParameterDuration(params,
CONFIG_PARAM_CONNECT_TIMEOUT);
Duration readTimeout = parseParameterDuration(params,
CONFIG_PARAM_READ_TIMEOUT);
String trustCertsFilePath =
params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);
+ String certFile = params.get(CONFIG_PARAM_CERT_FILE);
+ String keyFile = params.get(CONFIG_PARAM_KEY_FILE);
Review Comment:
ClientCredentialsFlow.fromParameters() reads `keyFile` using
`CONFIG_PARAM_KEY_FILE`, but in this class that constant is "privateKey" (not
the TLS key file param). This causes the TLS key path passed to FlowBase to
incorrectly use the privateKey URL value, which can break TLS client cert setup
whenever `tlsCertFile` is provided. Please read the TLS key path using
FlowBase's TLS key parameter (e.g., `FlowBase.CONFIG_PARAM_KEY_FILE`) and keep
the private key param separate.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java:
##########
@@ -79,7 +97,25 @@ private AsyncHttpClient defaultHttpClient(Duration
readTimeout, Duration connect
confBuilder.setReadTimeout(
getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT,
readTimeout, DEFAULT_READ_TIMEOUT));
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s",
PulsarVersion.getVersion()));
- if (StringUtils.isNotBlank(trustCertsFilePath)) {
+ if (StringUtils.isNotBlank(certFile) &&
StringUtils.isNotBlank(keyFile)) {
+ try {
+ PulsarSslConfiguration sslConfiguration =
PulsarSslConfiguration.builder()
+ .tlsCertificateFilePath(certFile)
+ .tlsKeyFilePath(keyFile)
Review Comment:
FlowBase.defaultHttpClient() only enables mTLS when both `certFile` and
`keyFile` are non-blank; if a user accidentally provides only one of them, the
code silently ignores it and proceeds without client cert auth. That makes
misconfiguration hard to diagnose. Please fail fast (IllegalArgumentException)
when exactly one of `certFile`/`keyFile` is set (or at least log a clear
warning).
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenEndpointAuthMethod.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+public enum TokenEndpointAuthMethod {
+ CLIENT_SECRET_POST("client_secret_post"),
+ TLS_CLIENT_AUTH("tls_client_auth");
+
+ private final String value;
+
+ TokenEndpointAuthMethod(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
+ }
+
+ public static TokenEndpointAuthMethod fromValue(String value) {
Review Comment:
TokenEndpointAuthMethod.fromValue() will throw a NullPointerException if
`value` is null (because of `equalsIgnoreCase`). Since this is a public method,
it’s safer to handle null explicitly and throw an IllegalArgumentException with
a clear message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]