This is an automated email from the ASF dual-hosted git repository.

yuqi1129 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new c4259fccdc [#10559] feat(trino-connector): Forward Trino session 
credentials to Gravitino server (#10730)
c4259fccdc is described below

commit c4259fccdc35aae856a47100357a68a6b7d4cb80
Author: Yuhui <[email protected]>
AuthorDate: Tue Apr 21 11:39:54 2026 +0800

    [#10559] feat(trino-connector): Forward Trino session credentials to 
Gravitino server (#10730)
    
    ### What changes were proposed in this pull request?
    
    Add per-query credential forwarding from Trino to Gravitino via a new
    `gravitino.client.session.forwardUser` option. When enabled, the
    connector forwards the Trino session identity (username or Bearer token)
    to Gravitino on every request instead of using static credentials.
    
    ### Why are the changes needed?
    
    Fix: #10559
    
    In a multi-user deployment, each Trino user should authenticate with
    their own identity so Gravitino can apply per-user authorization
    policies.
    
    ### Does this PR introduce _any_ user-facing change?
    
    New connector property: `gravitino.client.session.forwardUser=true` —
    forwards the Trino session identity to Gravitino per request. Behavior
    depends on `authType`: forwards the session username for `simple`, or a
    Bearer token from extra credentials for `oauth2`.
    
    ### How was this patch tested?
    
    - Unit tests: `TestGravitinoAuthProvider`
    - Manual integration test: verified session credentials correctly
    forwarded, no cross-contamination under concurrent queries
---
 docs/trino-connector/authentication.md             |  28 +++-
 .../init/trino/config/catalog/gravitino.properties |   2 +
 .../integration/test/TrinoQueryTestTool.java       |  26 ++++
 .../trino-test-tools/trino_integration_test.sh     |  61 ++++++++-
 .../gravitino/trino/connector/GravitinoConfig.java |  55 ++++++++
 .../trino/connector/GravitinoConnector.java        | 113 ++++++++++++++-
 .../trino/connector/GravitinoMetadata.java         |   1 -
 .../connector/catalog/CatalogConnectorContext.java |  52 +++++--
 .../connector/catalog/CatalogConnectorManager.java |  11 +-
 .../GravitinoAuthProvider.java                     | 151 ++++++++++++++++-----
 .../TestGravitinoConnectorForwardUser.java         | 118 ++++++++++++++++
 .../TestGravitinoConnectorNullChecks.java          |  11 ++
 .../TestGravitinoAuthProvider.java                 |  64 ++++++++-
 13 files changed, 641 insertions(+), 52 deletions(-)

diff --git a/docs/trino-connector/authentication.md 
b/docs/trino-connector/authentication.md
index 262cc2455c..c4f0a77c26 100644
--- a/docs/trino-connector/authentication.md
+++ b/docs/trino-connector/authentication.md
@@ -31,7 +31,7 @@ gravitino.user=admin
 
 | Property                          | Description                              
                            | Default value    | Required                       
        | Since version |
 
|-----------------------------------|----------------------------------------------------------------------|------------------|----------------------------------------|---------------|
-| `gravitino.client.authType`       | Authentication type: `simple`, `oauth2`, 
or `kerberos`              | (none)           | No                              
       | 1.3.0         |
+| `gravitino.client.authType`       | Authentication type: `simple`, `oauth2`, 
or `kerberos`                 | (none)        | No                              
       | 1.3.0         |
 | `gravitino.user`                  | Username for simple authentication       
                            | (none)           | No (uses system user if not 
specified) | 1.3.0         |
 
 ### OAuth2 Authentication
@@ -57,7 +57,7 @@ gravitino.client.oauth2.scope=gravitino
 
 | Property                             | Description                           
                 | Default value | Required                    | Since version |
 
|--------------------------------------|--------------------------------------------------------|---------------|-----------------------------|---------------|
-| `gravitino.client.authType`          | Authentication type: `simple`, 
`oauth2`, or `kerberos` | (none)        | Yes (to enable OAuth2)      | 1.3.0   
      |
+| `gravitino.client.authType`          | Authentication type: `simple`, 
`oauth2`, or `kerberos` | (none)        | Yes (to enable OAuth2)           | 
1.3.0         |
 | `gravitino.client.oauth2.serverUri`  | OAuth2 server URI                     
                 | (none)        | Yes if authType is `oauth2` | 1.3.0         |
 | `gravitino.client.oauth2.credential` | OAuth2 credentials in format 
`client_id:client_secret` | (none)        | Yes if authType is `oauth2` | 1.3.0 
        |
 | `gravitino.client.oauth2.path`       | OAuth2 token endpoint path            
                 | (none)        | Yes if authType is `oauth2` | 1.3.0         |
@@ -88,6 +88,7 @@ gravitino.client.kerberos.keytabFilePath=/path/to/user.keytab
 | `gravitino.client.kerberos.principal`      | Kerberos principal  | (none)    
    | Yes if authType is `kerberos`           | 1.3.0         |
 | `gravitino.client.kerberos.keytabFilePath` | Path to keytab file | (none)    
    | No (uses ticket cache if not specified) | 1.3.0         |
 
+
 ### Example: Connecting to OAuth-protected Gravitino Server
 
 This example shows how to configure the Trino connector to connect to a 
Gravitino server protected by OAuth authentication.
@@ -123,6 +124,29 @@ gravitino.client.oauth2.scope=test
 SHOW CATALOGS;
 ```
 
+### Session Credential Forwarding
+
+Setting `gravitino.client.session.forwardUser=true` with `authType=simple` 
creates a dedicated Gravitino client per Trino session user, so each user is 
visible in the Gravitino audit log instead of the shared `gravitino.user`.
+
+**Configuration:**
+
+```properties
+connector.name=gravitino
+gravitino.metalake=metalake
+gravitino.uri=http://localhost:8090
+
+gravitino.client.authType=simple
+gravitino.client.session.forwardUser=true
+```
+
+**Configuration properties:**
+
+| Property                                                   | Description     
                                                                           | 
Default value | Required | Since version |
+|------------------------------------------------------------|--------------------------------------------------------------------------------------------|---------------|----------|---------------|
+| `gravitino.client.session.forwardUser`                     | When `true` 
with `authType=simple`, forwards the Trino session user to Gravitino per-query 
| `false`       | No       | 1.3.0         |
+| `gravitino.client.session.cache.maxSize`                   | Maximum number 
of per-user sessions to keep in the cache                                   | 
`500`         | No       | 1.3.0         |
+| `gravitino.client.session.cache.expireAfterAccessSeconds`  | Seconds before 
an idle per-user session is evicted from the cache                          | 
`3600`        | No       | 1.3.0         |
+
 ### Notes
 
 - The Gravitino server must be configured with the corresponding 
authentication mechanism enabled.
diff --git 
a/integration-test-common/docker-script/init/trino/config/catalog/gravitino.properties
 
b/integration-test-common/docker-script/init/trino/config/catalog/gravitino.properties
index d84f93364c..08789c84c0 100644
--- 
a/integration-test-common/docker-script/init/trino/config/catalog/gravitino.properties
+++ 
b/integration-test-common/docker-script/init/trino/config/catalog/gravitino.properties
@@ -21,3 +21,5 @@ connector.name = gravitino
 gravitino.uri = http://GRAVITINO_HOST_IP:GRAVITINO_HOST_PORT
 gravitino.metalake = GRAVITINO_METALAKE_NAME
 gravitino.trino.skip-version-validation=true
+gravitino.client.authType = simple
+gravitino.client.session.forwardUser = true
diff --git 
a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java
 
b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java
index 7078107220..a3823847b7 100644
--- 
a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java
+++ 
b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.trino.connector.integration.test;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -102,6 +103,11 @@ public class TrinoQueryTestTool {
               + "The JAR file under ${trino_connector_dir} will be copied into 
the test image, "
               + "the default value is 
${project_root}/trino-connector/trino-connector/build/libs.");
 
+      options.addOption(
+          "env_only",
+          false,
+          "Start the environment (Gravitino + Trino) and keep it running for 
manual testing. Press Ctrl+C to shutdown.");
+
       options.addOption("help", false, "Print this help message");
 
       CommandLineParser parser = new PosixParser();
@@ -254,6 +260,26 @@ public class TrinoQueryTestTool {
         return;
       }
 
+      if (commandLine.hasOption("env_only")) {
+        CountDownLatch shutdownLatch = new CountDownLatch(1);
+        Runtime.getRuntime().addShutdownHook(new 
Thread(shutdownLatch::countDown));
+
+        
System.out.println("=======================================================");
+        System.out.println("Environment is ready for manual testing.");
+        System.out.println("  Gravitino URI : " + 
TrinoQueryITBase.gravitinoUri);
+        System.out.println("  Trino URI     : " + TrinoQueryITBase.trinoUri);
+        System.out.println("Connect to Trino CLI:");
+        System.out.println("  docker exec -it trino-ci-trino trino");
+        System.out.println("Press Ctrl+C to shutdown the environment.");
+        
System.out.println("=======================================================");
+        try {
+          shutdownLatch.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        return;
+      }
+
       if (testSet == null) {
         testerRunner.testSql();
       } else {
diff --git 
a/trino-connector/integration-test/trino-test-tools/trino_integration_test.sh 
b/trino-connector/integration-test/trino-test-tools/trino_integration_test.sh
index 54032704c6..7c45d79df6 100755
--- 
a/trino-connector/integration-test/trino-test-tools/trino_integration_test.sh
+++ 
b/trino-connector/integration-test/trino-test-tools/trino_integration_test.sh
@@ -26,6 +26,35 @@ export HADOOP_USER_NAME=anonymous
 echo $GRAVITINO_ROOT_DIR
 cd $GRAVITINO_ROOT_DIR
 
+PID_FILE="$GRAVITINO_ROOT_DIR/integration-test-common/build/trino-test-env.pid"
+
+if [ "$1" = "--stop" ]; then
+  if [ -f "$PID_FILE" ]; then
+    GRADLE_PID=$(cat "$PID_FILE")
+    if kill -0 "$GRADLE_PID" 2>/dev/null; then
+      PGID=$(ps -o pgid= -p "$GRADLE_PID" 2>/dev/null | tr -d ' ')
+      if [[ -n "$PGID" && "$PGID" =~ ^[0-9]+$ ]]; then
+        echo "Stopping environment (PGID: $PGID)..."
+        kill -TERM -- "-$PGID" 2>/dev/null
+        for i in $(seq 1 60); do
+          kill -0 "$GRADLE_PID" 2>/dev/null || break
+          sleep 1
+        done
+      else
+        echo "Could not determine process group for PID $GRADLE_PID, stopping 
containers directly..."
+      fi
+    else
+      echo "Environment process ($GRADLE_PID) is no longer running, stopping 
containers directly..."
+    fi
+    "$GRAVITINO_ROOT_DIR/integration-test-common/docker-script/shutdown.sh"
+    rm -f "$PID_FILE"
+  else
+    echo "No running environment found, stopping containers directly..."
+    "$GRAVITINO_ROOT_DIR/integration-test-common/docker-script/shutdown.sh"
+  fi
+  exit 0
+fi
+
 # Parse --auto_patch and --trino_version from arguments.
 # --auto_patch is consumed here and not forwarded to Gradle.
 # --trino_version is forwarded to Gradle and also used for patch selection.
@@ -106,4 +135,34 @@ if [ "$auto_patch" = true ]; then
     fi
 fi
 
-./gradlew :trino-connector:integration-test:TrinoTest -PappArgs="\"$args\""
+if echo "$args" | grep -q -- '--env_only'; then
+  
LOG_FILE="$GRAVITINO_ROOT_DIR/integration-test-common/build/trino-test-env.log"
+  mkdir -p "$(dirname "$PID_FILE")" "$(dirname "$LOG_FILE")"
+
+  ./gradlew :trino-connector:integration-test:TrinoTest -PappArgs="\"$args\"" 
> "$LOG_FILE" 2>&1 &
+  GRADLE_PID=$!
+  echo $GRADLE_PID > "$PID_FILE"
+
+  echo "Starting environment, please wait..."
+  while kill -0 "$GRADLE_PID" 2>/dev/null; do
+    if grep -q "Press Ctrl+C to shutdown" "$LOG_FILE" 2>/dev/null; then
+      break
+    fi
+    sleep 1
+  done
+
+  if ! kill -0 "$GRADLE_PID" 2>/dev/null; then
+    echo "Environment failed to start. Check logs at: $LOG_FILE"
+    rm -f "$PID_FILE"
+    exit 1
+  fi
+
+  grep -A 6 "=======================================================" 
"$LOG_FILE" | head -8
+  echo ""
+  echo "Environment is running in background (PID: $GRADLE_PID)"
+  echo "Logs  : $LOG_FILE"
+  echo "Stop  : 
./trino-connector/integration-test/trino-test-tools/trino_integration_test.sh 
--stop"
+  exit 0
+else
+  ./gradlew :trino-connector:integration-test:TrinoTest -PappArgs="\"$args\""
+fi
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
index f32d5dc98d..09629ccf42 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.trino.connector.security.GravitinoAuthProvider;
 
 /** Gravitino config. */
 public class GravitinoConfig {
@@ -151,6 +152,20 @@ public class GravitinoConfig {
           "",
           false);
 
+  private static final ConfigEntry GRAVITINO_SESSION_CACHE_MAX_SIZE =
+      new ConfigEntry(
+          GravitinoAuthProvider.SESSION_CACHE_MAX_SIZE_KEY,
+          "Maximum number of per-user sessions to keep in the cache when 
session.forwardUser=true",
+          "500",
+          false);
+
+  private static final ConfigEntry 
GRAVITINO_SESSION_CACHE_EXPIRE_AFTER_ACCESS_SECONDS =
+      new ConfigEntry(
+          GravitinoAuthProvider.SESSION_CACHE_EXPIRE_AFTER_ACCESS_SECONDS_KEY,
+          "Seconds before an idle per-user session is evicted from the cache 
when session.forwardUser=true",
+          "3600",
+          false);
+
   /**
    * Constructs a new GravitinoConfig with the specified configuration.
    *
@@ -389,6 +404,46 @@ public class GravitinoConfig {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Returns whether Trino session user forwarding is enabled.
+   *
+   * @return true if forwardUser is set to true
+   */
+  public boolean isForwardUser() {
+    return Boolean.parseBoolean(
+        config.getOrDefault(GravitinoAuthProvider.FORWARD_SESSION_USER_KEY, 
"false"));
+  }
+
+  /**
+   * Retrieves the maximum number of per-user sessions to keep in the cache.
+   *
+   * @return the session cache maximum size
+   */
+  public long getSessionCacheMaxSize() {
+    return parseLongConfigEntry(GRAVITINO_SESSION_CACHE_MAX_SIZE);
+  }
+
+  /**
+   * Retrieves the expiry (in seconds) for idle per-user sessions in the cache.
+   *
+   * @return the session cache expiry in seconds
+   */
+  public long getSessionCacheExpireAfterAccessSeconds() {
+    return 
parseLongConfigEntry(GRAVITINO_SESSION_CACHE_EXPIRE_AFTER_ACCESS_SECONDS);
+  }
+
+  private long parseLongConfigEntry(ConfigEntry entry) {
+    String value = config.getOrDefault(entry.key, entry.defaultValue);
+    try {
+      return Long.parseLong(value);
+    } catch (NumberFormatException e) {
+      throw new TrinoException(
+          GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT,
+          "Invalid value for config '" + entry.key + "': expected a number, 
got: " + value,
+          e);
+    }
+  }
+
   /**
    * Retrieves a comma-separated list of catalog name regex patterns that 
should be excluded from
    * loading
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
index 6247be1d4f..aa339ed7d6 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
@@ -19,8 +19,12 @@
 package org.apache.gravitino.trino.connector;
 
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorAccessControl;
@@ -36,11 +40,20 @@ import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.transaction.IsolationLevel;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
 import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
+import org.apache.gravitino.trino.connector.security.GravitinoAuthProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * GravitinoConnector serves as the entry point for operations on the 
connector managed by Trino and
@@ -49,9 +62,13 @@ import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdap
  */
 public class GravitinoConnector implements Connector {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoConnector.class);
+
   private final NameIdentifier catalogIdentifier;
   protected final CatalogConnectorContext catalogConnectorContext;
   private final CatalogConnectorMetadata connectorMetadata;
+  private final boolean forwardUser;
+  private final Cache<String, UserSession> perUserSessionCache;
 
   /**
    * Constructs a new GravitinoConnector with the specified catalog identifier 
and catalog connector
@@ -64,6 +81,10 @@ public class GravitinoConnector implements Connector {
     this.catalogConnectorContext = catalogConnectorContext;
     this.connectorMetadata =
         new CatalogConnectorMetadata(catalogConnectorContext.getMetalake(), 
this.catalogIdentifier);
+
+    GravitinoConfig config = catalogConnectorContext.getConfig();
+    this.forwardUser = config.isForwardUser();
+    this.perUserSessionCache = forwardUser ? buildSessionCache(config) : null;
   }
 
   @Override
@@ -90,8 +111,11 @@ public class GravitinoConnector implements Connector {
     ConnectorMetadata internalMetadata =
         internalConnector.getMetadata(session, 
gravitinoTransactionHandle.getInternalHandle());
     Preconditions.checkArgument(internalMetadata != null, "Internal metadata 
must not be null");
+
+    CatalogConnectorMetadata metadata =
+        forwardUser ? resolveSessionMetadata(session) : connectorMetadata;
     return createGravitinoMetadata(
-        connectorMetadata, catalogConnectorContext.getMetadataAdapter(), 
internalMetadata);
+        metadata, catalogConnectorContext.getMetadataAdapter(), 
internalMetadata);
   }
 
   protected GravitinoMetadata createGravitinoMetadata(
@@ -179,8 +203,95 @@ public class GravitinoConnector implements Connector {
   }
 
   public void shutdown() {
+    if (forwardUser) {
+      perUserSessionCache.invalidateAll();
+    }
     Connector internalConnector = 
catalogConnectorContext.getInternalConnector();
     internalConnector.shutdown();
     catalogConnectorContext.close();
   }
+
+  private CatalogConnectorMetadata resolveSessionMetadata(ConnectorSession 
session) {
+    String credKey = "simple:" + session.getUser();
+    try {
+      return perUserSessionCache.get(
+              credKey,
+              () -> {
+                GravitinoAdminClient userClient =
+                    GravitinoAuthProvider.buildForSession(
+                        catalogConnectorContext.getConfig(), session);
+                GravitinoMetalake userMetalake =
+                    
userClient.loadMetalake(catalogConnectorContext.getMetalake().name());
+                return new UserSession(
+                    userClient, new CatalogConnectorMetadata(userMetalake, 
catalogIdentifier));
+              })
+          .metadata;
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      LOG.warn(
+          "Failed to create per-user Gravitino client for user '{}': {}",
+          session.getUser(),
+          cause.getMessage());
+      throw new TrinoException(
+          PERMISSION_DENIED,
+          "Failed to authenticate user '"
+              + session.getUser()
+              + "' with Gravitino: "
+              + cause.getMessage(),
+          cause);
+    }
+  }
+
+  private Cache<String, UserSession> buildSessionCache(GravitinoConfig config) 
{
+    Map<String, String> clientConfig = config.getClientConfig();
+    String authTypeStr = clientConfig.get(GravitinoAuthProvider.AUTH_TYPE_KEY);
+    if (StringUtils.isBlank(authTypeStr)) {
+      throw new TrinoException(
+          GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT,
+          "gravitino.client.session.forwardUser=true requires 
gravitino.client.authType to be set");
+    }
+    GravitinoAuthProvider.AuthType authType = 
GravitinoAuthProvider.parseAuthType(authTypeStr);
+    if (authType != GravitinoAuthProvider.AuthType.SIMPLE) {
+      throw new TrinoException(
+          GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT,
+          "gravitino.client.session.forwardUser=true only supports 
authType=simple, got: "
+              + authTypeStr);
+    }
+
+    return CacheBuilder.newBuilder()
+        .maximumSize(config.getSessionCacheMaxSize())
+        .expireAfterAccess(config.getSessionCacheExpireAfterAccessSeconds(), 
TimeUnit.SECONDS)
+        .removalListener(
+            (RemovalNotification<String, UserSession> notification) -> {
+              UserSession session = notification.getValue();
+              if (session != null) {
+                session.close();
+              }
+            })
+        .build();
+  }
+
+  /** Holds a per-user {@link GravitinoAdminClient} together with its derived 
metadata. */
+  private static final class UserSession {
+    final GravitinoAdminClient client;
+    final CatalogConnectorMetadata metadata;
+    private volatile boolean closed = false;
+
+    UserSession(GravitinoAdminClient client, CatalogConnectorMetadata 
metadata) {
+      this.client = client;
+      this.metadata = metadata;
+    }
+
+    void close() {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      try {
+        client.close();
+      } catch (Exception e) {
+        LOG.warn("Failed to close GravitinoAdminClient", e);
+      }
+    }
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
index ba3ca624dc..ca8d1150bd 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
@@ -329,7 +329,6 @@ public abstract class GravitinoMetadata implements 
ConnectorMetadata {
       ColumnHandle column,
       Optional<String> comment) {
     String columnName = getColumnName(column);
-
     String commentString = "";
     if (comment.isPresent() && !StringUtils.isBlank(comment.get())) {
       commentString = comment.get();
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
index 83a8a6124e..448ca9b9c8 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
@@ -25,6 +25,7 @@ import io.trino.spi.session.PropertyMetadata;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.trino.connector.GravitinoConfig;
 import org.apache.gravitino.trino.connector.GravitinoConnector;
 import org.apache.gravitino.trino.connector.GravitinoConnectorPluginManager;
 import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
@@ -47,6 +48,8 @@ public class CatalogConnectorContext {
 
   private final CatalogConnectorAdapter adapter;
 
+  private final GravitinoConfig config;
+
   /**
    * Constructs a new CatalogConnectorContext.
    *
@@ -54,16 +57,19 @@ public class CatalogConnectorContext {
    * @param metalake the Gravitino metalake
    * @param internalConnector the internal connector
    * @param adapter the catalog connector adapter
+   * @param config the Gravitino connector configuration
    */
   public CatalogConnectorContext(
       GravitinoCatalog catalog,
       GravitinoMetalake metalake,
       Connector internalConnector,
-      CatalogConnectorAdapter adapter) {
+      CatalogConnectorAdapter adapter,
+      GravitinoConfig config) {
     this.catalog = catalog;
     this.metalake = metalake;
     this.internalConnector = internalConnector;
     this.adapter = adapter;
+    this.config = config;
   }
 
   /**
@@ -138,6 +144,15 @@ public class CatalogConnectorContext {
     return adapter.getColumnProperties();
   }
 
+  /**
+   * Returns the Gravitino connector configuration.
+   *
+   * @return the Gravitino config
+   */
+  public GravitinoConfig getConfig() {
+    return config;
+  }
+
   /** Closes the internal connector associated with this context. */
   public void close() {
     this.internalConnector.shutdown();
@@ -158,6 +173,7 @@ public class CatalogConnectorContext {
     private GravitinoCatalog catalog;
     private GravitinoMetalake metalake;
     private ConnectorContext context;
+    private GravitinoConfig config;
 
     /**
      * Constructs a new Builder with the specified connector adapter.
@@ -169,24 +185,30 @@ public class CatalogConnectorContext {
     }
 
     /**
-     * Constructs a new Builder with the specified connector adapter and 
catalog.
+     * Constructs a new Builder with the specified connector adapter, catalog, 
and config. Used
+     * internally by {@link #clone(GravitinoCatalog)} to propagate all builder 
state.
      *
      * @param connectorAdapter the connector adapter to use
      * @param catalog the catalog to use
+     * @param config the Gravitino config to propagate
      */
-    private Builder(CatalogConnectorAdapter connectorAdapter, GravitinoCatalog 
catalog) {
+    private Builder(
+        CatalogConnectorAdapter connectorAdapter,
+        GravitinoCatalog catalog,
+        GravitinoConfig config) {
       this.connectorAdapter = connectorAdapter;
       this.catalog = catalog;
+      this.config = config;
     }
 
     /**
-     * Clones the builder with a new catalog.
+     * Clones the builder with a new catalog, preserving all other state 
including config.
      *
      * @param catalog the new catalog to use
      * @return a new builder with the specified catalog
      */
     public Builder clone(GravitinoCatalog catalog) {
-      return new Builder(connectorAdapter, catalog);
+      return new Builder(connectorAdapter, catalog, this.config);
     }
 
     /**
@@ -211,6 +233,17 @@ public class CatalogConnectorContext {
       return this;
     }
 
+    /**
+     * Sets the Gravitino connector configuration.
+     *
+     * @param config the Gravitino config
+     * @return the builder
+     */
+    public Builder withConfig(GravitinoConfig config) {
+      this.config = config;
+      return this;
+    }
+
     /**
      * Builds a new CatalogConnectorContext instance.
      *
@@ -218,16 +251,17 @@ public class CatalogConnectorContext {
      * @throws Exception if the metalake, catalog, or context is not set
      */
     public CatalogConnectorContext build() throws Exception {
-      Preconditions.checkArgument(metalake != null, "metalake is not null");
-      Preconditions.checkArgument(catalog != null, "catalog is not null");
-      Preconditions.checkArgument(context != null, "context is not null");
+      Preconditions.checkArgument(metalake != null, "metalake must not be 
null");
+      Preconditions.checkArgument(catalog != null, "catalog must not be null");
+      Preconditions.checkArgument(context != null, "context must not be null");
+      Preconditions.checkArgument(config != null, "config must not be null");
       Map<String, String> connectorConfig = 
connectorAdapter.buildInternalConnectorConfig(catalog);
       String internalConnectorName = connectorAdapter.internalConnectorName();
 
       Connector connector =
           
GravitinoConnectorPluginManager.instance(context.getClass().getClassLoader())
               .createConnector(internalConnectorName, connectorConfig, 
context);
-      return new CatalogConnectorContext(catalog, metalake, connector, 
connectorAdapter);
+      return new CatalogConnectorContext(catalog, metalake, connector, 
connectorAdapter, config);
     }
   }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
index 7357531830..7539a0b439 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
@@ -42,6 +42,7 @@ import 
org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.trino.connector.GravitinoConfig;
 import org.apache.gravitino.trino.connector.GravitinoErrorCode;
 import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
+import org.apache.gravitino.trino.connector.security.GravitinoAuthProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,13 +114,14 @@ public class CatalogConnectorManager {
    * @param client the Gravitino admin client
    */
   public void config(GravitinoConfig config, GravitinoAdminClient client) {
-    Preconditions.checkArgument(config != null, "config is not null");
+    Preconditions.checkArgument(config != null, "config must not be null");
     this.config = config;
     if (client == null) {
-      String authType = 
config.getClientConfig().getOrDefault("gravitino.client.authType", "none");
+      String authType =
+          
config.getClientConfig().getOrDefault(GravitinoAuthProvider.AUTH_TYPE_KEY, 
"none");
       LOG.info("Building Gravitino client with authType: {}", authType);
       try {
-        this.gravitinoClient = GravitinoAuthProvider.buildClient(config);
+        this.gravitinoClient = GravitinoAuthProvider.build(config);
       } catch (IllegalArgumentException e) {
         throw new TrinoException(
             GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT,
@@ -414,7 +416,8 @@ public class CatalogConnectorManager {
           
catalogConnectorFactory.createCatalogConnectorContextBuilder(catalog);
       builder
           .withMetalake(metalakes.computeIfAbsent(catalog.getMetalake(), 
this::retrieveMetalake))
-          .withContext(context);
+          .withContext(context)
+          .withConfig(config);
 
       CatalogConnectorContext connectorContext = builder.build();
       String fullCatalogName = getTrinoCatalogName(catalog);
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/GravitinoAuthProvider.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/security/GravitinoAuthProvider.java
similarity index 59%
rename from 
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/GravitinoAuthProvider.java
rename to 
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/security/GravitinoAuthProvider.java
index da12c32547..247639c6cd 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/GravitinoAuthProvider.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/security/GravitinoAuthProvider.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.trino.connector.catalog;
+package org.apache.gravitino.trino.connector.security;
 
+import io.trino.spi.connector.ConnectorSession;
 import java.io.File;
-import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
@@ -33,45 +33,62 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Builds a {@link GravitinoAdminClient} with the appropriate authentication 
provider based on the
- * Gravitino config.
+ * Gravitino config, and produces a per-user client for session forwarding via 
{@link
+ * #buildForSession(GravitinoConfig, ConnectorSession)}.
  */
-class GravitinoAuthProvider {
+public class GravitinoAuthProvider {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoAuthProvider.class);
 
   /** Authentication type configuration key. */
-  static final String AUTH_TYPE_KEY =
+  public static final String AUTH_TYPE_KEY =
       GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + "authType";
 
   /** Simple authentication user configuration key. */
-  static final String SIMPLE_AUTH_USER_KEY = "gravitino.user";
+  public static final String SIMPLE_AUTH_USER_KEY = "gravitino.user";
+
+  /**
+   * When set to {@code true}, the Trino session user/token is forwarded to 
Gravitino per-request
+   * via a per-user {@link GravitinoAdminClient} instead of the shared service 
client.
+   */
+  public static final String FORWARD_SESSION_USER_KEY =
+      GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + 
"session.forwardUser";
 
   /** OAuth2 server URI configuration key. */
-  static final String OAUTH_SERVER_URI_KEY =
+  public static final String OAUTH_SERVER_URI_KEY =
       GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + 
"oauth2.serverUri";
 
   /** OAuth2 credential configuration key. */
-  static final String OAUTH_CREDENTIAL_KEY =
+  public static final String OAUTH_CREDENTIAL_KEY =
       GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + 
"oauth2.credential";
 
   /** OAuth2 path configuration key. */
-  static final String OAUTH_PATH_KEY =
+  public static final String OAUTH_PATH_KEY =
       GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + 
"oauth2.path";
 
   /** OAuth2 scope configuration key. */
-  static final String OAUTH_SCOPE_KEY =
+  public static final String OAUTH_SCOPE_KEY =
       GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + 
"oauth2.scope";
 
   /** Kerberos principal configuration key. */
-  static final String KERBEROS_PRINCIPAL_KEY =
+  public static final String KERBEROS_PRINCIPAL_KEY =
       GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + 
"kerberos.principal";
 
   /** Kerberos keytab file path configuration key. */
-  static final String KERBEROS_KEYTAB_FILE_PATH_KEY =
+  public static final String KERBEROS_KEYTAB_FILE_PATH_KEY =
       GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + 
"kerberos.keytabFilePath";
 
+  /** Session cache maximum size configuration key. */
+  public static final String SESSION_CACHE_MAX_SIZE_KEY =
+      GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX + 
"session.cache.maxSize";
+
+  /** Session cache expiry (seconds) configuration key. */
+  public static final String SESSION_CACHE_EXPIRE_AFTER_ACCESS_SECONDS_KEY =
+      GravitinoClientConfiguration.GRAVITINO_CLIENT_CONFIG_PREFIX
+          + "session.cache.expireAfterAccessSeconds";
+
   /** Authentication types supported by the Trino connector. */
-  enum AuthType {
+  public enum AuthType {
     SIMPLE,
     OAUTH2,
     KERBEROS,
@@ -81,30 +98,21 @@ class GravitinoAuthProvider {
   private GravitinoAuthProvider() {}
 
   /**
-   * Builds a GravitinoAdminClient from the given config, applying 
authentication settings found in
-   * the client config.
+   * Builds a {@link GravitinoAdminClient} from the given config, applying 
authentication settings
+   * found in the client config.
    *
    * @param config the Gravitino configuration containing server URI and 
client properties
-   * @return a configured GravitinoAdminClient
+   * @return a configured {@link GravitinoAdminClient}
    */
-  public static GravitinoAdminClient buildClient(GravitinoConfig config) {
-    Map<String, String> clientConfig = new HashMap<>(config.getClientConfig());
+  public static GravitinoAdminClient build(GravitinoConfig config) {
+    Map<String, String> clientConfig = config.getClientConfig();
     String uri = config.getURI();
     String authTypeStr = clientConfig.get(AUTH_TYPE_KEY);
 
     GravitinoAdminClient.AdminClientBuilder builder = 
GravitinoAdminClient.builder(uri);
 
     if (StringUtils.isNotBlank(authTypeStr)) {
-      AuthType authType;
-      try {
-        authType = AuthType.valueOf(authTypeStr.toUpperCase(Locale.ROOT));
-      } catch (IllegalArgumentException e) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Invalid authentication type: %s. Valid values are: simple, 
oauth2, kerberos, none",
-                authTypeStr),
-            e);
-      }
+      AuthType authType = parseAuthType(authTypeStr);
 
       switch (authType) {
         case SIMPLE:
@@ -122,7 +130,85 @@ class GravitinoAuthProvider {
       }
     }
 
-    // Remove auth-specific keys before passing to withClientConfig
+    removeAuthSpecificKeys(clientConfig);
+    builder.withClientConfig(clientConfig);
+
+    return builder.build();
+  }
+
+  /**
+   * Alias for {@link #build(GravitinoConfig)}, kept for backward 
compatibility with existing tests.
+   *
+   * @deprecated Use {@link #build(GravitinoConfig)} directly.
+   */
+  @Deprecated
+  @SuppressWarnings("InlineMeSuggester")
+  public static GravitinoAdminClient buildClient(GravitinoConfig config) {
+    return build(config);
+  }
+
+  /**
+   * Builds a per-user {@link GravitinoAdminClient} whose credentials come 
from the given Trino
+   * connector session. This is the entry point for the per-user client cache 
when {@code
+   * forwardUser=true}.
+   *
+   * <p>Currently only {@code authType=simple} is supported: the Trino session 
username is used as
+   * the Gravitino simple-auth identity.
+   *
+   * @param config the Gravitino connector configuration
+   * @param session the current Trino connector session
+   * @return a new {@link GravitinoAdminClient} authenticated for the session 
user
+   * @throws IllegalArgumentException if forwarding is not configured or 
authType is missing
+   * @throws UnsupportedOperationException if the authType does not support 
session forwarding
+   */
+  public static GravitinoAdminClient buildForSession(
+      GravitinoConfig config, ConnectorSession session) {
+    Map<String, String> clientConfig = config.getClientConfig();
+    String uri = config.getURI();
+    String authTypeStr = clientConfig.get(AUTH_TYPE_KEY);
+    boolean forwardUser =
+        
Boolean.parseBoolean(clientConfig.getOrDefault(FORWARD_SESSION_USER_KEY, 
"false"));
+
+    if (!forwardUser) {
+      throw new IllegalArgumentException(
+          "buildForSession called but forwardUser is not enabled in config");
+    }
+
+    if (StringUtils.isBlank(authTypeStr)) {
+      throw new IllegalArgumentException(
+          "buildForSession requires an authType to be set in config");
+    }
+
+    AuthType authType = parseAuthType(authTypeStr);
+
+    GravitinoAdminClient.AdminClientBuilder builder = 
GravitinoAdminClient.builder(uri);
+
+    if (authType != AuthType.SIMPLE) {
+      throw new UnsupportedOperationException(
+          "Auth type "
+              + authType
+              + " does not support session forwarding. Only simple is 
supported.");
+    }
+    builder.withSimpleAuth(session.getUser());
+
+    removeAuthSpecificKeys(clientConfig);
+    builder.withClientConfig(clientConfig);
+    return builder.build();
+  }
+
+  public static AuthType parseAuthType(String authTypeStr) {
+    try {
+      return AuthType.valueOf(authTypeStr.toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Invalid authentication type: %s. Valid values are: simple, 
oauth2, kerberos, none",
+              authTypeStr),
+          e);
+    }
+  }
+
+  private static void removeAuthSpecificKeys(Map<String, String> clientConfig) 
{
     clientConfig.remove(AUTH_TYPE_KEY);
     clientConfig.remove(OAUTH_SERVER_URI_KEY);
     clientConfig.remove(OAUTH_CREDENTIAL_KEY);
@@ -130,9 +216,9 @@ class GravitinoAuthProvider {
     clientConfig.remove(OAUTH_SCOPE_KEY);
     clientConfig.remove(KERBEROS_PRINCIPAL_KEY);
     clientConfig.remove(KERBEROS_KEYTAB_FILE_PATH_KEY);
-
-    builder.withClientConfig(clientConfig);
-    return builder.build();
+    clientConfig.remove(FORWARD_SESSION_USER_KEY);
+    clientConfig.remove(SESSION_CACHE_MAX_SIZE_KEY);
+    clientConfig.remove(SESSION_CACHE_EXPIRE_AFTER_ACCESS_SECONDS_KEY);
   }
 
   private static void buildSimpleAuth(
@@ -220,7 +306,6 @@ class GravitinoAuthProvider {
           KERBEROS_KEYTAB_FILE_PATH_KEY);
     }
 
-    // host is set by GravitinoAdminClient.Builder.withKerberosAuth() from the 
server URI
     return kerberosBuilder.build();
   }
 }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorForwardUser.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorForwardUser.java
new file mode 100644
index 0000000000..4380942be9
--- /dev/null
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorForwardUser.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.spi.TrinoException;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
+import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
+import org.apache.gravitino.trino.connector.security.GravitinoAuthProvider;
+import org.junit.jupiter.api.Test;
+
+/** Tests for forwardUser startup validation in {@link GravitinoConnector}. */
+class TestGravitinoConnectorForwardUser {
+
+  @Test
+  void testForwardUserWithoutAuthTypeThrowsAtConstruction() {
+    CatalogConnectorContext ctx =
+        mockContextWithConfig(
+            ImmutableMap.of(GravitinoAuthProvider.FORWARD_SESSION_USER_KEY, 
"true"));
+
+    TrinoException ex = assertThrows(TrinoException.class, () -> new 
GravitinoConnector(ctx));
+    assertEquals(GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT.toErrorCode(), 
ex.getErrorCode());
+  }
+
+  @Test
+  void testForwardUserWithKerberosAuthTypeThrowsAtConstruction() {
+    CatalogConnectorContext ctx =
+        mockContextWithConfig(
+            ImmutableMap.of(
+                GravitinoAuthProvider.FORWARD_SESSION_USER_KEY, "true",
+                GravitinoAuthProvider.AUTH_TYPE_KEY, "kerberos"));
+
+    TrinoException ex = assertThrows(TrinoException.class, () -> new 
GravitinoConnector(ctx));
+    assertEquals(GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT.toErrorCode(), 
ex.getErrorCode());
+  }
+
+  @Test
+  void testForwardUserWithOAuth2AuthTypeThrowsAtConstruction() {
+    CatalogConnectorContext ctx =
+        mockContextWithConfig(
+            ImmutableMap.of(
+                GravitinoAuthProvider.FORWARD_SESSION_USER_KEY, "true",
+                GravitinoAuthProvider.AUTH_TYPE_KEY, "oauth2"));
+
+    TrinoException ex = assertThrows(TrinoException.class, () -> new 
GravitinoConnector(ctx));
+    assertEquals(GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT.toErrorCode(), 
ex.getErrorCode());
+  }
+
+  @Test
+  void testForwardUserWithSimpleAuthTypeSucceeds() {
+    CatalogConnectorContext ctx =
+        mockContextWithConfig(
+            ImmutableMap.of(
+                GravitinoAuthProvider.FORWARD_SESSION_USER_KEY, "true",
+                GravitinoAuthProvider.AUTH_TYPE_KEY, "simple"));
+
+    assertDoesNotThrow(() -> new GravitinoConnector(ctx));
+  }
+
+  @Test
+  void testForwardUserFalseWithNoAuthTypeSucceeds() {
+    CatalogConnectorContext ctx = mockContextWithConfig(ImmutableMap.of());
+    assertDoesNotThrow(() -> new GravitinoConnector(ctx));
+  }
+
+  private static CatalogConnectorContext mockContextWithConfig(
+      ImmutableMap<String, String> extraConfig) {
+    GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class);
+    
when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", 
"catalog"));
+
+    GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+    Catalog catalog = mock(Catalog.class);
+    when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+    when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+    when(metalake.loadCatalog(any())).thenReturn(catalog);
+
+    ImmutableMap.Builder<String, String> builder =
+        ImmutableMap.<String, String>builder()
+            .put("gravitino.uri", "http://localhost:8090";)
+            .put("gravitino.metalake", "test");
+    builder.putAll(extraConfig);
+    GravitinoConfig config = new GravitinoConfig(builder.build());
+
+    CatalogConnectorContext ctx = mock(CatalogConnectorContext.class);
+    when(ctx.getCatalog()).thenReturn(mockCatalog);
+    when(ctx.getMetalake()).thenReturn(metalake);
+    when(ctx.getConfig()).thenReturn(config);
+    return ctx;
+  }
+}
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
index 1c1bbcb8b6..f30f60d8a2 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableMap;
 import io.trino.spi.connector.Connector;
 import io.trino.spi.transaction.IsolationLevel;
 import org.apache.gravitino.Catalog;
@@ -45,6 +46,11 @@ class TestGravitinoConnectorNullChecks {
     when(mockContext.getCatalog()).thenReturn(mockCatalog);
     GravitinoMetalake metalake = mockMetalake();
     when(mockContext.getMetalake()).thenReturn(metalake);
+    when(mockContext.getConfig())
+        .thenReturn(
+            new GravitinoConfig(
+                ImmutableMap.of(
+                    "gravitino.uri", "http://localhost:8090";, 
"gravitino.metalake", "test")));
     when(mockContext.getInternalConnector()).thenReturn(null);
     GravitinoConnector connector = new GravitinoConnector(mockContext);
     assertThrows(
@@ -61,6 +67,11 @@ class TestGravitinoConnectorNullChecks {
     when(mockContext.getCatalog()).thenReturn(mockCatalog);
     GravitinoMetalake metalake = mockMetalake();
     when(mockContext.getMetalake()).thenReturn(metalake);
+    when(mockContext.getConfig())
+        .thenReturn(
+            new GravitinoConfig(
+                ImmutableMap.of(
+                    "gravitino.uri", "http://localhost:8090";, 
"gravitino.metalake", "test")));
 
     Connector mockInternalConnector = mock(Connector.class);
     when(mockContext.getInternalConnector()).thenReturn(mockInternalConnector);
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestGravitinoAuthProvider.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/security/TestGravitinoAuthProvider.java
similarity index 71%
rename from 
trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestGravitinoAuthProvider.java
rename to 
trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/security/TestGravitinoAuthProvider.java
index 5a341614b7..53e5e3df5f 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestGravitinoAuthProvider.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/security/TestGravitinoAuthProvider.java
@@ -16,19 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.trino.connector.catalog;
+package org.apache.gravitino.trino.connector.security;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
+import io.trino.spi.connector.ConnectorSession;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import org.apache.gravitino.client.GravitinoAdminClient;
 import org.apache.gravitino.trino.connector.GravitinoConfig;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+@SuppressWarnings("deprecation")
 public class TestGravitinoAuthProvider {
 
   @Test
@@ -139,6 +145,62 @@ public class TestGravitinoAuthProvider {
                             keytabFile.getAbsolutePath()))));
   }
 
+  @Test
+  public void testBuildResultContainsClient() {
+    GravitinoAdminClient client = 
GravitinoAuthProvider.build(buildConfig(ImmutableMap.of()));
+    assertNotNull(client);
+  }
+
+  @Test
+  public void testBuildSimpleWithoutForwardUser() {
+    GravitinoAdminClient client =
+        GravitinoAuthProvider.build(
+            buildConfig(ImmutableMap.of(GravitinoAuthProvider.AUTH_TYPE_KEY, 
"simple")));
+    assertNotNull(client);
+  }
+
+  @Test
+  public void testBuildForSessionSimple() {
+    GravitinoConfig config =
+        buildConfig(
+            ImmutableMap.of(
+                GravitinoAuthProvider.AUTH_TYPE_KEY, "simple",
+                GravitinoAuthProvider.FORWARD_SESSION_USER_KEY, "true"));
+
+    ConnectorSession session = mock(ConnectorSession.class);
+    when(session.getUser()).thenReturn("alice");
+
+    GravitinoAdminClient client = 
GravitinoAuthProvider.buildForSession(config, session);
+    assertNotNull(client);
+  }
+
+  @Test
+  public void testBuildForSessionThrowsWhenForwardUserDisabled() {
+    GravitinoConfig config =
+        buildConfig(ImmutableMap.of(GravitinoAuthProvider.AUTH_TYPE_KEY, 
"simple"));
+    ConnectorSession session = mock(ConnectorSession.class);
+    when(session.getUser()).thenReturn("alice");
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> GravitinoAuthProvider.buildForSession(config, session));
+  }
+
+  @Test
+  public void testBuildForSessionThrowsForNonSimpleAuthType() {
+    GravitinoConfig config =
+        buildConfig(
+            ImmutableMap.of(
+                GravitinoAuthProvider.AUTH_TYPE_KEY, "kerberos",
+                GravitinoAuthProvider.FORWARD_SESSION_USER_KEY, "true"));
+    ConnectorSession session = mock(ConnectorSession.class);
+    when(session.getUser()).thenReturn("alice");
+
+    assertThrows(
+        UnsupportedOperationException.class,
+        () -> GravitinoAuthProvider.buildForSession(config, session));
+  }
+
   private GravitinoConfig buildConfig(ImmutableMap<String, String> authConfig) 
{
     ImmutableMap.Builder<String, String> builder =
         ImmutableMap.<String, String>builder()

Reply via email to