This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 3ef3661366 [#9567]test(auth): Add OAuth integration tests (#10549)
3ef3661366 is described below
commit 3ef3661366f7391599bf832b9eb421ae2a9c8027
Author: Bharath Krishna <[email protected]>
AuthorDate: Tue Apr 28 08:14:28 2026 -0700
[#9567]test(auth): Add OAuth integration tests (#10549)
### What changes were proposed in this pull request?
Adds OAuth2/JWKS integration test coverage for `JwksTokenValidator`
using an in-process `JwksMockServerHelper` (serves `/jwks` + `/token`) —
no external services required for the core tests.
**`JwksTokenValidatorIT`** (client-java, embedded): valid token
accepted; expired, wrong-audience, and wrong-key tokens rejected;
per-user authorization via `ForbiddenException`.
**`SparkJwksAuthIT`** (spark-common, embedded): end-to-end Spark plugin
→ `client_credentials` → JWKS validation, verified via
`GravitinoAdminClient` with the same OAuth provider.
**`SparkJwksAuthorizationIT33/34/35`** (Docker, MySQL): Alice creates a
table via Spark OAuth2; Bob is denied until admin grants access.
### Why are the changes needed?
`JwksTokenValidator` had no integration test coverage.
Fix: #9567
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Embedded tests (no Docker): `./gradlew :clients:client-java:test
--tests "*.JwksTokenValidatorIT"` and `./gradlew
:spark-connector:spark-common:test --tests "*.SparkJwksAuthIT"`
- Docker tests: `./gradlew test -PskipTests=false
-PskipDockerTests=false` (requires MySQL container, tagged
`@Tag("gravitino-docker-test")`)
---
clients/client-java/build.gradle.kts | 1 +
.../test/authorization/JwksTokenValidatorIT.java | 201 ++++++++++++++
integration-test-common/build.gradle.kts | 1 +
.../test/util/JwksMockServerHelper.java | 196 ++++++++++++++
spark-connector/spark-common/build.gradle.kts | 1 +
.../test/authorization/SparkJwksAuthIT.java | 135 ++++++++++
.../authorization/SparkJwksAuthorizationIT.java | 289 +++++++++++++++++++++
.../authorization/SparkJwksAuthorizationIT33.java | 21 ++
.../authorization/SparkJwksAuthorizationIT34.java | 21 ++
.../authorization/SparkJwksAuthorizationIT35.java | 21 ++
10 files changed, 887 insertions(+)
diff --git a/clients/client-java/build.gradle.kts
b/clients/client-java/build.gradle.kts
index 9a4f74a5fb..11929c8fec 100644
--- a/clients/client-java/build.gradle.kts
+++ b/clients/client-java/build.gradle.kts
@@ -50,6 +50,7 @@ dependencies {
testImplementation(libs.awaitility)
testImplementation(libs.bundles.jersey)
testImplementation(libs.bundles.jwt)
+ testImplementation(libs.nimbus.jose.jwt)
testImplementation(libs.commons.lang3)
testImplementation(libs.hadoop3.client)
testImplementation(libs.junit.jupiter.api)
diff --git
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JwksTokenValidatorIT.java
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JwksTokenValidatorIT.java
new file mode 100644
index 0000000000..9ff4d93a08
--- /dev/null
+++
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JwksTokenValidatorIT.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test.authorization;
+
+import com.google.common.collect.Maps;
+import com.nimbusds.jose.jwk.RSAKey;
+import com.nimbusds.jose.jwk.gen.RSAKeyGenerator;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Map;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.client.GravitinoVersion;
+import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.gravitino.integration.test.util.JwksMockServerHelper;
+import org.apache.gravitino.integration.test.util.OAuthMockDataProvider;
+import org.apache.gravitino.server.authentication.OAuthConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for {@link
org.apache.gravitino.server.authentication.JwksTokenValidator}.
+ * Exercises the full {@code client_credentials → /token → Bearer → JWKS
validation} path.
+ */
+public class JwksTokenValidatorIT extends BaseIT {
+
+ private static final String SERVICE_AUDIENCE = "service1";
+ private static final String SUBJECT = "gravitino";
+ private static final String KEY_ID = "test-kid";
+ private static final String METALAKE_NAME = "jwks_auth_metalake";
+ private static final String ALICE = "alice";
+ private static final String BOB = "bob";
+
+ private static JwksMockServerHelper mockServerHelper;
+ private static RSAKey rsaKey;
+ private static volatile String currentToken;
+
+ private static String validToken;
+
+ @BeforeAll
+ @Override
+ public void startIntegrationTest() throws Exception {
+ mockServerHelper = JwksMockServerHelper.create(KEY_ID);
+ rsaKey = mockServerHelper.rsaKey();
+ validToken =
+ JwksMockServerHelper.mintToken(
+ rsaKey, SUBJECT, SERVICE_AUDIENCE,
Instant.now().plusSeconds(1_000_000));
+ currentToken = validToken;
+ mockServerHelper.setTokenSupplier(() -> currentToken);
+
+ Map<String, String> configs = Maps.newHashMap();
+ configs.put(Configs.AUTHENTICATORS.getKey(),
AuthenticatorType.OAUTH.name().toLowerCase());
+ configs.put(OAuthConfig.SERVICE_AUDIENCE.getKey(), SERVICE_AUDIENCE);
+ configs.put(
+ OAuthConfig.TOKEN_VALIDATOR_CLASS.getKey(),
+ "org.apache.gravitino.server.authentication.JwksTokenValidator");
+ configs.put(OAuthConfig.JWKS_URI.getKey(), mockServerHelper.jwksUri());
+ configs.put(OAuthConfig.PRINCIPAL_FIELDS.getKey(), "sub");
+ configs.put(OAuthConfig.ALLOW_SKEW_SECONDS.getKey(), "6");
+ configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), "true");
+ configs.put(Configs.SERVICE_ADMINS.getKey(), SUBJECT);
+ registerCustomConfigs(configs);
+
+
OAuthMockDataProvider.getInstance().setTokenData(validToken.getBytes(StandardCharsets.UTF_8));
+ super.startIntegrationTest();
+
+ client.createMetalake(METALAKE_NAME, "JWKS auth test metalake",
Maps.newHashMap());
+ client.loadMetalake(METALAKE_NAME).addUser(ALICE);
+ }
+
+ @AfterAll
+ @Override
+ public void stopIntegrationTest() throws IOException, InterruptedException {
+ if (mockServerHelper != null) {
+ mockServerHelper.close();
+ }
+ super.stopIntegrationTest();
+ }
+
+ @Test
+ public void testValidTokenAuthentication() throws Exception {
+ withToken(
+ validToken,
+ oauthClient -> {
+ GravitinoVersion version = oauthClient.serverVersion();
+ String projectVersion = System.getenv("PROJECT_VERSION");
+ if (projectVersion != null) {
+ Assertions.assertEquals(projectVersion, version.version());
+ }
+ Assertions.assertFalse(version.compileDate().isEmpty());
+ if (testMode.equals(ITUtils.EMBEDDED_TEST_MODE)) {
+ Assertions.assertEquals(readGitCommitIdFromGitFile(),
version.gitCommit());
+ }
+ });
+ }
+
+ @Test
+ public void testExpiredTokenFails() throws Exception {
+ String expiredToken =
+ JwksMockServerHelper.mintToken(
+ rsaKey, SUBJECT, SERVICE_AUDIENCE,
Instant.now().minusSeconds(3600));
+ withToken(
+ expiredToken,
+ badClient -> Assertions.assertThrows(RuntimeException.class,
badClient::serverVersion));
+ }
+
+ @Test
+ public void testWrongAudienceFails() throws Exception {
+ String wrongAudToken =
+ JwksMockServerHelper.mintToken(
+ rsaKey, SUBJECT, "wrong-audience",
Instant.now().plusSeconds(1_000_000));
+ withToken(
+ wrongAudToken,
+ badClient -> Assertions.assertThrows(RuntimeException.class,
badClient::serverVersion));
+ }
+
+ @Test
+ public void testTokenSignedWithDifferentKeyFails() throws Exception {
+ RSAKey differentKey = new
RSAKeyGenerator(2048).keyID("other-kid").generate();
+ String wrongKeyToken =
+ JwksMockServerHelper.mintToken(
+ differentKey, SUBJECT, SERVICE_AUDIENCE,
Instant.now().plusSeconds(1_000_000));
+ withToken(
+ wrongKeyToken,
+ badClient -> Assertions.assertThrows(RuntimeException.class,
badClient::serverVersion));
+ }
+
+ @Test
+ public void testPrivilegedUserCanAccessMetalake() throws Exception {
+ String aliceToken =
+ JwksMockServerHelper.mintToken(
+ rsaKey, ALICE, SERVICE_AUDIENCE,
Instant.now().plusSeconds(1_000_000));
+ withToken(
+ aliceToken,
+ aliceClient -> {
+ GravitinoMetalake ml = aliceClient.loadMetalake(METALAKE_NAME);
+ Assertions.assertNotNull(ml);
+ Assertions.assertEquals(METALAKE_NAME, ml.name());
+ });
+ }
+
+ @Test
+ public void testUnprivilegedUserForbidden() throws Exception {
+ String bobToken =
+ JwksMockServerHelper.mintToken(
+ rsaKey, BOB, SERVICE_AUDIENCE,
Instant.now().plusSeconds(1_000_000));
+ withToken(
+ bobToken,
+ bobClient ->
+ Assertions.assertThrows(
+ ForbiddenException.class, () ->
bobClient.loadMetalake(METALAKE_NAME)));
+ }
+
+ /** Swaps {@code currentToken}, builds a fresh provider+client, runs {@code
action}, restores. */
+ private void withToken(String token, ClientConsumer action) throws Exception
{
+ String previous = currentToken;
+ currentToken = token;
+ DefaultOAuth2TokenProvider provider =
+ DefaultOAuth2TokenProvider.builder()
+ .withUri(mockServerHelper.baseUri())
+ .withPath("token")
+ .withCredential("test-client:test-secret")
+ .withScope("openid")
+ .build();
+ try (GravitinoAdminClient tempClient =
+ GravitinoAdminClient.builder(serverUri).withOAuth(provider).build()) {
+ action.accept(tempClient);
+ } finally {
+ currentToken = previous;
+ }
+ }
+
+ @FunctionalInterface
+ private interface ClientConsumer {
+ void accept(GravitinoAdminClient client) throws Exception;
+ }
+}
diff --git a/integration-test-common/build.gradle.kts
b/integration-test-common/build.gradle.kts
index 7de6bf080b..5f7063496d 100644
--- a/integration-test-common/build.gradle.kts
+++ b/integration-test-common/build.gradle.kts
@@ -39,6 +39,7 @@ dependencies {
testImplementation(libs.bundles.jetty)
testImplementation(libs.bundles.jersey)
testImplementation(libs.bundles.jwt)
+ testImplementation(libs.nimbus.jose.jwt)
testImplementation(libs.bundles.log4j)
testImplementation(libs.commons.cli)
testImplementation(libs.commons.lang3)
diff --git
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/JwksMockServerHelper.java
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/JwksMockServerHelper.java
new file mode 100644
index 0000000000..136c4fd9e3
--- /dev/null
+++
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/JwksMockServerHelper.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.util;
+
+import com.nimbusds.jose.JWSAlgorithm;
+import com.nimbusds.jose.JWSHeader;
+import com.nimbusds.jose.crypto.RSASSASigner;
+import com.nimbusds.jose.jwk.JWKSet;
+import com.nimbusds.jose.jwk.RSAKey;
+import com.nimbusds.jose.jwk.gen.RSAKeyGenerator;
+import com.nimbusds.jwt.JWTClaimsSet;
+import com.nimbusds.jwt.SignedJWT;
+import com.sun.net.httpserver.HttpServer;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+
+/** In-process JWKS + OAuth2 token mock server for integration tests. */
+public class JwksMockServerHelper implements Closeable {
+
+ private final HttpServer httpServer;
+ private final RSAKey rsaKey;
+ private final int port;
+
+ @Nullable private volatile Supplier<String> tokenSupplier;
+ private final Map<String, String> userTokens = new ConcurrentHashMap<>();
+ @Nullable private volatile String fallbackToken;
+
+ private JwksMockServerHelper(HttpServer httpServer, RSAKey rsaKey, int port)
{
+ this.httpServer = httpServer;
+ this.rsaKey = rsaKey;
+ this.port = port;
+ }
+
+ /** Returns the RSA key pair used for signing JWTs. */
+ public RSAKey rsaKey() {
+ return rsaKey;
+ }
+
+ /** Returns the port the mock server is listening on. */
+ public int port() {
+ return port;
+ }
+
+ /** Returns the base URI of the mock server, e.g. {@code
http://localhost:12345}. */
+ public String baseUri() {
+ return "http://localhost:" + port;
+ }
+
+ /** Returns the JWKS endpoint URI, e.g. {@code http://localhost:12345/jwks}.
*/
+ public String jwksUri() {
+ return baseUri() + "/jwks";
+ }
+
+ /** Configures single-token mode: every {@code /token} request returns the
supplier's token. */
+ public void setTokenSupplier(Supplier<String> supplier) {
+ this.tokenSupplier = supplier;
+ }
+
+ /** Registers a pre-minted JWT for a specific {@code client_id} (multi-user
mode). */
+ public void registerUserToken(String clientId, String token) {
+ userTokens.put(clientId, token);
+ }
+
+ /** Sets the fallback token when {@code client_id} is not found in the user
token map. */
+ public void setFallbackToken(String token) {
+ this.fallbackToken = token;
+ }
+
+ /** Stops the mock HTTP server. */
+ @Override
+ public void close() throws IOException {
+ httpServer.stop(0);
+ }
+
+ /** Mints a compact RS256-signed JWT using this helper's RSA key. */
+ public String mintToken(String subject, String audience, Instant expiry)
throws Exception {
+ return mintToken(rsaKey, subject, audience, expiry);
+ }
+
+ /** Mints a compact RS256-signed JWT using the specified RSA key. */
+ @SuppressWarnings("JavaUtilDate")
+ public static String mintToken(RSAKey key, String subject, String audience,
Instant expiry)
+ throws Exception {
+ JWTClaimsSet claims =
+ new JWTClaimsSet.Builder()
+ .subject(subject)
+ .audience(audience)
+ .issueTime(Date.from(Instant.now()))
+ .expirationTime(Date.from(expiry))
+ .build();
+ SignedJWT jwt =
+ new SignedJWT(
+ new
JWSHeader.Builder(JWSAlgorithm.RS256).keyID(key.getKeyID()).build(), claims);
+ jwt.sign(new RSASSASigner(key));
+ return jwt.serialize();
+ }
+
+ /** Builds the JSON body returned by the mock {@code /token} endpoint. */
+ public static String buildTokenResponseJson(String accessToken) {
+ return "{\"access_token\":\""
+ + accessToken
+ + "\",\"token_type\":\"bearer\",\"expires_in\":86400}";
+ }
+
+ /** Parses a single parameter from a URL-encoded form body. */
+ static String parseFormParam(String body, String key) {
+ for (String pair : body.split("&")) {
+ String[] kv = pair.split("=", 2);
+ if (kv.length == 2 && URLDecoder.decode(kv[0],
StandardCharsets.UTF_8).equals(key)) {
+ return URLDecoder.decode(kv[1], StandardCharsets.UTF_8);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates and starts a new mock server with a freshly generated RSA key
pair on a random port.
+ */
+ public static JwksMockServerHelper create(String keyId) throws Exception {
+ RSAKey rsaKey = new RSAKeyGenerator(2048).keyID(keyId).generate();
+ String jwksJson = new JWKSet(rsaKey.toPublicJWK()).toString();
+
+ HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
+
+ JwksMockServerHelper helper =
+ new JwksMockServerHelper(server, rsaKey,
server.getAddress().getPort());
+
+ // /jwks – serves the JWKS public key document.
+ server.createContext(
+ "/jwks",
+ exchange -> {
+ byte[] body = jwksJson.getBytes(StandardCharsets.UTF_8);
+ exchange.getResponseHeaders().set("Content-Type",
"application/json");
+ exchange.sendResponseHeaders(200, body.length);
+ exchange.getResponseBody().write(body);
+ exchange.getResponseBody().close();
+ });
+
+ // /token – mock OAuth2 client_credentials endpoint.
+ server.createContext(
+ "/token",
+ exchange -> {
+ String postBody =
+ new String(exchange.getRequestBody().readAllBytes(),
StandardCharsets.UTF_8);
+
+ String token;
+ Supplier<String> supplier = helper.tokenSupplier;
+ if (supplier != null) {
+ // Single-token mode: use the supplier.
+ token = supplier.get();
+ } else {
+ // Multi-user mode: look up client_id.
+ String clientId = parseFormParam(postBody, "client_id");
+ String fb = helper.fallbackToken;
+ token =
+ clientId != null && helper.userTokens.containsKey(clientId)
+ ? helper.userTokens.get(clientId)
+ : (fb != null ? fb : "");
+ }
+
+ byte[] resp =
buildTokenResponseJson(token).getBytes(StandardCharsets.UTF_8);
+ exchange.getResponseHeaders().set("Content-Type",
"application/json");
+ exchange.sendResponseHeaders(200, resp.length);
+ exchange.getResponseBody().write(resp);
+ exchange.getResponseBody().close();
+ });
+
+ server.start();
+ return helper;
+ }
+}
diff --git a/spark-connector/spark-common/build.gradle.kts
b/spark-connector/spark-common/build.gradle.kts
index 6727387f3c..80c6ea8faa 100644
--- a/spark-connector/spark-common/build.gradle.kts
+++ b/spark-connector/spark-common/build.gradle.kts
@@ -121,6 +121,7 @@ dependencies {
}
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
+ testImplementation(libs.nimbus.jose.jwt)
testImplementation(libs.mysql.driver)
testImplementation(libs.postgresql.driver)
testImplementation(libs.testcontainers)
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthIT.java
new file mode 100644
index 0000000000..88975e468e
--- /dev/null
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthIT.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Map;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthProperties;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.JwksMockServerHelper;
+import org.apache.gravitino.integration.test.util.OAuthMockDataProvider;
+import org.apache.gravitino.server.authentication.OAuthConfig;
+import org.apache.gravitino.spark.connector.GravitinoSparkConfig;
+import org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * End-to-end integration test for Spark OAuth2 authentication via JWKS.
Exercises the full path:
+ * Spark plugin → {@code client_credentials} → JWT fetch → Gravitino REST →
JWKS validation.
+ */
+public class SparkJwksAuthIT extends BaseIT {
+
+ private static final String SERVICE_AUDIENCE = "service1";
+ private static final String METALAKE = "jwks_test_metalake";
+ private static final String KEY_ID = "test-kid";
+
+ private static JwksMockServerHelper mockServerHelper;
+
+ private static String validToken;
+ private static SparkSession sparkSession;
+
+ @BeforeAll
+ @Override
+ public void startIntegrationTest() throws Exception {
+ mockServerHelper = JwksMockServerHelper.create(KEY_ID);
+ validToken =
+ mockServerHelper.mintToken(
+ "gravitino", SERVICE_AUDIENCE,
Instant.now().plusSeconds(1_000_000));
+ mockServerHelper.setTokenSupplier(() -> validToken);
+
+ Map<String, String> configs = Maps.newHashMap();
+ configs.put(Configs.AUTHENTICATORS.getKey(),
AuthenticatorType.OAUTH.name().toLowerCase());
+ configs.put(OAuthConfig.SERVICE_AUDIENCE.getKey(), SERVICE_AUDIENCE);
+ configs.put(
+ OAuthConfig.TOKEN_VALIDATOR_CLASS.getKey(),
+ "org.apache.gravitino.server.authentication.JwksTokenValidator");
+ configs.put(OAuthConfig.JWKS_URI.getKey(), mockServerHelper.jwksUri());
+ configs.put(OAuthConfig.PRINCIPAL_FIELDS.getKey(), "sub");
+ configs.put(OAuthConfig.ALLOW_SKEW_SECONDS.getKey(), "6");
+ registerCustomConfigs(configs);
+
+
OAuthMockDataProvider.getInstance().setTokenData(validToken.getBytes(StandardCharsets.UTF_8));
+ super.startIntegrationTest();
+
+ client.createMetalake(METALAKE, "JWKS auth test metalake",
Maps.newHashMap());
+
+ SparkConf sparkConf =
+ new SparkConf()
+ .set("spark.plugins", GravitinoSparkPlugin.class.getName())
+ .set(GravitinoSparkConfig.GRAVITINO_URI, serverUri)
+ .set(GravitinoSparkConfig.GRAVITINO_METALAKE, METALAKE)
+ .set(GravitinoSparkConfig.GRAVITINO_AUTH_TYPE,
AuthProperties.OAUTH2_AUTH_TYPE)
+ .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_URI,
mockServerHelper.baseUri())
+ .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_PATH, "token")
+ .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_CREDENTIAL,
"test-client:test-secret")
+ .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_SCOPE, "openid");
+
+ sparkSession =
+ SparkSession.builder()
+ .master("local[1]")
+ .appName("SparkJwksAuthIT")
+ .config(sparkConf)
+ .getOrCreate();
+ }
+
+ @AfterAll
+ @Override
+ public void stopIntegrationTest() throws IOException, InterruptedException {
+ if (sparkSession != null) {
+ sparkSession.close();
+ }
+ if (mockServerHelper != null) {
+ mockServerHelper.close();
+ }
+ super.stopIntegrationTest();
+ }
+
+ @Test
+ public void testSparkConnectsWithJwksAuth() throws Exception {
+ // Trigger the Spark plugin's OAuth handshake with Gravitino on the first
SQL call.
+ Assertions.assertDoesNotThrow(() -> sparkSession.sql("SHOW
CATALOGS").collect());
+
+ // Verify the full client_credentials → JWKS validation path via a direct
Gravitino client.
+ DefaultOAuth2TokenProvider oauthProvider =
+ DefaultOAuth2TokenProvider.builder()
+ .withUri(mockServerHelper.baseUri())
+ .withPath("token")
+ .withCredential("test-client:test-secret")
+ .withScope("openid")
+ .build();
+ try (GravitinoAdminClient oauthClient =
+
GravitinoAdminClient.builder(serverUri).withOAuth(oauthProvider).build()) {
+ GravitinoMetalake metalake = oauthClient.loadMetalake(METALAKE);
+ Assertions.assertEquals(METALAKE, metalake.name());
+ }
+ }
+}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT.java
new file mode 100644
index 0000000000..83c1840ce7
--- /dev/null
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.auth.AuthProperties;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.JwksMockServerHelper;
+import org.apache.gravitino.integration.test.util.OAuthMockDataProvider;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.apache.gravitino.server.authentication.OAuthConfig;
+import org.apache.gravitino.spark.connector.GravitinoSparkConfig;
+import org.apache.gravitino.spark.connector.jdbc.JdbcPropertiesConstants;
+import org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end Docker integration test for Spark OAuth2 authentication and
authorization via JWKS.
+ * User identity is derived solely from the JWT {@code sub} claim.
+ */
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public abstract class SparkJwksAuthorizationIT extends BaseIT {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkJwksAuthorizationIT.class);
+
+ private static final String GRAVITINO_ADMIN = "gravitino";
+ private static final String ALICE = "alice";
+ private static final String BOB = "bob";
+
+ private static final String ALICE_CREDENTIAL = ALICE + ":alice-secret";
+ private static final String BOB_CREDENTIAL = BOB + ":bob-secret";
+
+ private static final String SERVICE_AUDIENCE = "service1";
+ private static final String KEY_ID = "test-kid";
+ private static final String METALAKE = "jwks_authz_metalake";
+ private static final String CATALOG = "jdbc_catalog";
+ private static final String SCHEMA = "jdbc_schema";
+ private static final String ALICE_TABLE = "alice_table";
+ private static final String ALICE_ROLE = "alice-role";
+ private static final String BOB_ROLE = "bob-role";
+
+ protected final ContainerSuite containerSuite = ContainerSuite.getInstance();
+
+ private static JwksMockServerHelper mockServerHelper;
+
+ private static String gravitinoUri;
+
+ private static SparkSession aliceSparkSession;
+ private static GravitinoAdminClient bobClient;
+
+ private String mysqlUrl;
+ private String mysqlUsername;
+ private String mysqlPassword;
+ private String mysqlDriver;
+
+ @BeforeAll
+ @Override
+ public void startIntegrationTest() throws Exception {
+ mockServerHelper = JwksMockServerHelper.create(KEY_ID);
+
+ Instant farFuture = Instant.now().plusSeconds(1_000_000);
+ String adminToken = mockServerHelper.mintToken(GRAVITINO_ADMIN,
SERVICE_AUDIENCE, farFuture);
+ String aliceToken = mockServerHelper.mintToken(ALICE, SERVICE_AUDIENCE,
farFuture);
+ String bobToken = mockServerHelper.mintToken(BOB, SERVICE_AUDIENCE,
farFuture);
+
+ mockServerHelper.registerUserToken(GRAVITINO_ADMIN, adminToken);
+ mockServerHelper.registerUserToken(ALICE, aliceToken);
+ mockServerHelper.registerUserToken(BOB, bobToken);
+ mockServerHelper.setFallbackToken(adminToken);
+ LOG.info("Mock JWKS+token server started on port {}",
mockServerHelper.port());
+
+ Map<String, String> configs = Maps.newHashMap();
+ configs.put(Configs.AUTHENTICATORS.getKey(),
AuthenticatorType.OAUTH.name().toLowerCase());
+ configs.put(OAuthConfig.SERVICE_AUDIENCE.getKey(), SERVICE_AUDIENCE);
+ configs.put(
+ OAuthConfig.TOKEN_VALIDATOR_CLASS.getKey(),
+ "org.apache.gravitino.server.authentication.JwksTokenValidator");
+ configs.put(OAuthConfig.JWKS_URI.getKey(), mockServerHelper.jwksUri());
+ configs.put(OAuthConfig.PRINCIPAL_FIELDS.getKey(), "sub");
+ configs.put(OAuthConfig.ALLOW_SKEW_SECONDS.getKey(), "6");
+ configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), "true");
+ configs.put(Configs.SERVICE_ADMINS.getKey(), GRAVITINO_ADMIN);
+ configs.put(Configs.CACHE_ENABLED.getKey(), "false");
+ registerCustomConfigs(configs);
+
+
OAuthMockDataProvider.getInstance().setTokenData(adminToken.getBytes(StandardCharsets.UTF_8));
+
+ initMysqlContainer();
+ super.startIntegrationTest();
+ gravitinoUri = String.format("http://127.0.0.1:%d",
getGravitinoServerPort());
+
+ initMetadata();
+
+ aliceSparkSession = buildSparkSession(ALICE_CREDENTIAL);
+ bobClient = buildGravitinoClient(BOB_CREDENTIAL);
+ }
+
+ @AfterAll
+ @Override
+ public void stopIntegrationTest() throws IOException, InterruptedException {
+ if (aliceSparkSession != null) {
+ aliceSparkSession.stop();
+ }
+ if (bobClient != null) {
+ bobClient.close();
+ }
+ if (mockServerHelper != null) {
+ mockServerHelper.close();
+ }
+ super.stopIntegrationTest();
+ }
+
+ private void initMysqlContainer() throws SQLException {
+
containerSuite.startMySQLContainer(TestDatabaseName.MYSQL_CATALOG_MYSQL_IT);
+ mysqlUrl = containerSuite.getMySQLContainer().getJdbcUrl();
+ mysqlUsername = containerSuite.getMySQLContainer().getUsername();
+ mysqlPassword = containerSuite.getMySQLContainer().getPassword();
+ mysqlDriver =
+ containerSuite
+ .getMySQLContainer()
+ .getDriverClassName(TestDatabaseName.MYSQL_CATALOG_MYSQL_IT);
+ }
+
+ private void initMetadata() {
+ client.createMetalake(METALAKE, "", new HashMap<>());
+ GravitinoMetalake metalake = client.loadMetalake(METALAKE);
+ metalake.addUser(ALICE);
+ metalake.addUser(BOB);
+
+ Map<String, String> props = Maps.newHashMap();
+ props.put(JdbcPropertiesConstants.GRAVITINO_JDBC_URL, mysqlUrl);
+ props.put(JdbcPropertiesConstants.GRAVITINO_JDBC_USER, mysqlUsername);
+ props.put(JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD, mysqlPassword);
+ props.put(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER, mysqlDriver);
+ Catalog catalog =
+ metalake.createCatalog(CATALOG, Catalog.Type.RELATIONAL, "jdbc-mysql",
"", props);
+ catalog.asSchemas().createSchema(SCHEMA, "", new HashMap<>());
+
+ SecurableObject catalogAccess =
+ SecurableObjects.ofCatalog(
+ CATALOG,
+ ImmutableList.of(
+ Privileges.UseCatalog.allow(),
+ Privileges.UseSchema.allow(),
+ Privileges.CreateTable.allow(),
+ Privileges.SelectTable.allow()));
+ metalake.createRole(ALICE_ROLE, new HashMap<>(),
ImmutableList.of(catalogAccess));
+ metalake.grantRolesToUser(ImmutableList.of(ALICE_ROLE), ALICE);
+ }
+
+ @Test
+ @Order(1)
+ public void testAliceCreatesTableViaSparkOAuth() {
+ aliceSparkSession.sql("USE " + CATALOG + "." + SCHEMA);
+ aliceSparkSession.sql("CREATE TABLE " + ALICE_TABLE + " (id INT, name
STRING)");
+
+ List<Row> tables = aliceSparkSession.sql("SHOW TABLES").collectAsList();
+ Assertions.assertTrue(
+ tables.stream().anyMatch(r ->
ALICE_TABLE.equalsIgnoreCase(r.getString(1))),
+ "Alice's Spark session should see the table she created");
+
+ GravitinoMetalake adminMetalake = client.loadMetalake(METALAKE);
+ Optional<Owner> tableOwner =
+ adminMetalake.getOwner(
+ MetadataObjects.of(
+ ImmutableList.of(CATALOG, SCHEMA, ALICE_TABLE),
MetadataObject.Type.TABLE));
+ Assertions.assertTrue(tableOwner.isPresent(), "Table should have an owner
recorded");
+ Assertions.assertEquals(ALICE, tableOwner.get().name(), "Table owner
should be Alice");
+ LOG.info("Alice created '{}' via Spark OAuth2 (sub=alice)", ALICE_TABLE);
+ }
+
+ @Test
+ @Order(2)
+ public void testBobCannotAccessCatalogBeforeGrant() {
+ GravitinoMetalake bobMetalake = bobClient.loadMetalake(METALAKE);
+ Assertions.assertThrows(
+ ForbiddenException.class,
+ () -> bobMetalake.loadCatalog(CATALOG),
+ "Bob should be denied catalog access before grant");
+ LOG.info("Bob correctly received ForbiddenException before grant
(sub=bob)");
+ }
+
+ @Test
+ @Order(3)
+ public void testBobAccessCatalogAfterGrant() {
+ GravitinoMetalake adminMetalake = client.loadMetalake(METALAKE);
+ SecurableObject bobCatalogAccess =
+ SecurableObjects.ofCatalog(
+ CATALOG,
+ ImmutableList.of(
+ Privileges.UseCatalog.allow(),
+ Privileges.UseSchema.allow(),
+ Privileges.SelectTable.allow()));
+ adminMetalake.createRole(BOB_ROLE, new HashMap<>(),
ImmutableList.of(bobCatalogAccess));
+ adminMetalake.grantRolesToUser(ImmutableList.of(BOB_ROLE), BOB);
+
+ GravitinoMetalake bobMetalake = bobClient.loadMetalake(METALAKE);
+ Catalog catalog = bobMetalake.loadCatalog(CATALOG);
+ Assertions.assertNotNull(catalog);
+
+ boolean tableExists =
+ catalog.asTableCatalog().tableExists(NameIdentifier.of(SCHEMA,
ALICE_TABLE));
+ Assertions.assertTrue(
+ tableExists, "Bob should see the table Alice created after being
granted access");
+ LOG.info("Bob's JWT-authenticated client sees '{}' after grant (sub=bob)",
ALICE_TABLE);
+ }
+
+ private SparkSession buildSparkSession(String credential) {
+ SparkConf conf =
+ new SparkConf()
+ .set("spark.plugins", GravitinoSparkPlugin.class.getName())
+ .set(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri)
+ .set(GravitinoSparkConfig.GRAVITINO_METALAKE, METALAKE)
+ .set(GravitinoSparkConfig.GRAVITINO_AUTH_TYPE,
AuthProperties.OAUTH2_AUTH_TYPE)
+ .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_URI,
mockServerHelper.baseUri())
+ .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_PATH, "token")
+ .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_CREDENTIAL, credential)
+ .set(GravitinoSparkConfig.GRAVITINO_OAUTH2_SCOPE, "openid");
+ return SparkSession.builder()
+ .master("local[1]")
+ .appName("SparkJwksAuthorizationIT")
+ .config(conf)
+ .getOrCreate();
+ }
+
+ private GravitinoAdminClient buildGravitinoClient(String credential) {
+ DefaultOAuth2TokenProvider provider =
+ DefaultOAuth2TokenProvider.builder()
+ .withUri(mockServerHelper.baseUri())
+ .withPath("token")
+ .withCredential(credential)
+ .withScope("openid")
+ .build();
+ return
GravitinoAdminClient.builder(gravitinoUri).withOAuth(provider).build();
+ }
+}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT33.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT33.java
new file mode 100644
index 0000000000..2ec8cdd566
--- /dev/null
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT33.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+public class SparkJwksAuthorizationIT33 extends SparkJwksAuthorizationIT {}
diff --git
a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT34.java
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT34.java
new file mode 100644
index 0000000000..5c7563d560
--- /dev/null
+++
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT34.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+public class SparkJwksAuthorizationIT34 extends SparkJwksAuthorizationIT {}
diff --git
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT35.java
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT35.java
new file mode 100644
index 0000000000..b99c11503d
--- /dev/null
+++
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkJwksAuthorizationIT35.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.authorization;
+
+public class SparkJwksAuthorizationIT35 extends SparkJwksAuthorizationIT {}