nastra commented on code in PR #10753:
URL: https://github.com/apache/iceberg/pull/10753#discussion_r1870946124


##########
core/src/test/java/org/apache/iceberg/rest/TestRESTUtil.java:
##########
@@ -139,4 +148,100 @@ public void testOAuth2FormDataDecoding() {
 
     assertThat(RESTUtil.decodeFormData(formString)).isEqualTo(expected);
   }
+
+  @ParameterizedTest
+  @MethodSource
+  public void buildRequestUri(HTTPRequest request, URI expected) {
+    assertThat(RESTUtil.buildRequestUri(request)).isEqualTo(expected);
+  }
+
+  public static Stream<Arguments> buildRequestUri() {

Review Comment:
   ```suggestion
     public static Stream<Arguments> validRequestUris() {
   ```



##########
core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java:
##########
@@ -69,4 +70,43 @@ default String oauth2ServerUri() {
   static ImmutableAuthConfig.Builder builder() {
     return ImmutableAuthConfig.builder();
   }
+
+  static AuthConfig fromProperties(Map<String, String> properties) {
+    String scope = properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE);
+    Map<String, String> optionalOAuthParams = OAuth2Util.buildOptionalParam(properties);
+    String oauth2ServerUri =
+        properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens());
+    boolean keepRefreshed =
+        PropertyUtil.propertyAsBoolean(
+            properties,
+            OAuth2Properties.TOKEN_REFRESH_ENABLED,
+            OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT);
+    return builder()
+        .credential(properties.get(OAuth2Properties.CREDENTIAL))
+        .token(properties.get(OAuth2Properties.TOKEN))
+        .scope(scope)
+        .oauth2ServerUri(oauth2ServerUri)
+        .optionalOAuthParams(optionalOAuthParams)
+        .keepRefreshed(keepRefreshed)
+        .expiresAtMillis(expiresAtMillis(properties))
+        .build();
+  }
+
+  private static Long expiresAtMillis(Map<String, String> props) {
+    Long expiresInMillis = null;

Review Comment:
   this local variable should be named `expiresAtMillis` rather than 
   `expiresInMillis`, because that's what we're returning here



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());

Review Comment:
   the previous impl passed `RESTUtil.merge(configHeaders(props), OAuth2Util.authHeaders(initToken))` 
   as the init headers: 
   https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L244
   Don't we need to do the same here?
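
   To make the concern concrete, here is a minimal, self-contained sketch of 
   merging config headers with auth headers. `merge` and `authHeaders` below 
   are hypothetical stand-ins written for this example (they only imitate what 
   `RESTUtil.merge` and `OAuth2Util.authHeaders` are assumed to do), and the 
   `X-Client` header is made up.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   final class HeaderMergeSketch {

     // Hypothetical stand-in for RESTUtil.merge: entries in `updates` win on key collisions.
     static Map<String, String> merge(Map<String, String> target, Map<String, String> updates) {
       Map<String, String> merged = new LinkedHashMap<>(target);
       merged.putAll(updates);
       return merged;
     }

     // Hypothetical stand-in for OAuth2Util.authHeaders: no header when there is no token.
     static Map<String, String> authHeaders(String token) {
       return token == null ? Map.of() : Map.of("Authorization", "Bearer " + token);
     }

     // Init headers built the way the previous implementation is described above:
     // config headers first, auth headers merged on top.
     static Map<String, String> initHeaders(Map<String, String> configHeaders, String token) {
       return merge(configHeaders, authHeaders(token));
     }
   }
   ```

   With only `authHeaders(token)`, any header supplied through the catalog 
   config would be missing from the init session's requests.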



##########
core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java:
##########
@@ -69,4 +70,43 @@ default String oauth2ServerUri() {
   static ImmutableAuthConfig.Builder builder() {
     return ImmutableAuthConfig.builder();
   }
+
+  static AuthConfig fromProperties(Map<String, String> properties) {
+    String scope = properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE);
+    Map<String, String> optionalOAuthParams = OAuth2Util.buildOptionalParam(properties);
+    String oauth2ServerUri =
+        properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens());
+    boolean keepRefreshed =
+        PropertyUtil.propertyAsBoolean(
+            properties,
+            OAuth2Properties.TOKEN_REFRESH_ENABLED,
+            OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT);
+    return builder()
+        .credential(properties.get(OAuth2Properties.CREDENTIAL))
+        .token(properties.get(OAuth2Properties.TOKEN))
+        .scope(scope)
+        .oauth2ServerUri(oauth2ServerUri)
+        .optionalOAuthParams(optionalOAuthParams)
+        .keepRefreshed(keepRefreshed)
+        .expiresAtMillis(expiresAtMillis(properties))
+        .build();
+  }
+
+  private static Long expiresAtMillis(Map<String, String> props) {
+    Long expiresInMillis = null;
+    if (props.containsKey(OAuth2Properties.TOKEN)) {
+      expiresInMillis = OAuth2Util.expiresAtMillis(props.get(OAuth2Properties.TOKEN));
+    }

Review Comment:
   minor: add a newline after the closing `}` here and in other places



##########
core/src/main/java/org/apache/iceberg/rest/auth/AuthSessionCache.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.time.Duration;
+import java.util.function.Supplier;
+
+/** A cache for {@link AuthSession} instances. */
+public class AuthSessionCache implements AutoCloseable {
+
+  private final Duration sessionTimeout;
+  private volatile Cache<String, AuthSession> sessionCache;
+
+  public AuthSessionCache(Duration sessionTimeout) {
+    this.sessionTimeout = sessionTimeout;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends AuthSession> T cachedSession(String key, Supplier<T> loader) {
+    return (T) sessionCache().get(key, k -> loader.get());
+  }
+
+  @Override
+  public void close() {
+    Cache<String, AuthSession> cache = sessionCache;
+    try {
+      if (cache != null) {
+        cache.invalidateAll();
+        cache.cleanUp();
+      }
+    } finally {
+      this.sessionCache = null;

Review Comment:
   why not set this to null in L45 so that we don't need the try-finally?
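
   One possible reading of this suggestion, as a self-contained sketch: clear 
   the field right after copying it into the local variable, so the cleanup 
   needs no `finally`. The class below is a hypothetical stand-in that uses a 
   plain `ConcurrentHashMap` instead of the Caffeine cache; it is not the PR's 
   code.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   final class SessionCacheCloseSketch implements AutoCloseable {

     // Stand-in for the volatile Caffeine cache field in AuthSessionCache.
     private volatile Map<String, String> sessionCache = new ConcurrentHashMap<>();

     void put(String key, String session) {
       Map<String, String> cache = sessionCache;
       if (cache == null) {
         throw new IllegalStateException("cache is closed");
       }
       cache.put(key, session);
     }

     boolean isOpen() {
       return sessionCache != null;
     }

     @Override
     public void close() {
       Map<String, String> cache = sessionCache;
       // Field is cleared up front, so it is null even if the cleanup below throws.
       this.sessionCache = null;
       if (cache != null) {
         cache.clear();
       }
     }
   }
   ```

   Calling `close()` twice is harmless in this sketch: the second call sees a 
   null field and does nothing.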



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, config);
+    if (config.credential() != null) {
+      // keep track of the start time for token refresh
+      this.startTimeMillis = System.currentTimeMillis();
+      this.authResponse =
+          OAuth2Util.fetchToken(
+              initClient,
+              headers,
+              config.credential(),
+              config.scope(),
+              config.oauth2ServerUri(),
+              config.optionalOAuthParams());
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          initClient, null, authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          initClient, null, config.token(), null, session);

Review Comment:
   I'm assuming we're not scheduling refreshes here because it's an init 
   session, right?



##########
core/src/test/java/org/apache/iceberg/rest/auth/TestAuthManagers.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Map;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TestAuthManagers {

Review Comment:
   this also needs a test where the auth type being passed is an unknown 
   string, e.g. 
   `AuthManagers.loadAuthManager("test", Map.of(AuthProperties.AUTH_TYPE, "unknown"))`



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java:
##########
@@ -725,15 +736,15 @@ private static AuthSession fromTokenResponse(
       if (issuedTokenType == null) {
         issuedTokenType = OAuth2Properties.ACCESS_TOKEN_TYPE;
       }
-      AuthSession session =
-          new AuthSession(
-              parent.headers(),
-              AuthConfig.builder()
-                  .from(parent.config())
-                  .token(response.token())
-                  .tokenType(issuedTokenType)
-                  .credential(credential)
-                  .build());
+      Map<String, String> headers = RESTUtil.merge(parent.headers(), authHeaders(response.token()));
+      AuthConfig config =

Review Comment:
   and I would also inline the headers here and in the code further above



##########
core/src/main/java/org/apache/iceberg/rest/auth/BasicAuthManager.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.RESTClient;
+
+/** An auth manager that adds static BASIC authentication data to outgoing HTTP requests. */
+public final class BasicAuthManager implements AuthManager {
+
+  @Override
+  public AuthSession catalogSession(RESTClient sharedClient, Map<String, String> properties) {
+    Preconditions.checkArgument(
+        properties.containsKey(AuthProperties.BASIC_USERNAME),
+        "Property %s is required",

Review Comment:
   what about `Invalid username: missing required property ...`
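
   A small sketch of what the proposed wording could look like at runtime. 
   Everything here is illustrative: the property key `basic.username` is a 
   hypothetical placeholder, and the plain `IllegalArgumentException` stands in 
   for what `Preconditions.checkArgument` is assumed to throw.

   ```java
   import java.util.Map;

   final class RequiredPropertySketch {

     // Hypothetical property key, used only for this example.
     static final String BASIC_USERNAME = "basic.username";

     // Returns the username, or fails with the message wording proposed above.
     static String requireUsername(Map<String, String> properties) {
       if (!properties.containsKey(BASIC_USERNAME)) {
         throw new IllegalArgumentException(
             String.format("Invalid username: missing required property %s", BASIC_USERNAME));
       }
       return properties.get(BASIC_USERNAME);
     }
   }
   ```

   Naming both the invalid value and the missing property tells the user what 
   is wrong and which key to set.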



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTUtil.java:
##########
@@ -139,4 +148,100 @@ public void testOAuth2FormDataDecoding() {
 
     assertThat(RESTUtil.decodeFormData(formString)).isEqualTo(expected);
   }
+
+  @ParameterizedTest
+  @MethodSource

Review Comment:
   we typically use explicit method names here rather than giving the 
   argument-source method and the test method the same name.
   ```suggestion
     @MethodSource("validRequestUris")
   ```



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTUtil.java:
##########
@@ -139,4 +148,100 @@ public void testOAuth2FormDataDecoding() {
 
     assertThat(RESTUtil.decodeFormData(formString)).isEqualTo(expected);
   }
+
+  @ParameterizedTest
+  @MethodSource
+  public void buildRequestUri(HTTPRequest request, URI expected) {
+    assertThat(RESTUtil.buildRequestUri(request)).isEqualTo(expected);
+  }
+
+  public static Stream<Arguments> buildRequestUri() {
+    return Stream.of(
+        Arguments.of(
+            HTTPRequest.builder()
+                .baseUri(URI.create("http://localhost:8080/foo"))
+                .method(HTTPMethod.GET)
+                .path("v1/namespaces/ns/tables/") // trailing slash should be removed
+                .setParameter("pageToken", "1234")
+                .setParameter("pageSize", "10")
+                .build(),
+            URI.create(
+                "http://localhost:8080/foo/v1/namespaces/ns/tables?pageToken=1234&pageSize=10")),
+        Arguments.of(
+            HTTPRequest.builder()
+                .baseUri(URI.create("http://localhost:8080/foo"))
+                .method(HTTPMethod.GET)
+                .path("https://authserver.com/token") // absolute path HTTPS
+                .build(),
+            URI.create("https://authserver.com/token")),
+        Arguments.of(
+            HTTPRequest.builder()
+                .baseUri(URI.create("http://localhost:8080/foo"))
+                .method(HTTPMethod.GET)
+                .path("http://authserver.com/token") // absolute path HTTP
+                .build(),
+            URI.create("http://authserver.com/token")));
+  }
+
+  @Test
+  public void buildRequestUriFailures() {
+    HTTPRequest request =
+        HTTPRequest.builder()
+            .baseUri(URI.create("http://localhost"))
+            .method(HTTPMethod.GET)
+            .path("/v1/namespaces") // wrong leading slash
+            .build();
+    assertThatThrownBy(() -> RESTUtil.buildRequestUri(request))
+        .isInstanceOf(RESTException.class)
+        .hasMessageContaining("Paths should not start with /");
+    HTTPRequest request2 =
+        HTTPRequest.builder()
+            .baseUri(URI.create("http://localhost"))
+            .method(HTTPMethod.GET)
+            .path(" not a valid path") // wrong path
+            .build();
+    assertThatThrownBy(() -> RESTUtil.buildRequestUri(request2))
+        .isInstanceOf(RESTException.class)
+        .hasMessageContaining("Failed to create request URI");

Review Comment:
   ```suggestion
           .hasMessage(
               "Failed to create request URI from base http://localhost/ not a valid path, params {}");
   ```



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTUtil.java:
##########
@@ -139,4 +148,100 @@ public void testOAuth2FormDataDecoding() {
 
     assertThat(RESTUtil.decodeFormData(formString)).isEqualTo(expected);
   }
+
+  @ParameterizedTest
+  @MethodSource
+  public void buildRequestUri(HTTPRequest request, URI expected) {
+    assertThat(RESTUtil.buildRequestUri(request)).isEqualTo(expected);
+  }
+
+  public static Stream<Arguments> buildRequestUri() {
+    return Stream.of(
+        Arguments.of(
+            HTTPRequest.builder()
+                .baseUri(URI.create("http://localhost:8080/foo"))
+                .method(HTTPMethod.GET)
+                .path("v1/namespaces/ns/tables/") // trailing slash should be removed
+                .setParameter("pageToken", "1234")
+                .setParameter("pageSize", "10")
+                .build(),
+            URI.create(
+                "http://localhost:8080/foo/v1/namespaces/ns/tables?pageToken=1234&pageSize=10")),
+        Arguments.of(
+            HTTPRequest.builder()
+                .baseUri(URI.create("http://localhost:8080/foo"))
+                .method(HTTPMethod.GET)
+                .path("https://authserver.com/token") // absolute path HTTPS
+                .build(),
+            URI.create("https://authserver.com/token")),
+        Arguments.of(
+            HTTPRequest.builder()
+                .baseUri(URI.create("http://localhost:8080/foo"))
+                .method(HTTPMethod.GET)
+                .path("http://authserver.com/token") // absolute path HTTP
+                .build(),
+            URI.create("http://authserver.com/token")));
+  }
+
+  @Test
+  public void buildRequestUriFailures() {
+    HTTPRequest request =
+        HTTPRequest.builder()
+            .baseUri(URI.create("http://localhost"))
+            .method(HTTPMethod.GET)
+            .path("/v1/namespaces") // wrong leading slash
+            .build();
+    assertThatThrownBy(() -> RESTUtil.buildRequestUri(request))
+        .isInstanceOf(RESTException.class)
+        .hasMessageContaining("Paths should not start with /");

Review Comment:
   ```suggestion
           .hasMessage(
               "Received a malformed path for a REST request: /v1/namespaces. Paths should not start with /");
   ```
   if possible, we should always try to assert against the full msg. This also 
   helps in seeing what the entire error msg will look like when this happens
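
   To illustrate the difference, here is a self-contained sketch in plain Java 
   (no AssertJ). `validatePath` is a hypothetical validator written for this 
   example; its message text is copied from the suggestion above, and 
   `IllegalArgumentException` stands in for `RESTException`.

   ```java
   final class FullMessageAssertionSketch {

     // Hypothetical validator that rejects paths with a leading slash.
     static void validatePath(String path) {
       if (path.startsWith("/")) {
         throw new IllegalArgumentException(
             String.format(
                 "Received a malformed path for a REST request: %s. Paths should not start with /",
                 path));
       }
     }

     // Returns the failure message, or null if the path is accepted.
     static String failureMessage(String path) {
       try {
         validatePath(path);
         return null;
       } catch (IllegalArgumentException e) {
         return e.getMessage();
       }
     }
   }
   ```

   A substring check on `Paths should not start with /` would keep passing even 
   if the first sentence of the message changed or dropped the offending path; 
   comparing the whole string pins down exactly what the user will see.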



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, config);
+    if (config.credential() != null) {
+      // keep track of the start time for token refresh
+      this.startTimeMillis = System.currentTimeMillis();
+      this.authResponse =
+          OAuth2Util.fetchToken(
+              initClient,
+              headers,
+              config.credential(),
+              config.scope(),
+              config.oauth2ServerUri(),
+              config.optionalOAuthParams());
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          initClient, null, authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          initClient, null, config.token(), null, session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession catalogSession(
+      RESTClient sharedClient, Map<String, String> properties) {
+    this.client = sharedClient;
+    this.sessionCache = new AuthSessionCache(sessionTimeout(properties));
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());

Review Comment:
   the old impl was merging config headers here too



##########
core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java:
##########
@@ -69,4 +70,43 @@ default String oauth2ServerUri() {
   static ImmutableAuthConfig.Builder builder() {
     return ImmutableAuthConfig.builder();
   }
+
+  static AuthConfig fromProperties(Map<String, String> properties) {
+    String scope = properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE);

Review Comment:
   I think all of these can be just inlined



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, config);
+    if (config.credential() != null) {
+      // keep track of the start time for token refresh
+      this.startTimeMillis = System.currentTimeMillis();
+      this.authResponse =
+          OAuth2Util.fetchToken(
+              initClient,
+              headers,
+              config.credential(),
+              config.scope(),
+              config.oauth2ServerUri(),
+              config.optionalOAuthParams());
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          initClient, null, authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          initClient, null, config.token(), null, session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession catalogSession(
+      RESTClient sharedClient, Map<String, String> properties) {
+    this.client = sharedClient;
+    this.sessionCache = new AuthSessionCache(sessionTimeout(properties));
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    setKeepRefreshed(config.keepRefreshed());
+    if (authResponse != null /* from the pre-config phase */) {
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          client, refreshExecutor(), authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          client, refreshExecutor(), config.token(), config.expiresAtMillis(), 
session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession contextualSession(
+      SessionCatalog.SessionContext context, AuthSession parent) {
+    return maybeCreateChildSession(
+        context.credentials(),
+        context.properties(),
+        ignored -> context.sessionId(),
+        (OAuth2Util.AuthSession) parent);
+  }
+
+  @Override
+  public OAuth2Util.AuthSession tableSession(
+      TableIdentifier table, Map<String, String> properties, AuthSession 
parent) {
+    return maybeCreateChildSession(
+        Maps.filterKeys(properties, TABLE_SESSION_ALLOW_LIST::contains),
+        properties,
+        properties::get,
+        (OAuth2Util.AuthSession) parent);
+  }
+
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      try {
+        AuthSessionCache cache = sessionCache;
+        if (cache != null) {
+          cache.close();
+        }
+      } finally {
+        sessionCache = null;
+      }
+    }
+  }
+
+  protected OAuth2Util.AuthSession maybeCreateChildSession(
+      Map<String, String> credentials,
+      Map<String, String> properties,
+      Function<String, String> cacheKeyFunc,
+      OAuth2Util.AuthSession parent) {
+    if (credentials != null) {
+      // use the bearer token without exchanging
+      if (credentials.containsKey(OAuth2Properties.TOKEN)) {
+        String token = credentials.get(OAuth2Properties.TOKEN);
+        return sessionCache.cachedSession(
+            cacheKeyFunc.apply(OAuth2Properties.TOKEN),
+            () -> newSessionFromAccessToken(token, properties, parent));
+      }
+
+      if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) {
+        // fetch a token using the client credentials flow
+        String credential = credentials.get(OAuth2Properties.CREDENTIAL);
+        return sessionCache.cachedSession(
+            cacheKeyFunc.apply(OAuth2Properties.CREDENTIAL),
+            () -> newSessionFromCredential(credential, parent));
+      }
+
+      for (String tokenType : TOKEN_PREFERENCE_ORDER) {
+        if (credentials.containsKey(tokenType)) {
+          // exchange the token for an access token using the token exchange flow
+          String token = credentials.get(tokenType);
+          return sessionCache.cachedSession(
+              cacheKeyFunc.apply(tokenType),
+              () -> newSessionFromTokenExchange(token, tokenType, parent));
+        }
+      }
+    }
+    return parent;
+  }
+
+  protected OAuth2Util.AuthSession newSessionFromAccessToken(
+      String token, Map<String, String> properties, OAuth2Util.AuthSession 
parent) {
+    Long expiresAtMillis = 
AuthConfig.fromProperties(properties).expiresAtMillis();
+    return OAuth2Util.AuthSession.fromAccessToken(
+        client, refreshExecutor(), token, expiresAtMillis, parent);
+  }
+
+  protected OAuth2Util.AuthSession newSessionFromCredential(
+      String credential, OAuth2Util.AuthSession parent) {
+    return OAuth2Util.AuthSession.fromCredential(client, refreshExecutor(), 
credential, parent);
+  }
+
+  protected OAuth2Util.AuthSession newSessionFromTokenExchange(
+      String token, String tokenType, OAuth2Util.AuthSession parent) {
+    return OAuth2Util.AuthSession.fromTokenExchange(
+        client, refreshExecutor(), token, tokenType, parent);
+  }
+
+  private static void warnIfDeprecatedTokenEndpointUsed(Map<String, String> 
properties) {
+    if (usesDeprecatedTokenEndpoint(properties)) {
+      String credential = properties.get(OAuth2Properties.CREDENTIAL);
+      String initToken = properties.get(OAuth2Properties.TOKEN);
+      boolean hasCredential = credential != null && !credential.isEmpty();
+      boolean hasInitToken = initToken != null;
+      if (hasInitToken || hasCredential) {
+        LOG.warn(
+            "Iceberg REST client is missing the OAuth2 server URI configuration and defaults to {}/{}. "
+                + "This automatic fallback will be removed in a future Iceberg release."
+                + "It is recommended to configure the OAuth2 endpoint using the '{}' property to be prepared. "
+                + "This warning will disappear if the OAuth2 endpoint is explicitly configured. "
+                + "See https://github.com/apache/iceberg/issues/10537",
+            RESTUtil.stripTrailingSlash(properties.get(CatalogProperties.URI)),
+            ResourcePaths.tokens(),
+            OAuth2Properties.OAUTH2_SERVER_URI);
+      }
+    }
+  }
+
+  private static boolean usesDeprecatedTokenEndpoint(Map<String, String> 
properties) {
+    if (properties.containsKey(OAuth2Properties.OAUTH2_SERVER_URI)) {
+      String oauth2ServerUri = 
properties.get(OAuth2Properties.OAUTH2_SERVER_URI);
+      boolean relativePath = !oauth2ServerUri.startsWith("http");
+      boolean sameHost = 
oauth2ServerUri.startsWith(properties.get(CatalogProperties.URI));
+      return relativePath || sameHost;
+    }
+    return true;
+  }
+
+  private static Duration sessionTimeout(Map<String, String> props) {
+    long expirationIntervalMs =

Review Comment:
   The `expirationIntervalMs` local variable can be inlined.
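
   A sketch of what inlining could look like here. The hunk above is cut off,
   so the property key and default below are made-up placeholders rather than
   the constants the PR actually uses, and `SessionTimeoutSketch` is a
   hypothetical class:

   ```java
   import java.time.Duration;
   import java.util.Map;

   final class SessionTimeoutSketch {
     // Placeholder key and default, for illustration only.
     static final String TIMEOUT_MS_KEY = "example.session-timeout-ms";
     static final long TIMEOUT_MS_DEFAULT = 3_600_000L;

     private SessionTimeoutSketch() {}

     // The parsed value goes straight into Duration.ofMillis, with no intermediate local.
     static Duration sessionTimeout(Map<String, String> props) {
       return Duration.ofMillis(
           Long.parseLong(props.getOrDefault(TIMEOUT_MS_KEY, String.valueOf(TIMEOUT_MS_DEFAULT))));
     }

     public static void main(String[] args) {
       System.out.println(sessionTimeout(Map.of(TIMEOUT_MS_KEY, "5000")));
     }
   }
   ```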



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, 
String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    if (config.credential() != null) {
+      // keep track of the start time for token refresh
+      this.startTimeMillis = System.currentTimeMillis();
+      this.authResponse =
+          OAuth2Util.fetchToken(
+              initClient,
+              headers,
+              config.credential(),
+              config.scope(),
+              config.oauth2ServerUri(),
+              config.optionalOAuthParams());
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          initClient, null, authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          initClient, null, config.token(), null, session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession catalogSession(
+      RESTClient sharedClient, Map<String, String> properties) {
+    this.client = sharedClient;
+    this.sessionCache = new AuthSessionCache(sessionTimeout(properties));
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    setKeepRefreshed(config.keepRefreshed());
+    if (authResponse != null /* from the pre-config phase */) {

Review Comment:
   I'd rather put this comment on its own line above the `if` and use `//`
   instead of an inline `/* */` block.
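
   A sketch of the layout being suggested. `CommentPlacementSketch` and
   `describe` are hypothetical names; only the position and style of the
   comment matter:

   ```java
   final class CommentPlacementSketch {
     private CommentPlacementSketch() {}

     static String describe(Object authResponse) {
       // from the pre-config phase
       if (authResponse != null) {
         return "session from token response";
       }

       return "plain session";
     }

     public static void main(String[] args) {
       System.out.println(describe(null));
     }
   }
   ```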



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, 
String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    if (config.credential() != null) {

Review Comment:
   I believe the old code checked `credential != null && !credential.isEmpty()`,
   so we might want to do the same here.
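
   For illustration, a minimal sketch of the difference between the two checks.
   `CredentialCheck` and `hasCredential` are hypothetical names, not part of the
   PR. The point is only that an empty credential string passes a plain null
   check, so `initSession` as written would still attempt a token fetch for it:

   ```java
   final class CredentialCheck {
     private CredentialCheck() {}

     // Hypothetical helper mirroring the stricter condition: non-null AND non-empty.
     static boolean hasCredential(String credential) {
       return credential != null && !credential.isEmpty();
     }

     public static void main(String[] args) {
       String empty = "";
       // A null-only check accepts the empty string...
       System.out.println("null-only check: " + (empty != null));
       // ...while the stricter check rejects it.
       System.out.println("null-and-empty check: " + hasCredential(empty));
     }
   }
   ```

   `warnIfDeprecatedTokenEndpointUsed` in this same file already uses the
   stricter form for its `hasCredential` flag.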



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, 
String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    if (config.credential() != null) {
+      // keep track of the start time for token refresh
+      this.startTimeMillis = System.currentTimeMillis();
+      this.authResponse =
+          OAuth2Util.fetchToken(
+              initClient,
+              headers,
+              config.credential(),
+              config.scope(),
+              config.oauth2ServerUri(),
+              config.optionalOAuthParams());
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          initClient, null, authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          initClient, null, config.token(), null, session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession catalogSession(
+      RESTClient sharedClient, Map<String, String> properties) {
+    this.client = sharedClient;
+    this.sessionCache = new AuthSessionCache(sessionTimeout(properties));
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    setKeepRefreshed(config.keepRefreshed());
+    if (authResponse != null /* from the pre-config phase */) {
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          client, refreshExecutor(), authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          client, refreshExecutor(), config.token(), config.expiresAtMillis(), 
session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession contextualSession(
+      SessionCatalog.SessionContext context, AuthSession parent) {
+    return maybeCreateChildSession(
+        context.credentials(),
+        context.properties(),
+        ignored -> context.sessionId(),
+        (OAuth2Util.AuthSession) parent);
+  }
+
+  @Override
+  public OAuth2Util.AuthSession tableSession(
+      TableIdentifier table, Map<String, String> properties, AuthSession 
parent) {
+    return maybeCreateChildSession(
+        Maps.filterKeys(properties, TABLE_SESSION_ALLOW_LIST::contains),
+        properties,
+        properties::get,
+        (OAuth2Util.AuthSession) parent);
+  }
+
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      try {
+        AuthSessionCache cache = sessionCache;
+        if (cache != null) {
+          cache.close();
+        }
+      } finally {
+        sessionCache = null;
+      }
+    }
+  }
+
+  protected OAuth2Util.AuthSession maybeCreateChildSession(
+      Map<String, String> credentials,
+      Map<String, String> properties,
+      Function<String, String> cacheKeyFunc,
+      OAuth2Util.AuthSession parent) {
+    if (credentials != null) {
+      // use the bearer token without exchanging
+      if (credentials.containsKey(OAuth2Properties.TOKEN)) {
+        String token = credentials.get(OAuth2Properties.TOKEN);
+        return sessionCache.cachedSession(
+            cacheKeyFunc.apply(OAuth2Properties.TOKEN),
+            () -> newSessionFromAccessToken(token, properties, parent));
+      }
+
+      if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) {
+        // fetch a token using the client credentials flow
+        String credential = credentials.get(OAuth2Properties.CREDENTIAL);
+        return sessionCache.cachedSession(
+            cacheKeyFunc.apply(OAuth2Properties.CREDENTIAL),
+            () -> newSessionFromCredential(credential, parent));
+      }
+
+      for (String tokenType : TOKEN_PREFERENCE_ORDER) {
+        if (credentials.containsKey(tokenType)) {
+          // exchange the token for an access token using the token exchange flow
+          String token = credentials.get(tokenType);
+          return sessionCache.cachedSession(
+              cacheKeyFunc.apply(tokenType),
+              () -> newSessionFromTokenExchange(token, tokenType, parent));
+        }
+      }
+    }
+    return parent;

Review Comment:
   Please add a blank line after the closing `}`.



##########
core/src/main/java/org/apache/iceberg/rest/auth/AuthSession.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import org.apache.iceberg.rest.HTTPRequest;
+
+/**
+ * An authentication session that can be used to authenticate outgoing HTTP requests.
+ *
+ * <p>Authentication sessions are usually immutable, but may hold resources that need to be released
+ * when the session is no longer needed. Implementations should override {@link #close()} to release
+ * any resources.
+ */
+ */
+public interface AuthSession extends AutoCloseable {

Review Comment:
   It would be great to avoid a naming collision with the existing
   `AuthSession` in `OAuth2Util`. Also, this interface seems to add only a
   trait, so what about naming it `Authenticateable` or something along those
   lines?



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, 
String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    if (config.credential() != null) {
+      // keep track of the start time for token refresh
+      this.startTimeMillis = System.currentTimeMillis();
+      this.authResponse =
+          OAuth2Util.fetchToken(
+              initClient,
+              headers,
+              config.credential(),
+              config.scope(),
+              config.oauth2ServerUri(),
+              config.optionalOAuthParams());
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          initClient, null, authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          initClient, null, config.token(), null, session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession catalogSession(
+      RESTClient sharedClient, Map<String, String> properties) {
+    this.client = sharedClient;
+    this.sessionCache = new AuthSessionCache(sessionTimeout(properties));
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    setKeepRefreshed(config.keepRefreshed());

Review Comment:
   ```suggestion
       keepRefreshed(config.keepRefreshed());
   ```



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, 
String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    if (config.credential() != null) {
+      // keep track of the start time for token refresh
+      this.startTimeMillis = System.currentTimeMillis();
+      this.authResponse =
+          OAuth2Util.fetchToken(
+              initClient,
+              headers,
+              config.credential(),
+              config.scope(),
+              config.oauth2ServerUri(),
+              config.optionalOAuthParams());
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          initClient, null, authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          initClient, null, config.token(), null, session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession catalogSession(
+      RESTClient sharedClient, Map<String, String> properties) {
+    this.client = sharedClient;
+    this.sessionCache = new AuthSessionCache(sessionTimeout(properties));
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    setKeepRefreshed(config.keepRefreshed());

Review Comment:
   We typically don't use `get`/`set` prefixes in method names in this codebase.
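
   As a sketch of the convention (the `RefreshSettings` class below is a
   hypothetical stand-in, not the PR's `RefreshingAuthManager`), both the
   accessor and the mutator are named after the property itself:

   ```java
   final class RefreshSettings {
     // Initial value chosen arbitrarily for the example.
     private boolean keepRefreshed = true;

     // Accessor named after the property, without a "get"/"is" prefix.
     boolean keepRefreshed() {
       return keepRefreshed;
     }

     // Mutator overload with the same name, without a "set" prefix.
     void keepRefreshed(boolean keep) {
       this.keepRefreshed = keep;
     }

     public static void main(String[] args) {
       RefreshSettings settings = new RefreshSettings();
       settings.keepRefreshed(false);
       System.out.println(settings.keepRefreshed());
     }
   }
   ```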



##########
core/src/main/java/org/apache/iceberg/rest/auth/BasicAuthManager.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.RESTClient;
+
+/** An auth manager that adds static BASIC authentication data to outgoing 
HTTP requests. */
+public final class BasicAuthManager implements AuthManager {
+
+  @Override
+  public AuthSession catalogSession(RESTClient sharedClient, Map<String, 
String> properties) {
+    Preconditions.checkArgument(
+        properties.containsKey(AuthProperties.BASIC_USERNAME),
+        "Property %s is required",
+        AuthProperties.BASIC_USERNAME);
+    Preconditions.checkArgument(
+        properties.containsKey(AuthProperties.BASIC_PASSWORD),
+        "Property %s is required",
+        AuthProperties.BASIC_PASSWORD);
+    String username = properties.get(AuthProperties.BASIC_USERNAME);
+    String password = properties.get(AuthProperties.BASIC_PASSWORD);
+    String credentials = username + ":" + password;
+    String header =
+        "Basic " + 
Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+    return DefaultAuthSession.of("Authorization", header);

Review Comment:
   ```suggestion
       return DefaultAuthSession.of(OAuth2Util.basicAuthHeaders(credentials));
   ```



##########
core/src/main/java/org/apache/iceberg/rest/auth/BasicAuthManager.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.RESTClient;
+
+/** An auth manager that adds static BASIC authentication data to outgoing 
HTTP requests. */
+public final class BasicAuthManager implements AuthManager {
+
+  @Override
+  public AuthSession catalogSession(RESTClient sharedClient, Map<String, 
String> properties) {
+    Preconditions.checkArgument(
+        properties.containsKey(AuthProperties.BASIC_USERNAME),
+        "Property %s is required",
+        AuthProperties.BASIC_USERNAME);
+    Preconditions.checkArgument(
+        properties.containsKey(AuthProperties.BASIC_PASSWORD),
+        "Property %s is required",
+        AuthProperties.BASIC_PASSWORD);
+    String username = properties.get(AuthProperties.BASIC_USERNAME);
+    String password = properties.get(AuthProperties.BASIC_PASSWORD);
+    String credentials = username + ":" + password;
+    String header =

Review Comment:
   you could use `OAuth2Util.basicAuthHeaders` here directly
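
   For context, a minimal sketch of what such a helper might do, using only the JDK. `BasicAuthSketch` and its method are hypothetical stand-ins written for illustration and are not the actual `OAuth2Util` code; the sketch assumes the input is already a `username:password` string.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Base64;
   import java.util.Map;

   final class BasicAuthSketch {
     // Hypothetical helper: turns "username:password" into a Basic Authorization header map.
     static Map<String, String> basicAuthHeaders(String credentials) {
       String encoded =
           Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
       return Map.of("Authorization", "Basic " + encoded);
     }

     public static void main(String[] args) {
       // Prints the single-entry header map for the sample credentials.
       System.out.println(basicAuthHeaders("user:pass"));
     }
   }
   ```

   With a helper like this, the manager only has to validate the two properties and join them with a colon; the encoding stays in one place.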



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java:
##########
@@ -725,15 +736,15 @@ private static AuthSession fromTokenResponse(
       if (issuedTokenType == null) {
         issuedTokenType = OAuth2Properties.ACCESS_TOKEN_TYPE;
       }
-      AuthSession session =
-          new AuthSession(
-              parent.headers(),
-              AuthConfig.builder()
-                  .from(parent.config())
-                  .token(response.token())
-                  .tokenType(issuedTokenType)
-                  .credential(credential)
-                  .build());
+      Map<String, String> headers = RESTUtil.merge(parent.headers(), 
authHeaders(response.token()));
+      AuthConfig config =

Review Comment:
   why not keep the config inlined as before?



##########
core/src/main/java/org/apache/iceberg/rest/auth/RefreshingAuthManager.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AuthManager} that provides machinery for refreshing 
authentication data asynchronously,
+ * using a background thread pool.
+ */
+public abstract class RefreshingAuthManager implements AuthManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RefreshingAuthManager.class);
+
+  private final String executorNamePrefix;
+  private boolean keepRefreshed = true;
+  private volatile ScheduledExecutorService refreshExecutor;
+
+  protected RefreshingAuthManager(String executorNamePrefix) {
+    this.executorNamePrefix = executorNamePrefix;
+  }
+
+  public void setKeepRefreshed(boolean keepRefreshed) {

Review Comment:
   ```suggestion
     public void keepRefreshed(boolean keepRefreshed) {
   ```
   
   the Iceberg codebase doesn't use `get`/`set` prefixes on method names
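
   A small sketch of the naming style meant here. `RefreshSettingsSketch` is a hypothetical class, not part of this PR; it only shows a mutator and an accessor that are both named after the property.

   ```java
   final class RefreshSettingsSketch {
     private boolean keepRefreshed = true;

     // Mutator named after the property, without a "set" prefix.
     void keepRefreshed(boolean shouldKeepRefreshed) {
       this.keepRefreshed = shouldKeepRefreshed;
     }

     // Accessor named after the property, without a "get" or "is" prefix.
     boolean keepRefreshed() {
       return keepRefreshed;
     }

     public static void main(String[] args) {
       RefreshSettingsSketch settings = new RefreshSettingsSketch();
       settings.keepRefreshed(false);
       System.out.println("keepRefreshed = " + settings.keepRefreshed());
     }
   }
   ```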



##########
core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unused") // loaded by reflection
+public class OAuth2Manager extends RefreshingAuthManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuth2Manager.class);
+
+  private static final List<String> TOKEN_PREFERENCE_ORDER =
+      ImmutableList.of(
+          OAuth2Properties.ID_TOKEN_TYPE,
+          OAuth2Properties.ACCESS_TOKEN_TYPE,
+          OAuth2Properties.JWT_TOKEN_TYPE,
+          OAuth2Properties.SAML2_TOKEN_TYPE,
+          OAuth2Properties.SAML1_TOKEN_TYPE);
+
+  // Auth-related properties that are allowed to be passed to the table session
+  private static final Set<String> TABLE_SESSION_ALLOW_LIST =
+      ImmutableSet.<String>builder()
+          .add(OAuth2Properties.TOKEN)
+          .addAll(TOKEN_PREFERENCE_ORDER)
+          .build();
+
+  private RESTClient client;
+  private long startTimeMillis;
+  private OAuthTokenResponse authResponse;
+  private AuthSessionCache sessionCache;
+
+  public OAuth2Manager(String name) {
+    super(name + "-token-refresh");
+  }
+
+  @Override
+  public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, 
String> properties) {
+    warnIfDeprecatedTokenEndpointUsed(properties);
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    if (config.credential() != null) {
+      // keep track of the start time for token refresh
+      this.startTimeMillis = System.currentTimeMillis();
+      this.authResponse =
+          OAuth2Util.fetchToken(
+              initClient,
+              headers,
+              config.credential(),
+              config.scope(),
+              config.oauth2ServerUri(),
+              config.optionalOAuthParams());
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          initClient, null, authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          initClient, null, config.token(), null, session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession catalogSession(
+      RESTClient sharedClient, Map<String, String> properties) {
+    this.client = sharedClient;
+    this.sessionCache = new AuthSessionCache(sessionTimeout(properties));
+    AuthConfig config = AuthConfig.fromProperties(properties);
+    Map<String, String> headers = OAuth2Util.authHeaders(config.token());
+    OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, 
config);
+    setKeepRefreshed(config.keepRefreshed());
+    if (authResponse != null /* from the pre-config phase */) {
+      return OAuth2Util.AuthSession.fromTokenResponse(
+          client, refreshExecutor(), authResponse, startTimeMillis, session);
+    } else if (config.token() != null) {
+      return OAuth2Util.AuthSession.fromAccessToken(
+          client, refreshExecutor(), config.token(), config.expiresAtMillis(), 
session);
+    }
+    return session;
+  }
+
+  @Override
+  public OAuth2Util.AuthSession contextualSession(
+      SessionCatalog.SessionContext context, AuthSession parent) {
+    return maybeCreateChildSession(
+        context.credentials(),
+        context.properties(),
+        ignored -> context.sessionId(),
+        (OAuth2Util.AuthSession) parent);
+  }
+
+  @Override
+  public OAuth2Util.AuthSession tableSession(
+      TableIdentifier table, Map<String, String> properties, AuthSession 
parent) {
+    return maybeCreateChildSession(
+        Maps.filterKeys(properties, TABLE_SESSION_ALLOW_LIST::contains),
+        properties,
+        properties::get,
+        (OAuth2Util.AuthSession) parent);
+  }
+
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      try {
+        AuthSessionCache cache = sessionCache;
+        if (cache != null) {
+          cache.close();
+        }
+      } finally {

Review Comment:
   why do we need this second finally block? Why not set
   `this.sessionCache = null;` before the if statement?
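
   Roughly what I have in mind, as a sketch only. `CacheCloseSketch` is a hypothetical stand-in that uses a plain `AutoCloseable` instead of `AuthSessionCache`, and it leaves out the `super.close()` call. The field is copied to a local and cleared first, so it ends up null even if closing the cache throws.

   ```java
   final class CacheCloseSketch implements AutoCloseable {
     private AutoCloseable sessionCache;

     CacheCloseSketch(AutoCloseable sessionCache) {
       this.sessionCache = sessionCache;
     }

     boolean cacheCleared() {
       return sessionCache == null;
     }

     @Override
     public void close() throws Exception {
       AutoCloseable cache = sessionCache;
       // Clear the field before closing, so no extra finally block is needed to reset it.
       this.sessionCache = null;
       if (cache != null) {
         cache.close();
       }
     }

     public static void main(String[] args) {
       CacheCloseSketch manager =
           new CacheCloseSketch(
               () -> {
                 throw new IllegalStateException("close failed");
               });
       try {
         manager.close();
       } catch (Exception e) {
         System.out.println("close threw: " + e.getMessage());
       }
       System.out.println("cache cleared: " + manager.cacheCleared());
     }
   }
   ```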



##########
core/src/main/java/org/apache/iceberg/rest/auth/RefreshingAuthManager.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest.auth;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AuthManager} that provides machinery for refreshing 
authentication data asynchronously,
+ * using a background thread pool.
+ */
+public abstract class RefreshingAuthManager implements AuthManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RefreshingAuthManager.class);
+
+  private final String executorNamePrefix;
+  private boolean keepRefreshed = true;
+  private volatile ScheduledExecutorService refreshExecutor;
+
+  protected RefreshingAuthManager(String executorNamePrefix) {
+    this.executorNamePrefix = executorNamePrefix;
+  }
+
+  public void setKeepRefreshed(boolean keepRefreshed) {
+    this.keepRefreshed = keepRefreshed;
+  }
+
+  @Override
+  public void close() {
+    ScheduledExecutorService service = refreshExecutor;
+    try {
+      if (service != null) {
+        List<Runnable> tasks = service.shutdownNow();
+        tasks.forEach(
+            task -> {
+              if (task instanceof Future) {
+                ((Future<?>) task).cancel(true);
+              }
+            });
+
+        try {
+          if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
+            LOG.warn("Timed out waiting for refresh executor to terminate");
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted while waiting for refresh executor to 
terminate", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    } finally {

Review Comment:
   I don't think this finally block is really needed. Why not set 
`this.refreshExecutor = null` in L53?
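
   The same idea applied to the executor, as a hedged sketch. `ExecutorCloseSketch` is hypothetical and simplified: it creates its own single-thread scheduler, skips cancelling the pending tasks, and writes to stderr instead of using a logger. Clearing the field right after reading it also makes a second `close()` call a no-op.

   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   final class ExecutorCloseSketch implements AutoCloseable {
     private volatile ScheduledExecutorService refreshExecutor =
         Executors.newSingleThreadScheduledExecutor();

     boolean executorCleared() {
       return refreshExecutor == null;
     }

     @Override
     public void close() {
       ScheduledExecutorService service = refreshExecutor;
       // Reset the field up front instead of in a finally block.
       this.refreshExecutor = null;
       if (service != null) {
         service.shutdownNow();
         try {
           if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
             System.err.println("Timed out waiting for refresh executor to terminate");
           }
         } catch (InterruptedException e) {
           // Restore the interrupt flag for callers further up the stack.
           Thread.currentThread().interrupt();
         }
       }
     }

     public static void main(String[] args) {
       ExecutorCloseSketch manager = new ExecutorCloseSketch();
       manager.close();
       manager.close(); // second call finds a null field and does nothing
       System.out.println("executor cleared: " + manager.executorCleared());
     }
   }
   ```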



##########
core/src/main/java/org/apache/iceberg/rest/HTTPRequest.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.immutables.value.Value;
+
+/** Represents an HTTP request. */
+@Value.Style(
+    set = "*",
+    put = "set*",
+    putAll = "add*",
+    create = "new",
+    toImmutable = "build",

Review Comment:
   we should keep this simple and reduce it to
   `@Value.Style(redactedMask = "****", depluralize = true)`



##########
core/src/main/java/org/apache/iceberg/rest/HTTPRequest.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.immutables.value.Value;
+
+/** Represents an HTTP request. */
+@Value.Style(
+    set = "*",
+    put = "set*",
+    putAll = "add*",
+    create = "new",
+    toImmutable = "build",
+    redactedMask = "****",
+    depluralize = true)
+@Value.Modifiable
+@Value.Immutable(builder = false)
+@ParametersAreNonnullByDefault
+@SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
+public abstract class HTTPRequest {
+
+  public enum HTTPMethod {
+    GET,
+    HEAD,
+    POST,
+    DELETE
+  }
+
+  /**
+   * Returns the base URI configured at the REST client level. The base URI is 
used to construct the
+   * full {@link #requestUri()}.
+   */
+  @Value.Parameter(order = 0)
+  protected abstract URI baseUri();
+
+  /**
+   * Returns the full URI of this request. The URI is constructed from the 
base URI, path, and query
+   * parameters. It cannot be modified directly.
+   */
+  @Value.Lazy
+  public URI requestUri() {
+    return RESTUtil.buildRequestUri(this);
+  }
+
+  /** Returns the HTTP method of this request. */
+  @Value.Parameter(order = 1)
+  public abstract HTTPMethod method();
+
+  /** Returns the path of this request. */
+  @Value.Parameter(order = 2)
+  public abstract String path();
+
+  /** Returns the query parameters of this request. */
+  @Value.Parameter(order = 3)
+  public abstract Map<String, String> parameters();

Review Comment:
   I would name this `queryParams` or `queryParameters` as it's not immediately 
obvious that these are the query parameters when using the builder



##########
core/src/main/java/org/apache/iceberg/rest/HTTPRequest.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.immutables.value.Value;
+
+/** Represents an HTTP request. */
+@Value.Style(
+    set = "*",
+    put = "set*",
+    putAll = "add*",
+    create = "new",
+    toImmutable = "build",
+    redactedMask = "****",
+    depluralize = true)
+@Value.Modifiable
+@Value.Immutable(builder = false)
+@ParametersAreNonnullByDefault
+@SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
+public abstract class HTTPRequest {
+
+  public enum HTTPMethod {
+    GET,
+    HEAD,
+    POST,
+    DELETE
+  }
+
+  /**
+   * Returns the base URI configured at the REST client level. The base URI is 
used to construct the
+   * full {@link #requestUri()}.
+   */
+  @Value.Parameter(order = 0)
+  protected abstract URI baseUri();
+
+  /**
+   * Returns the full URI of this request. The URI is constructed from the 
base URI, path, and query
+   * parameters. It cannot be modified directly.
+   */
+  @Value.Lazy
+  public URI requestUri() {
+    return RESTUtil.buildRequestUri(this);
+  }
+
+  /** Returns the HTTP method of this request. */
+  @Value.Parameter(order = 1)
+  public abstract HTTPMethod method();
+
+  /** Returns the path of this request. */
+  @Value.Parameter(order = 2)
+  public abstract String path();
+
+  /** Returns the query parameters of this request. */
+  @Value.Parameter(order = 3)
+  public abstract Map<String, String> parameters();
+
+  /** Returns all the headers of this request. The map is case-sensitive! */
+  @Value.Parameter(order = 4)
+  @Value.Redacted
+  public abstract Map<String, List<String>> headers();
+
+  /** Returns the header values of the given name. */
+  public List<String> headers(String name) {
+    return headers().getOrDefault(name, List.of());
+  }
+
+  /** Returns whether the request contains a header with the given name. */
+  public boolean containsHeader(String name) {
+    return !headers(name).isEmpty();
+  }
+
+  /** Returns the raw, unencoded request body. */
+  @Nullable
+  @Value.Parameter(order = 5)
+  @Value.Redacted
+  public abstract Object body();
+
+  /** Returns the encoded request body as a string. */
+  @Value.Default
+  @Nullable
+  @Value.Redacted
+  public String encodedBody() {
+    return RESTUtil.encodeRequestBody(this);
+  }
+
+  /**
+   * Returns the {@link ObjectMapper} to use for encoding the request body. 
The default is {@link
+   * RESTObjectMapper#mapper()}.
+   */
+  @Value.Default
+  protected ObjectMapper mapper() {
+    return RESTObjectMapper.mapper();
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static Builder builder(
+      URI baseUri,
+      HTTPMethod method,
+      String path,
+      @Nullable Map<String, String> queryParams,
+      @Nullable Map<String, String> headers,
+      @Nullable Object body,
+      @Nullable ObjectMapper mapper) {
+    Builder builder = 
builder().baseUri(baseUri).method(method).path(path).body(body);
+    if (queryParams != null) {
+      queryParams.forEach(builder::setParameter);
+    }
+    if (headers != null) {
+      headers.forEach(builder::setHeader);
+    }
+    if (mapper != null) {
+      builder.mapper(mapper);
+    }
+    return builder;
+  }
+
+  /** A modifiable builder for {@link HTTPRequest}. */
+  public static class Builder extends ModifiableHTTPRequest {

Review Comment:
   while I'm a fan of using Immutables where possible, I think this actually 
adds quite a lot of complexity. Can we get around this without having to 
introduce a separate builder?



##########
core/src/main/java/org/apache/iceberg/rest/HTTPRequest.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.immutables.value.Value;
+
+/** Represents an HTTP request. */
+@Value.Style(
+    set = "*",
+    put = "set*",
+    putAll = "add*",
+    create = "new",
+    toImmutable = "build",
+    redactedMask = "****",
+    depluralize = true)
+@Value.Modifiable
+@Value.Immutable(builder = false)
+@ParametersAreNonnullByDefault
+@SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
+public abstract class HTTPRequest {
+
+  public enum HTTPMethod {
+    GET,
+    HEAD,
+    POST,
+    DELETE
+  }
+
+  /**
+   * Returns the base URI configured at the REST client level. The base URI is 
used to construct the
+   * full {@link #requestUri()}.
+   */
+  @Value.Parameter(order = 0)
+  protected abstract URI baseUri();
+
+  /**
+   * Returns the full URI of this request. The URI is constructed from the 
base URI, path, and query
+   * parameters. It cannot be modified directly.
+   */
+  @Value.Lazy
+  public URI requestUri() {
+    return RESTUtil.buildRequestUri(this);
+  }
+
+  /** Returns the HTTP method of this request. */
+  @Value.Parameter(order = 1)
+  public abstract HTTPMethod method();
+
+  /** Returns the path of this request. */
+  @Value.Parameter(order = 2)
+  public abstract String path();
+
+  /** Returns the query parameters of this request. */
+  @Value.Parameter(order = 3)
+  public abstract Map<String, String> parameters();
+
+  /** Returns all the headers of this request. The map is case-sensitive! */
+  @Value.Parameter(order = 4)
+  @Value.Redacted
+  public abstract Map<String, List<String>> headers();
+
+  /** Returns the header values of the given name. */
+  public List<String> headers(String name) {
+    return headers().getOrDefault(name, List.of());
+  }
+
+  /** Returns whether the request contains a header with the given name. */
+  public boolean containsHeader(String name) {
+    return !headers(name).isEmpty();
+  }
+
+  /** Returns the raw, unencoded request body. */
+  @Nullable
+  @Value.Parameter(order = 5)
+  @Value.Redacted
+  public abstract Object body();
+
+  /** Returns the encoded request body as a string. */
+  @Value.Default
+  @Nullable
+  @Value.Redacted
+  public String encodedBody() {
+    return RESTUtil.encodeRequestBody(this);
+  }
+
+  /**
+   * Returns the {@link ObjectMapper} to use for encoding the request body. 
The default is {@link
+   * RESTObjectMapper#mapper()}.
+   */
+  @Value.Default
+  protected ObjectMapper mapper() {
+    return RESTObjectMapper.mapper();
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static Builder builder(

Review Comment:
   I'm not a fan of having such a builder method. We already have a generated 
builder, so we should rather go through the builder: 
`HTTPRequest.builder().baseUri(baseUri).method(method).path(path)...build()`
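
   To illustrate the call-site shape, here is a sketch with a hand-written builder. Everything in `RequestBuilderSketch` is a hypothetical stand-in (including the `queryParam` method name and the sample URI); it is not the Immutables-generated builder, it only shows that the chained form reads fine without a seven-argument factory method.

   ```java
   import java.net.URI;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Objects;

   final class RequestBuilderSketch {
     enum Method { GET, HEAD, POST, DELETE }

     static final class Request {
       final URI baseUri;
       final Method method;
       final String path;
       final Map<String, String> queryParams;

       private Request(Builder builder) {
         this.baseUri = Objects.requireNonNull(builder.baseUri, "baseUri");
         this.method = Objects.requireNonNull(builder.method, "method");
         this.path = Objects.requireNonNull(builder.path, "path");
         this.queryParams = Map.copyOf(builder.queryParams);
       }
     }

     static final class Builder {
       private URI baseUri;
       private Method method;
       private String path;
       private final Map<String, String> queryParams = new LinkedHashMap<>();

       Builder baseUri(URI value) { this.baseUri = value; return this; }
       Builder method(Method value) { this.method = value; return this; }
       Builder path(String value) { this.path = value; return this; }
       Builder queryParam(String name, String value) { queryParams.put(name, value); return this; }

       // Validates the required fields and returns an immutable snapshot.
       Request build() { return new Request(this); }
     }

     static Builder builder() { return new Builder(); }

     public static void main(String[] args) {
       Request request =
           builder()
               .baseUri(URI.create("http://localhost:8181"))
               .method(Method.GET)
               .path("v1/config")
               .queryParam("warehouse", "test")
               .build();
       System.out.println(request.method + " " + request.baseUri + "/" + request.path);
     }
   }
   ```

   Optional inputs are simply left off the chain, so the null checks from the factory method are no longer needed.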



##########
core/src/main/java/org/apache/iceberg/rest/BaseHTTPClient.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+
+/** A base class for {@link RESTClient} implementations using {@link 
HTTPRequest}. */
+public abstract class BaseHTTPClient implements RESTClient {

Review Comment:
   while such refactorings make a lot of sense, it would be easier to have them 
in a separate PR as this reduces the review complexity of what you're trying to 
do in this PR.
   Could you please extract the introduction of `BaseHTTPClient` into a 
separate PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

