This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ef3661366 [#9567]test(auth): Add OAuth integration tests (#10549)
3ef3661366 is described below

commit 3ef3661366f7391599bf832b9eb421ae2a9c8027
Author: Bharath Krishna <[email protected]>
AuthorDate: Tue Apr 28 08:14:28 2026 -0700

    [#9567]test(auth): Add OAuth integration tests (#10549)
    
    ### What changes were proposed in this pull request?
    
    
    Adds OAuth2/JWKS integration test coverage for `JwksTokenValidator`
    using an in-process `JwksMockServerHelper` (serves `/jwks` + `/token`) —
    no external services required for the core tests.
    
    **`JwksTokenValidatorIT`** (client-java, embedded): valid token
    accepted; expired, wrong-audience, and wrong-key tokens rejected;
    per-user authorization via `ForbiddenException`.
    
    **`SparkJwksAuthIT`** (spark-common, embedded): end-to-end Spark plugin
    → `client_credentials` → JWKS validation, verified via
    `GravitinoAdminClient` with the same OAuth provider.
    
    **`SparkJwksAuthorizationIT33/34/35`** (Docker, MySQL): Alice creates a
    table via Spark OAuth2; Bob is denied until admin grants access.
    
    
    ### Why are the changes needed?
    
    `JwksTokenValidator` had no integration test coverage.
    
    Fix: #9567
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    - Embedded tests (no Docker): `./gradlew :clients:client-java:test
    --tests "*.JwksTokenValidatorIT"` and `./gradlew
    :spark-connector:spark-common:test --tests "*.SparkJwksAuthIT"`
    - Docker tests: `./gradlew test -PskipTests=false
    -PskipDockerTests=false` (requires MySQL container, tagged
    `@Tag("gravitino-docker-test")`)
---
 clients/client-java/build.gradle.kts               |   1 +
 .../test/authorization/JwksTokenValidatorIT.java   | 201 ++++++++++++++
 integration-test-common/build.gradle.kts           |   1 +
 .../test/util/JwksMockServerHelper.java            | 196 ++++++++++++++
 spark-connector/spark-common/build.gradle.kts      |   1 +
 .../test/authorization/SparkJwksAuthIT.java        | 135 ++++++++++
 .../authorization/SparkJwksAuthorizationIT.java    | 289 +++++++++++++++++++++
 .../authorization/SparkJwksAuthorizationIT33.java  |  21 ++
 .../authorization/SparkJwksAuthorizationIT34.java  |  21 ++
 .../authorization/SparkJwksAuthorizationIT35.java  |  21 ++
 10 files changed, 887 insertions(+)

diff --git a/clients/client-java/build.gradle.kts b/clients/client-java/build.gradle.kts
index 9a4f74a5fb..11929c8fec 100644
--- a/clients/client-java/build.gradle.kts
+++ b/clients/client-java/build.gradle.kts
@@ -50,6 +50,7 @@ dependencies {
   testImplementation(libs.awaitility)
   testImplementation(libs.bundles.jersey)
   testImplementation(libs.bundles.jwt)
+  testImplementation(libs.nimbus.jose.jwt)
   testImplementation(libs.commons.lang3)
   testImplementation(libs.hadoop3.client)
   testImplementation(libs.junit.jupiter.api)
diff --git a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JwksTokenValidatorIT.java b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JwksTokenValidatorIT.java
new file mode 100644
index 0000000000..9ff4d93a08
--- /dev/null
+++ b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JwksTokenValidatorIT.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test.authorization;
+
+import com.google.common.collect.Maps;
+import com.nimbusds.jose.jwk.RSAKey;
+import com.nimbusds.jose.jwk.gen.RSAKeyGenerator;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Map;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.client.GravitinoVersion;
+import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.gravitino.integration.test.util.JwksMockServerHelper;
+import org.apache.gravitino.integration.test.util.OAuthMockDataProvider;
+import org.apache.gravitino.server.authentication.OAuthConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for {@link org.apache.gravitino.server.authentication.JwksTokenValidator}.
+ * Exercises the full {@code client_credentials → /token → Bearer → JWKS validation} path.
+ */
+public class JwksTokenValidatorIT extends BaseIT {
+
+  private static final String SERVICE_AUDIENCE = "service1";
+  private static final String SUBJECT = "gravitino";
+  private static final String KEY_ID = "test-kid";
+  private static final String METALAKE_NAME = "jwks_auth_metalake";
+  private static final String ALICE = "alice";
+  private static final String BOB = "bob";
+
+  private static JwksMockServerHelper mockServerHelper;
+  private static RSAKey rsaKey;
+  private static volatile String currentToken;
+
+  private static String validToken;
+
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    mockServerHelper = JwksMockServerHelper.create(KEY_ID);
+    rsaKey = mockServerHelper.rsaKey();
+    validToken =
+        JwksMockServerHelper.mintToken(
+            rsaKey, SUBJECT, SERVICE_AUDIENCE, Instant.now().plusSeconds(1_000_000));
+    currentToken = validToken;
+    mockServerHelper.setTokenSupplier(() -> currentToken);
+
+    Map<String, String> configs = Maps.newHashMap();
+    configs.put(Configs.AUTHENTICATORS.getKey(), AuthenticatorType.OAUTH.name().toLowerCase());
+    configs.put(OAuthConfig.SERVICE_AUDIENCE.getKey(), SERVICE_AUDIENCE);
+    configs.put(
+        OAuthConfig.TOKEN_VALIDATOR_CLASS.getKey(),
+        "org.apache.gravitino.server.authentication.JwksTokenValidator");
+    configs.put(OAuthConfig.JWKS_URI.getKey(), mockServerHelper.jwksUri());
+    configs.put(OAuthConfig.PRINCIPAL_FIELDS.getKey(), "sub");
+    configs.put(OAuthConfig.ALLOW_SKEW_SECONDS.getKey(), "6");
+    configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), "true");
+    configs.put(Configs.SERVICE_ADMINS.getKey(), SUBJECT);
+    registerCustomConfigs(configs);
+
+    OAuthMockDataProvider.getInstance().setTokenData(validToken.getBytes(StandardCharsets.UTF_8));
+    super.startIntegrationTest();
+
+    client.createMetalake(METALAKE_NAME, "JWKS auth test metalake", Maps.newHashMap());
+    client.loadMetalake(METALAKE_NAME).addUser(ALICE);
+  }
+
+  @AfterAll
+  @Override
+  public void stopIntegrationTest() throws IOException, InterruptedException {
+    if (mockServerHelper != null) {
+      mockServerHelper.close();
+    }
+    super.stopIntegrationTest();
+  }
+
+  @Test
+  public void testValidTokenAuthentication() throws Exception {
+    withToken(
+        validToken,
+        oauthClient -> {
+          GravitinoVersion version = oauthClient.serverVersion();
+          String projectVersion = System.getenv("PROJECT_VERSION");
+          if (projectVersion != null) {
+            Assertions.assertEquals(projectVersion, version.version());
+          }
+          Assertions.assertFalse(version.compileDate().isEmpty());
+          if (testMode.equals(ITUtils.EMBEDDED_TEST_MODE)) {
+            Assertions.assertEquals(readGitCommitIdFromGitFile(), version.gitCommit());
+          }
+        });
+  }
+
+  @Test
+  public void testExpiredTokenFails() throws Exception {
+    String expiredToken =
+        JwksMockServerHelper.mintToken(
+            rsaKey, SUBJECT, SERVICE_AUDIENCE, Instant.now().minusSeconds(3600));
+    withToken(
+        expiredToken,
+        badClient -> Assertions.assertThrows(RuntimeException.class, badClient::serverVersion));
+  }
+
+  @Test
+  public void testWrongAudienceFails() throws Exception {
+    String wrongAudToken =
+        JwksMockServerHelper.mintToken(
+            rsaKey, SUBJECT, "wrong-audience", Instant.now().plusSeconds(1_000_000));
+    withToken(
+        wrongAudToken,
+        badClient -> Assertions.assertThrows(RuntimeException.class, badClient::serverVersion));
+  }
+
+  @Test
+  public void testTokenSignedWithDifferentKeyFails() throws Exception {
+    RSAKey differentKey = new RSAKeyGenerator(2048).keyID("other-kid").generate();
+    String wrongKeyToken =
+        JwksMockServerHelper.mintToken(
+            differentKey, SUBJECT, SERVICE_AUDIENCE, Instant.now().plusSeconds(1_000_000));
+    withToken(
+        wrongKeyToken,
+        badClient -> Assertions.assertThrows(RuntimeException.class, badClient::serverVersion));
+  }
+
+  @Test
+  public void testPrivilegedUserCanAccessMetalake() throws Exception {
+    String aliceToken =
+        JwksMockServerHelper.mintToken(
+            rsaKey, ALICE, SERVICE_AUDIENCE, Instant.now().plusSeconds(1_000_000));
+    withToken(
+        aliceToken,
+        aliceClient -> {
+          GravitinoMetalake ml = aliceClient.loadMetalake(METALAKE_NAME);
+          Assertions.assertNotNull(ml);
+          Assertions.assertEquals(METALAKE_NAME, ml.name());
+        });
+  }
+
+  @Test
+  public void testUnprivilegedUserForbidden() throws Exception {
+    String bobToken =
+        JwksMockServerHelper.mintToken(
+            rsaKey, BOB, SERVICE_AUDIENCE, Instant.now().plusSeconds(1_000_000));
+    withToken(
+        bobToken,
+        bobClient ->
+            Assertions.assertThrows(
+                ForbiddenException.class, () -> bobClient.loadMetalake(METALAKE_NAME)));
+  }
+
+  /** Swaps {@code currentToken}, builds a fresh provider+client, runs {@code action}, restores. */
+  private void withToken(String token, ClientConsumer action) throws Exception {
+    String previous = currentToken;
+    currentToken = token;
+    DefaultOAuth2TokenProvider provider =
+        DefaultOAuth2TokenProvider.builder()
+            .withUri(mockServerHelper.baseUri())
+            .withPath("token")
+            .withCredential("test-client:test-secret")
+            .withScope("openid")
+            .build();
+    try (GravitinoAdminClient tempClient =
+        GravitinoAdminClient.builder(serverUri).withOAuth(provider).build()) {
+      action.accept(tempClient);
+    } finally {
+      currentToken = previous;
+    }
+  }
+
+  @FunctionalInterface
+  private interface ClientConsumer {
+    void accept(GravitinoAdminClient client) throws Exception;
+  }
+}
diff --git a/integration-test-common/build.gradle.kts b/integration-test-common/build.gradle.kts
index 7de6bf080b..5f7063496d 100644
--- a/integration-test-common/build.gradle.kts
+++ b/integration-test-common/build.gradle.kts
@@ -39,6 +39,7 @@ dependencies {
   testImplementation(libs.bundles.jetty)
   testImplementation(libs.bundles.jersey)
   testImplementation(libs.bundles.jwt)
+  testImplementation(libs.nimbus.jose.jwt)
   testImplementation(libs.bundles.log4j)
   testImplementation(libs.commons.cli)
   testImplementation(libs.commons.lang3)
diff --git a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/JwksMockServerHelper.java b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/JwksMockServerHelper.java
new file mode 100644
index 0000000000..136c4fd9e3
--- /dev/null
+++ b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/JwksMockServerHelper.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.util;
+
+import com.nimbusds.jose.JWSAlgorithm;
+import com.nimbusds.jose.JWSHeader;
+import com.nimbusds.jose.crypto.RSASSASigner;
+import com.nimbusds.jose.jwk.JWKSet;
+import com.nimbusds.jose.jwk.RSAKey;
+import com.nimbusds.jose.jwk.gen.RSAKeyGenerator;
+import com.nimbusds.jwt.JWTClaimsSet;
+import com.nimbusds.jwt.SignedJWT;
+import com.sun.net.httpserver.HttpServer;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+
+/** In-process JWKS + OAuth2 token mock server for integration tests. */
+public class JwksMockServerHelper implements Closeable {
+
+  private final HttpServer httpServer;
+  private final RSAKey rsaKey;
+  private final int port;
+
+  @Nullable private volatile Supplier<String> tokenSupplier;
+  private final Map<String, String> userTokens = new ConcurrentHashMap<>();
+  @Nullable private volatile String fallbackToken;
+
+  private JwksMockServerHelper(HttpServer httpServer, RSAKey rsaKey, int port) {
+    this.httpServer = httpServer;
+    this.rsaKey = rsaKey;
+    this.port = port;
+  }
+
+  /** Returns the RSA key pair used for signing JWTs. */
+  public RSAKey rsaKey() {
+    return rsaKey;
+  }
+
+  /** Returns the port the mock server is listening on. */
+  public int port() {
+    return port;
+  }
+
+  /** Returns the base URI of the mock server, e.g. {@code http://localhost:12345}. */
+  public String baseUri() {
+    return "http://localhost:" + port;
+  }
+
+  /** Returns the JWKS endpoint URI, e.g. {@code http://localhost:12345/jwks}. */
+  public String jwksUri() {
+    return baseUri() + "/jwks";
+  }
+
+  /** Configures single-token mode: every {@code /token} request returns the supplier's token. */
+  public void setTokenSupplier(Supplier<String> supplier) {
+    this.tokenSupplier = supplier;
+  }
+
+  /** Registers a pre-minted JWT for a specific {@code client_id} (multi-user mode). */
+  public void registerUserToken(String clientId, String token) {
+    userTokens.put(clientId, token);
+  }
+
+  /** Sets the fallback token when {@code client_id} is not found in the user token map. */
+  public void setFallbackToken(String token) {
+    this.fallbackToken = token;
+  }
+
+  /** Stops the mock HTTP server. */
+  @Override
+  public void close() throws IOException {
+    httpServer.stop(0);
+  }
+
+  /** Mints a compact RS256-signed JWT using this helper's RSA key. */
+  public String mintToken(String subject, String audience, Instant expiry) throws Exception {
+    return mintToken(rsaKey, subject, audience, expiry);
+  }
+
+  /** Mints a compact RS256-signed JWT using the specified RSA key. */
+  @SuppressWarnings("JavaUtilDate")
+  public static String mintToken(RSAKey key, String subject, String audience, Instant expiry)
+      throws Exception {
+    JWTClaimsSet claims =
+        new JWTClaimsSet.Builder()
+            .subject(subject)
+            .audience(audience)
+            .issueTime(Date.from(Instant.now()))
+            .expirationTime(Date.from(expiry))
+            .build();
+    SignedJWT jwt =
+        new SignedJWT(
+            new JWSHeader.Builder(JWSAlgorithm.RS256).keyID(key.getKeyID()).build(), claims);
+    jwt.sign(new RSASSASigner(key));
+    return jwt.serialize();
+  }
+
+  /** Builds the JSON body returned by the mock {@code /token} endpoint. */
+  public static String buildTokenResponseJson(String accessToken) {
+    return "{\"access_token\":\""
+        + accessToken
+        + "\",\"token_type\":\"bearer\",\"expires_in\":86400}";
+  }
+
+  /** Parses a single parameter from a URL-encoded form body. */
+  static String parseFormParam(String body, String key) {
+    for (String pair : body.split("&")) {
+      String[] kv = pair.split("=", 2);
+      if (kv.length == 2 && URLDecoder.decode(kv[0], StandardCharsets.UTF_8).equals(key)) {
+        return URLDecoder.decode(kv[1], StandardCharsets.UTF_8);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates and starts a new mock server with a freshly generated RSA key pair on a random port.
+   */
+  public static JwksMockServerHelper create(String keyId) throws Exception {
+    RSAKey rsaKey = new RSAKeyGenerator(2048).keyID(keyId).generate();
+    String jwksJson = new JWKSet(rsaKey.toPublicJWK()).toString();
+
+    HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+
+    JwksMockServerHelper helper =
+        new JwksMockServerHelper(server, rsaKey, server.getAddress().getPort());
+
+    // /jwks – serves the JWKS public key document.
+    server.createContext(
+        "/jwks",
+        exchange -> {
+          byte[] body = jwksJson.getBytes(StandardCharsets.UTF_8);
+          exchange.getResponseHeaders().set("Content-Type", "application/json");
+          exchange.sendResponseHeaders(200, body.length);
+          exchange.getResponseBody().write(body);
+          exchange.getResponseBody().close();
+        });
+
+    // /token – mock OAuth2 client_credentials endpoint.
+    server.createContext(
+        "/token",
+        exchange -> {
+          String postBody =
+              new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
+
+          String token;
+          Supplier<String> supplier = helper.tokenSupplier;
+          if (supplier != null) {
+            // Single-token mode: use the supplier.
+            token = supplier.get();
+          } else {
+            // Multi-user mode: look up client_id.
+            String clientId = parseFormParam(postBody, "client_id");
+            String fb = helper.fallbackToken;
+            token =
+                clientId != null && helper.userTokens.containsKey(clientId)
+                    ? helper.userTokens.get(clientId)
+                    : (fb != null ? fb : "");
+          }
+
+          byte[] resp = buildTokenResponseJson(token).getBytes(StandardCharsets.UTF_8);
+          exchange.getResponseHeaders().set("Content-Type", "application/json");
+          exchange.sendResponseHeaders(200, resp.length);
+          exchange.getResponseBody().write(resp);
+          exchange.getResponseBody().close();
+        });
+
+    server.start();
+    return helper;
+  }
+}
diff --git a/spark-connector/spark-common/build.gradle.kts b/spark-connector/spark-common/build.gradle.kts
index 6727387f3c..80c6ea8faa 100644
--- a/spark-connector/spark-common/build.gradle.kts
+++ b/spark-connector/spark-common/build.gradle.kts
@@ -121,6 +121,7 @@ dependencies {
   }
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
+  testImplementation(libs.nimbus.jose.jwt)
   testImplementation(libs.mysql.driver)
   testImplementation(libs.postgresql.driver)
   testImplementation(libs.testcontainers)
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthIT.java
new file mode 100644
index 0000000000..88975e468e
--- /dev/null
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthIT.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Map;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthProperties;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.JwksMockServerHelper;
+import org.apache.gravitino.integration.test.util.OAuthMockDataProvider;
+import org.apache.gravitino.server.authentication.OAuthConfig;
+import org.apache.gravitino.spark.connector.GravitinoSparkConfig;
+import org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * End-to-end integration test for Spark OAuth2 authentication via JWKS. Exercises the full path:
+ * Spark plugin → {@code client_credentials} → JWT fetch → Gravitino REST → JWKS validation.
+ */
+public class SparkJwksAuthIT extends BaseIT {
+
+  private static final String SERVICE_AUDIENCE = "service1";
+  private static final String METALAKE = "jwks_test_metalake";
+  private static final String KEY_ID = "test-kid";
+
+  private static JwksMockServerHelper mockServerHelper;
+
+  private static String validToken;
+  private static SparkSession sparkSession;
+
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    mockServerHelper = JwksMockServerHelper.create(KEY_ID);
+    validToken =
+        mockServerHelper.mintToken(
+            "gravitino", SERVICE_AUDIENCE, Instant.now().plusSeconds(1_000_000));
+    mockServerHelper.setTokenSupplier(() -> validToken);
+
+    Map<String, String> configs = Maps.newHashMap();
+    configs.put(Configs.AUTHENTICATORS.getKey(), AuthenticatorType.OAUTH.name().toLowerCase());
+    configs.put(OAuthConfig.SERVICE_AUDIENCE.getKey(), SERVICE_AUDIENCE);
+    configs.put(
+        OAuthConfig.TOKEN_VALIDATOR_CLASS.getKey(),
+        "org.apache.gravitino.server.authentication.JwksTokenValidator");
+    configs.put(OAuthConfig.JWKS_URI.getKey(), mockServerHelper.jwksUri());
+    configs.put(OAuthConfig.PRINCIPAL_FIELDS.getKey(), "sub");
+    configs.put(OAuthConfig.ALLOW_SKEW_SECONDS.getKey(), "6");
+    registerCustomConfigs(configs);
+
+    OAuthMockDataProvider.getInstance().setTokenData(validToken.getBytes(StandardCharsets.UTF_8));
+    super.startIntegrationTest();
+
+    client.createMetalake(METALAKE, "JWKS auth test metalake", Maps.newHashMap());
+
+    SparkConf sparkConf =
+        new SparkConf()
+            .set("spark.plugins", GravitinoSparkPlugin.class.getName())
+            .set(GravitinoSparkConfig.GRAVITINO_URI, serverUri)
+            .set(GravitinoSparkConfig.GRAVITINO_METALAKE, METALAKE)
+            .set(GravitinoSparkConfig.GRAVITINO_AUTH_TYPE, AuthProperties.OAUTH2_AUTH_TYPE)
+            .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_URI, mockServerHelper.baseUri())
+            .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_PATH, "token")
+            .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_CREDENTIAL, "test-client:test-secret")
+            .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_SCOPE, "openid");
+
+    sparkSession =
+        SparkSession.builder()
+            .master("local[1]")
+            .appName("SparkJwksAuthIT")
+            .config(sparkConf)
+            .getOrCreate();
+  }
+
+  @AfterAll
+  @Override
+  public void stopIntegrationTest() throws IOException, InterruptedException {
+    if (sparkSession != null) {
+      sparkSession.close();
+    }
+    if (mockServerHelper != null) {
+      mockServerHelper.close();
+    }
+    super.stopIntegrationTest();
+  }
+
+  @Test
+  public void testSparkConnectsWithJwksAuth() throws Exception {
+    // Trigger the Spark plugin's OAuth handshake with Gravitino on the first SQL call.
+    Assertions.assertDoesNotThrow(() -> sparkSession.sql("SHOW CATALOGS").collect());
+
+    // Verify the full client_credentials → JWKS validation path via a direct Gravitino client.
+    DefaultOAuth2TokenProvider oauthProvider =
+        DefaultOAuth2TokenProvider.builder()
+            .withUri(mockServerHelper.baseUri())
+            .withPath("token")
+            .withCredential("test-client:test-secret")
+            .withScope("openid")
+            .build();
+    try (GravitinoAdminClient oauthClient =
+        GravitinoAdminClient.builder(serverUri).withOAuth(oauthProvider).build()) {
+      GravitinoMetalake metalake = oauthClient.loadMetalake(METALAKE);
+      Assertions.assertEquals(METALAKE, metalake.name());
+    }
+  }
+}
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT.java
new file mode 100644
index 0000000000..83c1840ce7
--- /dev/null
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.auth.AuthProperties;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.JwksMockServerHelper;
+import org.apache.gravitino.integration.test.util.OAuthMockDataProvider;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.apache.gravitino.server.authentication.OAuthConfig;
+import org.apache.gravitino.spark.connector.GravitinoSparkConfig;
+import org.apache.gravitino.spark.connector.jdbc.JdbcPropertiesConstants;
+import org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end Docker integration test for Spark OAuth2 authentication and 
authorization via JWKS.
+ * User identity is derived solely from the JWT {@code sub} claim.
+ */
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public abstract class SparkJwksAuthorizationIT extends BaseIT {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkJwksAuthorizationIT.class);
+
+  private static final String GRAVITINO_ADMIN = "gravitino";
+  private static final String ALICE = "alice";
+  private static final String BOB = "bob";
+
+  private static final String ALICE_CREDENTIAL = ALICE + ":alice-secret";
+  private static final String BOB_CREDENTIAL = BOB + ":bob-secret";
+
+  private static final String SERVICE_AUDIENCE = "service1";
+  private static final String KEY_ID = "test-kid";
+  private static final String METALAKE = "jwks_authz_metalake";
+  private static final String CATALOG = "jdbc_catalog";
+  private static final String SCHEMA = "jdbc_schema";
+  private static final String ALICE_TABLE = "alice_table";
+  private static final String ALICE_ROLE = "alice-role";
+  private static final String BOB_ROLE = "bob-role";
+
+  protected final ContainerSuite containerSuite = ContainerSuite.getInstance();
+
+  private static JwksMockServerHelper mockServerHelper;
+
+  private static String gravitinoUri;
+
+  private static SparkSession aliceSparkSession;
+  private static GravitinoAdminClient bobClient;
+
+  private String mysqlUrl;
+  private String mysqlUsername;
+  private String mysqlPassword;
+  private String mysqlDriver;
+
+  /**
+   * Prepares the whole test environment: creates the mock server helper and its tokens, registers
+   * the OAuth/JWKS and authorization settings for the Gravitino server, starts MySQL and the
+   * parent integration environment, seeds the metadata, and finally builds Alice's Spark session
+   * and Bob's admin client.
+   *
+   * @throws Exception if any of the setup steps fails
+   */
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    mockServerHelper = JwksMockServerHelper.create(KEY_ID);
+
+    // One token per principal, all for the same audience and with a far-future expiry.
+    Instant farFuture = Instant.now().plusSeconds(1_000_000);
+    String adminToken = mockServerHelper.mintToken(GRAVITINO_ADMIN, SERVICE_AUDIENCE, farFuture);
+    String aliceToken = mockServerHelper.mintToken(ALICE, SERVICE_AUDIENCE, farFuture);
+    String bobToken = mockServerHelper.mintToken(BOB, SERVICE_AUDIENCE, farFuture);
+
+    // Register each user's token with the helper; the admin token doubles as the fallback.
+    mockServerHelper.registerUserToken(GRAVITINO_ADMIN, adminToken);
+    mockServerHelper.registerUserToken(ALICE, aliceToken);
+    mockServerHelper.registerUserToken(BOB, bobToken);
+    mockServerHelper.setFallbackToken(adminToken);
+    LOG.info("Mock JWKS+token server started on port {}", mockServerHelper.port());
+
+    // Server-side settings: OAuth authenticator validated via JWKS, principal taken from "sub",
+    // authorization enabled with a single service admin, and caching disabled.
+    Map<String, String> configs = Maps.newHashMap();
+    configs.put(Configs.AUTHENTICATORS.getKey(), AuthenticatorType.OAUTH.name().toLowerCase());
+    configs.put(OAuthConfig.SERVICE_AUDIENCE.getKey(), SERVICE_AUDIENCE);
+    configs.put(
+        OAuthConfig.TOKEN_VALIDATOR_CLASS.getKey(),
+        "org.apache.gravitino.server.authentication.JwksTokenValidator");
+    configs.put(OAuthConfig.JWKS_URI.getKey(), mockServerHelper.jwksUri());
+    configs.put(OAuthConfig.PRINCIPAL_FIELDS.getKey(), "sub");
+    configs.put(OAuthConfig.ALLOW_SKEW_SECONDS.getKey(), "6");
+    configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), "true");
+    configs.put(Configs.SERVICE_ADMINS.getKey(), GRAVITINO_ADMIN);
+    configs.put(Configs.CACHE_ENABLED.getKey(), "false");
+    registerCustomConfigs(configs);
+
+    // NOTE(review): presumably this is the token the inherited admin `client` sends — confirm.
+    OAuthMockDataProvider.getInstance().setTokenData(adminToken.getBytes(StandardCharsets.UTF_8));
+
+    initMysqlContainer();
+    super.startIntegrationTest();
+    // The server port is only known once the parent environment has started.
+    gravitinoUri = String.format("http://127.0.0.1:%d", getGravitinoServerPort());
+
+    initMetadata();
+
+    aliceSparkSession = buildSparkSession(ALICE_CREDENTIAL);
+    bobClient = buildGravitinoClient(BOB_CREDENTIAL);
+  }
+
+  /**
+   * Stops Alice's Spark session, closes Bob's client and the mock server helper, then delegates
+   * to the parent teardown.
+   *
+   * <p>The steps are chained with {@code finally} so that a failure in one of them does not skip
+   * the remaining cleanup, in particular the parent teardown.
+   *
+   * @throws IOException propagated from the teardown steps
+   * @throws InterruptedException propagated from the teardown steps
+   */
+  @AfterAll
+  @Override
+  public void stopIntegrationTest() throws IOException, InterruptedException {
+    try {
+      if (aliceSparkSession != null) {
+        aliceSparkSession.stop();
+      }
+    } finally {
+      try {
+        if (bobClient != null) {
+          bobClient.close();
+        }
+      } finally {
+        try {
+          if (mockServerHelper != null) {
+            mockServerHelper.close();
+          }
+        } finally {
+          // Always run the parent teardown, even when one of the closes above threw.
+          super.stopIntegrationTest();
+        }
+      }
+    }
+  }
+
+  /**
+   * Starts the MySQL container for the test database and records its JDBC URL, user name,
+   * password and driver class name for later catalog creation.
+   *
+   * @throws SQLException declared by the container accessors used here
+   */
+  private void initMysqlContainer() throws SQLException {
+    TestDatabaseName database = TestDatabaseName.MYSQL_CATALOG_MYSQL_IT;
+    containerSuite.startMySQLContainer(database);
+
+    mysqlUrl = containerSuite.getMySQLContainer().getJdbcUrl();
+    mysqlUsername = containerSuite.getMySQLContainer().getUsername();
+    mysqlPassword = containerSuite.getMySQLContainer().getPassword();
+    mysqlDriver = containerSuite.getMySQLContainer().getDriverClassName(database);
+  }
+
+  /**
+   * Seeds the metadata used by the tests: a metalake with users Alice and Bob, a JDBC (MySQL)
+   * catalog with one schema, and a role granting Alice use/create/select privileges on the
+   * catalog. Bob receives no role here.
+   */
+  private void initMetadata() {
+    client.createMetalake(METALAKE, "", new HashMap<>());
+    GravitinoMetalake adminMetalake = client.loadMetalake(METALAKE);
+    adminMetalake.addUser(ALICE);
+    adminMetalake.addUser(BOB);
+
+    // Connection settings captured from the MySQL container.
+    Map<String, String> jdbcProperties = Maps.newHashMap();
+    jdbcProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_URL, mysqlUrl);
+    jdbcProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_USER, mysqlUsername);
+    jdbcProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD, mysqlPassword);
+    jdbcProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER, mysqlDriver);
+    adminMetalake
+        .createCatalog(CATALOG, Catalog.Type.RELATIONAL, "jdbc-mysql", "", jdbcProperties)
+        .asSchemas()
+        .createSchema(SCHEMA, "", new HashMap<>());
+
+    // Alice may use the catalog and its schemas, and create and read tables.
+    SecurableObject aliceCatalogAccess =
+        SecurableObjects.ofCatalog(
+            CATALOG,
+            ImmutableList.of(
+                Privileges.UseCatalog.allow(),
+                Privileges.UseSchema.allow(),
+                Privileges.CreateTable.allow(),
+                Privileges.SelectTable.allow()));
+    adminMetalake.createRole(ALICE_ROLE, new HashMap<>(), ImmutableList.of(aliceCatalogAccess));
+    adminMetalake.grantRolesToUser(ImmutableList.of(ALICE_ROLE), ALICE);
+  }
+
+  @Test
+  @Order(1)
+  public void testAliceCreatesTableViaSparkOAuth() {
+    aliceSparkSession.sql("USE " + CATALOG + "." + SCHEMA);
+    aliceSparkSession.sql("CREATE TABLE " + ALICE_TABLE + " (id INT, name 
STRING)");
+
+    List<Row> tables = aliceSparkSession.sql("SHOW TABLES").collectAsList();
+    Assertions.assertTrue(
+        tables.stream().anyMatch(r -> 
ALICE_TABLE.equalsIgnoreCase(r.getString(1))),
+        "Alice's Spark session should see the table she created");
+
+    GravitinoMetalake adminMetalake = client.loadMetalake(METALAKE);
+    Optional<Owner> tableOwner =
+        adminMetalake.getOwner(
+            MetadataObjects.of(
+                ImmutableList.of(CATALOG, SCHEMA, ALICE_TABLE), 
MetadataObject.Type.TABLE));
+    Assertions.assertTrue(tableOwner.isPresent(), "Table should have an owner 
recorded");
+    Assertions.assertEquals(ALICE, tableOwner.get().name(), "Table owner 
should be Alice");
+    LOG.info("Alice created '{}' via Spark OAuth2 (sub=alice)", ALICE_TABLE);
+  }
+
+  @Test
+  @Order(2)
+  public void testBobCannotAccessCatalogBeforeGrant() {
+    GravitinoMetalake bobMetalake = bobClient.loadMetalake(METALAKE);
+    Assertions.assertThrows(
+        ForbiddenException.class,
+        () -> bobMetalake.loadCatalog(CATALOG),
+        "Bob should be denied catalog access before grant");
+    LOG.info("Bob correctly received ForbiddenException before grant 
(sub=bob)");
+  }
+
+  /**
+   * The admin creates a role with use/select privileges on the catalog and grants it to Bob;
+   * afterwards Bob's client must be able to load the catalog and see Alice's table.
+   */
+  @Test
+  @Order(3)
+  public void testBobAccessCatalogAfterGrant() {
+    // Grant phase, performed with the admin client.
+    SecurableObject readOnlyCatalogAccess =
+        SecurableObjects.ofCatalog(
+            CATALOG,
+            ImmutableList.of(
+                Privileges.UseCatalog.allow(),
+                Privileges.UseSchema.allow(),
+                Privileges.SelectTable.allow()));
+    GravitinoMetalake adminMetalake = client.loadMetalake(METALAKE);
+    adminMetalake.createRole(BOB_ROLE, new HashMap<>(), ImmutableList.of(readOnlyCatalogAccess));
+    adminMetalake.grantRolesToUser(ImmutableList.of(BOB_ROLE), BOB);
+
+    // Verification phase, performed with Bob's client.
+    Catalog catalogSeenByBob = bobClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    Assertions.assertNotNull(catalogSeenByBob);
+
+    NameIdentifier aliceTableIdent = NameIdentifier.of(SCHEMA, ALICE_TABLE);
+    Assertions.assertTrue(
+        catalogSeenByBob.asTableCatalog().tableExists(aliceTableIdent),
+        "Bob should see the table Alice created after being granted access");
+    LOG.info("Bob's JWT-authenticated client sees '{}' after grant (sub=bob)", ALICE_TABLE);
+  }
+
+  /**
+   * Builds (or reuses, via {@code getOrCreate}) a local Spark session that loads the Gravitino
+   * plugin and is configured for OAuth2 against the mock server's {@code token} path.
+   *
+   * @param credential the value set as the OAuth2 credential in the Spark configuration
+   * @return the resulting Spark session
+   */
+  private SparkSession buildSparkSession(String credential) {
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.set("spark.plugins", GravitinoSparkPlugin.class.getName());
+    sparkConf.set(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri);
+    sparkConf.set(GravitinoSparkConfig.GRAVITINO_METALAKE, METALAKE);
+    sparkConf.set(GravitinoSparkConfig.GRAVITINO_AUTH_TYPE, AuthProperties.OAUTH2_AUTH_TYPE);
+    sparkConf.set(GravitinoSparkConfig.GRAVITINO_OAUTH2_URI, mockServerHelper.baseUri());
+    sparkConf.set(GravitinoSparkConfig.GRAVITINO_OAUTH2_PATH, "token");
+    sparkConf.set(GravitinoSparkConfig.GRAVITINO_OAUTH2_CREDENTIAL, credential);
+    sparkConf.set(GravitinoSparkConfig.GRAVITINO_OAUTH2_SCOPE, "openid");
+
+    SparkSession.Builder sessionBuilder =
+        SparkSession.builder().master("local[1]").appName("SparkJwksAuthorizationIT");
+    return sessionBuilder.config(sparkConf).getOrCreate();
+  }
+
+  /**
+   * Creates an admin client for the Gravitino server under test that authenticates with OAuth2,
+   * requesting tokens from the mock server's {@code token} path.
+   *
+   * @param credential the credential handed to the OAuth2 token provider
+   * @return a new client; the caller is responsible for closing it
+   */
+  private GravitinoAdminClient buildGravitinoClient(String credential) {
+    DefaultOAuth2TokenProvider tokenProvider =
+        DefaultOAuth2TokenProvider.builder()
+            .withUri(mockServerHelper.baseUri())
+            .withPath("token")
+            .withCredential(credential)
+            .withScope("openid")
+            .build();
+
+    return GravitinoAdminClient.builder(gravitinoUri).withOAuth(tokenProvider).build();
+  }
+}
diff --git 
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT33.java
 
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT33.java
new file mode 100644
index 0000000000..2ec8cdd566
--- /dev/null
+++ 
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT33.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+/** Concrete subclass of {@link SparkJwksAuthorizationIT} in the v3.3 Spark connector module. */
+public class SparkJwksAuthorizationIT33 extends SparkJwksAuthorizationIT {}
diff --git 
a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT34.java
 
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT34.java
new file mode 100644
index 0000000000..5c7563d560
--- /dev/null
+++ 
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT34.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+/** Concrete subclass of {@link SparkJwksAuthorizationIT} in the v3.4 Spark connector module. */
+public class SparkJwksAuthorizationIT34 extends SparkJwksAuthorizationIT {}
diff --git 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT35.java
 
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT35.java
new file mode 100644
index 0000000000..b99c11503d
--- /dev/null
+++ 
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT35.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+/** Concrete subclass of {@link SparkJwksAuthorizationIT} in the v3.5 Spark connector module. */
+public class SparkJwksAuthorizationIT35 extends SparkJwksAuthorizationIT {}


Reply via email to