nodece commented on code in PR #25363:
URL: https://github.com/apache/pulsar/pull/25363#discussion_r2963798587


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java:
##########
@@ -96,21 +228,111 @@ public void start() throws PulsarClientException {
         flow.initialize();
     }
 
+    /**
+     * The first time that this method is called, it retrieves a token. All subsequent
+     * calls should get a cached value. However, if there is an issue with the Identity
+     * Provider, there is a chance that the background thread responsible for keeping
+     * the refresh token hot will
+     * @return The authentication data identifying this client that will be sent to the broker
+     * @throws PulsarClientException
+     */
     @Override
     public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException {
+        if (isClosed) {
+            throw new PulsarClientException.AlreadyClosedException("Authentication already closed.");
+        }
         if (this.cachedToken == null || this.cachedToken.isExpired()) {
-            TokenResult tr = this.flow.authenticate();
-            this.cachedToken = new CachedToken(tr);
+            this.authenticate();
         }
         return this.cachedToken.getAuthData();
     }
 
+    /**
+     * Retrieve the token (synchronously), and then schedule refresh runnable.
+     */
+    private void authenticate() throws PulsarClientException {
+        if (log.isDebugEnabled()) {
+            log.debug("Attempting to retrieve OAuth2 token now.");
+        }
+        TokenResult tr = this.flow.authenticate();
+        this.cachedToken = new CachedToken(tr);
+        handleSuccessfulTokenRefresh();
+    }
+
+    /**
+     * When we successfully get a token, we need to schedule the next attempt to refresh it.
+     * This is done completely based on the "expires_in" value returned by the identity provider.
+     * The code is run on the single scheduler thread in order to ensure that the backoff and the nextRefreshAttempt are
+     * updated safely.
+     */
+    private void handleSuccessfulTokenRefresh() {
+        if (scheduler != null) {
+            scheduler.execute(() -> {
+                if (earlyTokenRefreshPercent < 1) {
+                    backoff = buildBackoff(cachedToken.latest.getExpiresIn());
+                    long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn());
+                    scheduleRefresh((long) (expiresInMillis * earlyTokenRefreshPercent));
+                }
+            });
+        }

Review Comment:
   ```suggestion
           if (scheduler != null && earlyTokenRefreshPercent < 1) {
               scheduler.execute(() -> {
                   backoff = buildBackoff(cachedToken.latest.getExpiresIn());
                   long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn());
                   scheduleRefresh((long) (expiresInMillis * earlyTokenRefreshPercent));
               });
           }
   ```
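
For context, the flattened guard and the refresh-delay arithmetic from the hunk above can be tried in isolation. This is only a minimal sketch under stated assumptions, not the PR's code: `RefreshDelaySketch`, `refreshDelayMillis`, and `scheduleIfEnabled` are hypothetical names, and the sketch schedules directly through `ScheduledExecutorService.schedule` instead of the PR's `scheduleRefresh` and backoff logic.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; none of these names exist in the Pulsar code base.
final class RefreshDelaySketch {

    // Delay before the early refresh: expires_in (seconds) converted to
    // milliseconds, then scaled by the early-refresh fraction.
    static long refreshDelayMillis(long expiresInSeconds, double earlyTokenRefreshPercent) {
        long expiresInMillis = TimeUnit.SECONDS.toMillis(expiresInSeconds);
        return (long) (expiresInMillis * earlyTokenRefreshPercent);
    }

    // Single combined guard, as in the suggestion: no task is submitted at all
    // when there is no scheduler or when early refresh is disabled (fraction >= 1).
    // Returns null when nothing was scheduled.
    static ScheduledFuture<?> scheduleIfEnabled(ScheduledExecutorService scheduler,
                                                long expiresInSeconds,
                                                double earlyTokenRefreshPercent,
                                                Runnable refresh) {
        if (scheduler != null && earlyTokenRefreshPercent < 1) {
            long delay = refreshDelayMillis(expiresInSeconds, earlyTokenRefreshPercent);
            return scheduler.schedule(refresh, delay, TimeUnit.MILLISECONDS);
        }
        return null;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // A one-hour token refreshed at 75% of its lifetime.
            System.out.println(refreshDelayMillis(3600, 0.75));
            ScheduledFuture<?> task = scheduleIfEnabled(scheduler, 3600, 0.75, () -> { });
            System.out.println(task != null ? "refresh scheduled" : "refresh skipped");
        } finally {
            // Cancel the pending sketch task so the JVM can exit immediately.
            scheduler.shutdownNow();
        }
    }
}
```

With the combined condition, a client configured with `earlyTokenRefreshPercent >= 1` never hands a no-op task to the scheduler thread, which is the behavioral difference from the nested form.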



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
