nodece commented on code in PR #25363:
URL: https://github.com/apache/pulsar/pull/25363#discussion_r2963798587
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java:
##########
@@ -96,21 +228,111 @@ public void start() throws PulsarClientException {
flow.initialize();
}
+ /**
+ * The first time that this method is called, it retrieves a token. All
subsequent
+ * calls should get a cached value. However, if there is an issue with the
Identity
+ * Provider, there is a chance that the background thread responsible for
keeping
+ * the refresh token hot will
+ * @return The authentication data identifying this client that will be
sent to the broker
+ * @throws PulsarClientException
+ */
@Override
public synchronized AuthenticationDataProvider getAuthData() throws
PulsarClientException {
+ if (isClosed) {
+ throw new
PulsarClientException.AlreadyClosedException("Authentication already closed.");
+ }
if (this.cachedToken == null || this.cachedToken.isExpired()) {
- TokenResult tr = this.flow.authenticate();
- this.cachedToken = new CachedToken(tr);
+ this.authenticate();
}
return this.cachedToken.getAuthData();
}
+ /**
+ * Retrieve the token (synchronously), and then schedule refresh runnable.
+ */
+ private void authenticate() throws PulsarClientException {
+ if (log.isDebugEnabled()) {
+ log.debug("Attempting to retrieve OAuth2 token now.");
+ }
+ TokenResult tr = this.flow.authenticate();
+ this.cachedToken = new CachedToken(tr);
+ handleSuccessfulTokenRefresh();
+ }
+
+ /**
+ * When we successfully get a token, we need to schedule the next attempt
to refresh it.
+ * This is done completely based on the "expires_in" value returned by the
identity provider.
+ * The code is run on the single scheduler thread in order to ensure that
the backoff and the nextRefreshAttempt are
+ * updated safely.
+ */
+ private void handleSuccessfulTokenRefresh() {
+ if (scheduler != null) {
+ scheduler.execute(() -> {
+ if (earlyTokenRefreshPercent < 1) {
+ backoff = buildBackoff(cachedToken.latest.getExpiresIn());
+ long expiresInMillis =
TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn());
+ scheduleRefresh((long) (expiresInMillis *
earlyTokenRefreshPercent));
+ }
+ });
+ }
Review Comment:
```suggestion
if (scheduler != null && earlyTokenRefreshPercent < 1) {
scheduler.execute(() -> {
backoff = buildBackoff(cachedToken.latest.getExpiresIn());
long expiresInMillis =
TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn());
scheduleRefresh((long) (expiresInMillis *
earlyTokenRefreshPercent));
});
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]