wgtmac commented on code in PR #646:
URL: https://github.com/apache/iceberg-cpp/pull/646#discussion_r3247387566


##########
src/iceberg/catalog/rest/auth/auth_session.cc:
##########
@@ -44,6 +51,183 @@ class DefaultAuthSession : public AuthSession {
   std::unordered_map<std::string, std::string> headers_;
 };
 
+/// \brief OAuth2 session with automatic token refresh.
+class OAuth2AuthSession : public AuthSession,
+                          public 
std::enable_shared_from_this<OAuth2AuthSession> {
+ public:
+  struct Config {  // Settings the session keeps so it can fetch a replacement token later.
+    std::string token_endpoint;  // Sent as AuthProperties::kOAuth2ServerUri when refreshing.
+    std::string client_id;  // When non-empty, the refresh credential is "client_id:client_secret".
+    std::string client_secret;  // Used alone as the refresh credential when client_id is empty.
+    std::string scope;  // Sent as AuthProperties::kScope when refreshing.
+    bool keep_refreshed;  // A refresh is scheduled only when this is true and an expiry is known. NOTE(review): no default member initializer, so a default-initialized Config leaves this indeterminate.
+  };
+
+  /// \brief Build a session from an initial token response; SetInitialToken() schedules a refresh when config.keep_refreshed is set and an expiry could be determined.
+  static std::shared_ptr<OAuth2AuthSession> Create(
+      const OAuthTokenResponse& initial_token, Config config, HttpClient& 
client) {
+    auto session = std::shared_ptr<OAuth2AuthSession>(
+        new OAuth2AuthSession(std::move(config), client));  // the constructor is private, so std::make_shared cannot reach it
+    session->SetInitialToken(initial_token);  // called once the shared_ptr owns the object; presumably so ScheduleRefresh() can use shared_from_this() - confirm
+    return session;
+  }
+
+  Status Authenticate(std::unordered_map<std::string, std::string>& headers) 
override {  // Copies every session header into `headers`, overwriting entries that share a key.
+    std::shared_lock lock(mutex_);  // shared lock for reading; DoRefreshAttempt takes a unique_lock on the same mutex when it replaces headers_
+    for (const auto& [key, value] : headers_) {
+      headers.insert_or_assign(key, value);
+    }
+    return {};  // default-constructed Status (presumably the success value - confirm)
+  }
+
+  Status Close() override {  // Marks the session closed and cancels its scheduled refresh task; repeated calls do nothing.
+    bool expected = false;
+    if (!closed_.compare_exchange_strong(expected, true)) {  // succeeds only for the caller that flips closed_ from false to true
+      return {};  // Already closed
+    }
+    TokenRefreshScheduler::Instance().Cancel(scheduled_task_id_.load());  // DoRefreshAttempt also checks closed_ and returns early
+    return {};
+  }
+
+ private:
+  OAuth2AuthSession(Config config, HttpClient& client)  // Private: sessions are built through Create(), which then installs the initial token.
+      : config_(std::move(config)), client_(client) {}  // NOTE(review): if client_ is a reference member, the HttpClient must outlive this session - confirm against the member declaration
+
+  void SetInitialToken(const OAuthTokenResponse& token_response) {  // Stores the token, builds the Authorization header, derives the expiry and optionally schedules a refresh. Takes no lock; Create() calls it before returning the session.
+    token_ = token_response.access_token;
+    headers_ = {{std::string(kAuthorizationHeader), std::string(kBearerPrefix) 
+ token_}};
+
+    // Determine expiration time: the response's expires_in wins; otherwise fall back to an expiry read from the token itself.
+    if (token_response.expires_in_secs.has_value()) {
+      expires_at_ = std::chrono::steady_clock::now() +
+                    std::chrono::seconds(*token_response.expires_in_secs);
+    } else if (auto exp_ms = ExpiresAtMillis(token_); exp_ms.has_value()) {  // presumably the JWT `exp` claim - confirm against ExpiresAtMillis
+      // Convert absolute epoch millis to steady_clock time_point by re-basing the remaining lifetime onto the steady clock.
+      auto now_sys = std::chrono::system_clock::now();
+      auto now_steady = std::chrono::steady_clock::now();
+      auto exp_sys =
+          
std::chrono::system_clock::time_point(std::chrono::milliseconds(*exp_ms));
+      expires_at_ = now_steady + (exp_sys - now_sys);  // (exp_sys - now_sys) is the time left on the system clock; it is negative if the token has already expired
+    }  // with neither source available, expires_at_ is left untouched
+
+    if (config_.keep_refreshed &&
+        expires_at_ != std::chrono::steady_clock::time_point{}) {  // a default time_point is treated as "no expiry known"; assumes expires_at_ starts default-constructed
+      ScheduleRefresh();
+    }
+  }
+
+  void DoRefresh() { DoRefreshAttempt(0, std::chrono::milliseconds(200)); }  // Starts a refresh at attempt 0 with a 200 ms initial backoff.
+
+  /// \brief Single refresh attempt. On failure, schedules a retry via the
+  /// scheduler (non-blocking) instead of sleeping on the worker thread.
+  void DoRefreshAttempt(int attempt, std::chrono::milliseconds backoff) {
+    static constexpr int kMaxRetries = 5;
+    static constexpr auto kMaxBackoff = std::chrono::milliseconds(10'000);
+
+    if (closed_.load()) return;
+
+    // Build credential and properties once (invariant across retries)
+    std::string credential = config_.client_id.empty()
+                                 ? config_.client_secret
+                                 : config_.client_id + ":" + 
config_.client_secret;
+
+    // Use an empty session for the refresh request (no auth headers —
+    // avoids circular dependency of using an expired token to refresh itself)
+    auto empty_session = AuthSession::MakeDefault({});
+
+    AuthProperties props;
+    props.Set(AuthProperties::kCredential, credential);
+    props.Set(AuthProperties::kScope, config_.scope);
+    props.Set(AuthProperties::kOAuth2ServerUri, config_.token_endpoint);
+
+    auto result = FetchToken(client_, *empty_session, props);
+    if (result.has_value()) {
+      auto& response = result.value();
+      {
+        std::unique_lock lock(mutex_);
+        token_ = response.access_token;
+        headers_ = {
+            {std::string(kAuthorizationHeader), std::string(kBearerPrefix) + 
token_}};
+
+        // Update expiration
+        if (response.expires_in_secs.has_value()) {

Review Comment:
   Reset `expires_at_` before deriving the new expiry, and skip scheduling when 
the refreshed token has neither `expires_in` nor JWT `exp`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to