Copilot commented on code in PR #10730:
URL: https://github.com/apache/gravitino/pull/10730#discussion_r3056869472


##########
trino-connector/integration-test/trino-test-tools/trino_integration_test.sh:
##########
@@ -26,6 +26,28 @@ export HADOOP_USER_NAME=anonymous
 echo $GRAVITINO_ROOT_DIR
 cd $GRAVITINO_ROOT_DIR
 
+PID_FILE="$GRAVITINO_ROOT_DIR/integration-test-common/build/trino-test-env.pid"
+
+if [ "$1" = "--stop" ]; then
+  if [ -f "$PID_FILE" ]; then
+    GRADLE_PID=$(cat "$PID_FILE")
+    if kill -0 "$GRADLE_PID" 2>/dev/null; then
+      PGID=$(ps -o pgid= -p "$GRADLE_PID" 2>/dev/null | tr -d ' ')
+      echo "Stopping environment (PGID: $PGID)..."
+      kill -TERM -- "-$PGID" 2>/dev/null
+      for i in $(seq 1 60); do
+        kill -0 "$GRADLE_PID" 2>/dev/null || break
+        sleep 1
+      done
+    fi
+    rm -f "$PID_FILE"
+  else
+    echo "No running environment found, stopping containers directly..."
+    "$GRAVITINO_ROOT_DIR/integration-test-common/docker-script/shutdown.sh"
+  fi
+  exit 0
+fi

Review Comment:
   If `PID_FILE` exists but the Gradle PID is no longer running, the script 
removes the PID file but does not call the container shutdown script. This can 
leave the environment running with no easy cleanup path. Consider invoking 
`shutdown.sh` in the case where the PID is missing/dead (or when PGID lookup 
fails), not only when the PID file is absent.



##########
docs/trino-connector/authentication.md:
##########
@@ -123,6 +124,51 @@ gravitino.client.oauth2.scope=test
 SHOW CATALOGS;
 ```
 
+### Session Credential Forwarding
+
+Setting `gravitino.client.session.forwardUser=true` enables per-query credential forwarding from Trino to Gravitino. The behavior depends on the configured `authType`:
+
+- **`authType=simple`**: The Trino session username is encoded as a `Basic` credential and forwarded per-request, so each Trino user is visible in the Gravitino audit log instead of the shared `gravitino.user`.
+- **`authType=oauth2`**: The Bearer token from Trino session extra credentials is forwarded per-request to Gravitino, enabling per-user OAuth2 authorization.
+
+**Configuration for simple auth (forward session user):**
+
+```properties
+connector.name=gravitino
+gravitino.metalake=metalake
+gravitino.uri=http://localhost:8090
+
+gravitino.client.authType=simple
+gravitino.client.session.forwardUser=true
+```
+
+**Configuration for OAuth2 (forward session Bearer token):**
+
+```properties
+connector.name=gravitino
+gravitino.metalake=metalake
+gravitino.uri=http://localhost:8090
+
+gravitino.client.authType=oauth2
+gravitino.client.session.forwardUser=true
+gravitino.client.oauth2.token.credentialKey=gravitino.token
+```
+
+**Query execution with OAuth2 token:**
+
+```sql
+-- Pass an OAuth2 token through Trino extra credentials
+SET SESSION gravitino.extra_credentials = 'gravitino.token=<your-oauth2-token>';
+SHOW SCHEMAS IN my_catalog;
+```
+
+**Configuration properties:**
+
+| Property                                        | Description                                                                                                                              | Default value | Required                                                 | Since version |
+|-------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------------------------------------------------------|---------------|
+| `gravitino.client.session.forwardUser`          | When `true`, forwards the Trino session user (`simple`) or Bearer token (`oauth2`) to Gravitino per-request                              | `false`       | No                                                       | 1.9.0         |
+| `gravitino.client.oauth2.token.credentialKey`   | Key name in Trino extra credentials that holds the Bearer token. Only used when `authType=oauth2` and `forwardUser=true`                  | (none)        | Yes if `authType=oauth2` and `forwardUser=true`          | 1.9.0         |

Review Comment:
   The documented 'Since version' values (`1.9.0`) appear inconsistent with the 
`@since 1.3.0` tags added in the new Java classes (`TrinoSessionContext`, 
`TrinoSessionAuthProvider`, `ExtraHeadersProvider`). Align these version 
annotations so users and maintainers have a single, correct introduction 
version for the feature.
   ```suggestion
   | `gravitino.client.session.forwardUser`          | When `true`, forwards the Trino session user (`simple`) or Bearer token (`oauth2`) to Gravitino per-request                              | `false`       | No                                                       | 1.3.0         |
   | `gravitino.client.oauth2.token.credentialKey`   | Key name in Trino extra credentials that holds the Bearer token. Only used when `authType=oauth2` and `forwardUser=true`                  | (none)        | Yes if `authType=oauth2` and `forwardUser=true`          | 1.3.0         |
   ```



##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/SessionAwareCatalogMetadata.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.catalog;
+
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.SchemaTableName;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.trino.connector.metadata.GravitinoColumn;
+import org.apache.gravitino.trino.connector.metadata.GravitinoSchema;
+import org.apache.gravitino.trino.connector.metadata.GravitinoTable;
+import org.apache.gravitino.trino.connector.security.TrinoSessionContext;
+
+/**
+ * A session-aware facade over {@link CatalogConnectorMetadata} that automatically applies and
+ * clears Trino session credentials around every Gravitino API call.
+ *
+ * <p>When {@code sessionContext} is {@code null} (auth forwarding is not configured), calls are
+ * forwarded to the delegate without any session management overhead.
+ *
+ * @since 1.3.0
+ */
+public class SessionAwareCatalogMetadata {
+
+  private final CatalogConnectorMetadata delegate;
+  @Nullable private final TrinoSessionContext sessionContext;
+
+  /**
+   * Constructs a new SessionAwareCatalogMetadata.
+   *
+   * @param delegate the underlying metadata implementation
+   * @param sessionContext the session context used to forward Trino credentials, or {@code null}
+   *     when forwarding is not configured
+   */
+  public SessionAwareCatalogMetadata(
+      CatalogConnectorMetadata delegate, @Nullable TrinoSessionContext sessionContext) {
+    this.delegate = delegate;
+    this.sessionContext = sessionContext;
+  }
+
+  public List<String> listSchemaNames(ConnectorSession session) {
+    return doAs(session, delegate::listSchemaNames);
+  }
+
+  public GravitinoSchema getSchema(ConnectorSession session, String schemaName) {
+    return doAs(session, () -> delegate.getSchema(schemaName));
+  }
+
+  public GravitinoTable getTable(ConnectorSession session, String schemaName, String tableName) {
+    return doAs(session, () -> delegate.getTable(schemaName, tableName));
+  }
+
+  public List<String> listTables(ConnectorSession session, String schemaName) {
+    return doAs(session, () -> delegate.listTables(schemaName));
+  }
+
+  public boolean tableExists(ConnectorSession session, String schemaName, String tableName) {
+    return doAs(session, () -> delegate.tableExists(schemaName, tableName));
+  }
+
+  public void createTable(ConnectorSession session, GravitinoTable table, boolean ignoreExisting) {
+    doAs(session, () -> delegate.createTable(table, ignoreExisting));
+  }
+
+  public void createSchema(ConnectorSession session, GravitinoSchema schema) {
+    doAs(session, () -> delegate.createSchema(schema));
+  }
+
+  public void dropSchema(ConnectorSession session, String schemaName, boolean cascade) {
+    doAs(session, () -> delegate.dropSchema(schemaName, cascade));
+  }
+
+  public void dropTable(ConnectorSession session, SchemaTableName tableName) {
+    doAs(session, () -> delegate.dropTable(tableName));
+  }
+
+  public void renameSchema(ConnectorSession session, String source, String target) {
+    doAs(session, () -> delegate.renameSchema(source, target));
+  }
+
+  public void renameTable(
+      ConnectorSession session, SchemaTableName oldTableName, SchemaTableName newTableName) {
+    doAs(session, () -> delegate.renameTable(oldTableName, newTableName));
+  }
+
+  public void setTableComment(
+      ConnectorSession session, SchemaTableName schemaTableName, String comment) {
+    doAs(session, () -> delegate.setTableComment(schemaTableName, comment));
+  }
+
+  public void setTableProperties(
+      ConnectorSession session, SchemaTableName schemaTableName, Map<String, String> properties) {
+    doAs(session, () -> delegate.setTableProperties(schemaTableName, properties));
+  }
+
+  public void addColumn(
+      ConnectorSession session, SchemaTableName schemaTableName, GravitinoColumn column) {
+    doAs(session, () -> delegate.addColumn(schemaTableName, column));
+  }
+
+  public void addColumn(
+      ConnectorSession session,
+      SchemaTableName schemaTableName,
+      GravitinoColumn column,
+      TableChange.ColumnPosition position) {
+    doAs(session, () -> delegate.addColumn(schemaTableName, column, position));
+  }
+
+  public void dropColumn(
+      ConnectorSession session, SchemaTableName schemaTableName, String columnName) {
+    doAs(session, () -> delegate.dropColumn(schemaTableName, columnName));
+  }
+
+  public void renameColumn(
+      ConnectorSession session, SchemaTableName schemaTableName, String columnName, String target) {
+    doAs(session, () -> delegate.renameColumn(schemaTableName, columnName, target));
+  }
+
+  public void setColumnComment(
+      ConnectorSession session,
+      SchemaTableName schemaTableName,
+      String columnName,
+      String comment) {
+    doAs(session, () -> delegate.setColumnComment(schemaTableName, columnName, comment));
+  }
+
+  public void setColumnType(
+      ConnectorSession session, SchemaTableName schemaTableName, String columnName, Type type) {
+    doAs(session, () -> delegate.setColumnType(schemaTableName, columnName, type));
+  }
+
+  private <T> T doAs(ConnectorSession session, Supplier<T> action) {
+    if (sessionContext != null) {
+      sessionContext.applySession(session);
+    }
+    try {
+      return action.get();
+    } finally {
+      if (sessionContext != null) {
+        sessionContext.clearSession();
+      }
+    }
+  }

Review Comment:
   This unconditionally clears the session context at the end of every wrapped 
call. If session credentials are also applied in an outer scope (e.g., 
per-query begin/cleanup) or if calls can be nested on the same thread, the 
inner `clearSession()` will erase the outer context and break subsequent 
operations. A safer pattern is to save the previous thread-local value and 
restore it in `finally` (or implement a nesting/reference-count mechanism).
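
   A minimal sketch of the save-and-restore pattern, using only the JDK. 
`ScopedSessionSketch`, its `doAs(String, Supplier)` signature, and the plain 
`String` credential are hypothetical stand-ins for illustration; they are not 
the PR's `TrinoSessionContext` API.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the session context; all names are illustrative. */
final class ScopedSessionSketch {
  // Per-thread credential slot (a String here for brevity).
  private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  /** Runs the action with the credential applied, then restores the previous value. */
  static <T> T doAs(String credential, Supplier<T> action) {
    String previous = CURRENT.get(); // outer scope's value, or null if none
    CURRENT.set(credential);
    try {
      return action.get();
    } finally {
      if (previous == null) {
        CURRENT.remove(); // no outer scope: leave the thread clean
      } else {
        CURRENT.set(previous); // nested call: hand the outer credential back
      }
    }
  }

  /** Returns the credential visible to the calling thread, or null. */
  static String current() {
    return CURRENT.get();
  }
}
```

   With this shape, a nested `doAs` call leaves the outer credential in place 
when it returns, and the outermost call still clears the thread on exit.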



##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/security/TrinoSessionAuthProvider.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.security;
+
+import io.trino.spi.connector.ConnectorSession;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.client.CustomTokenProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link CustomTokenProvider} that forwards the Trino session user's credentials to Gravitino on
+ * every request by storing them in a {@link ThreadLocal}.
+ *
+ * <p>There are two operating modes, selected at construction time:
+ *
+ * <ul>
+ *   <li><b>BEARER_TOKEN</b> – reads a Bearer token from the Trino session's extra credentials using
+ *       a configured key, and forwards it as {@code Authorization: Bearer <token>}. Used when
+ *       {@code authType=oauth2} and {@code forwardUser=true}.
+ *   <li><b>SIMPLE_SESSION</b> – encodes the Trino session username using the same Basic-auth format
+ *       as {@code SimpleTokenProvider}, so the Gravitino server sees the actual Trino user rather
+ *       than a shared service account. Used when {@code authType=simple} and {@code
+ *       forwardUser=true}.
+ * </ul>
+ *
+ * <p>When no session has been applied (e.g. during connector start-up), {@link #hasTokenData()}
+ * returns {@code false} so that the request is sent without an {@code Authorization} header.
+ *
+ * @since 1.3.0
+ */
+public class TrinoSessionAuthProvider extends CustomTokenProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TrinoSessionAuthProvider.class);
+
+  /** Distinguishes between the two operating modes. */
+  enum Mode {
+    BEARER_TOKEN,
+    SIMPLE_SESSION
+  }
+
+  private static final ThreadLocal<byte[]> TOKEN_HOLDER = new ThreadLocal<>();

Review Comment:
   A static `ThreadLocal` is shared by every `TrinoSessionAuthProvider` instance 
in the JVM, so on any given thread all providers read and write the same slot. 
If multiple Gravitino clients/connectors exist in the same process (or tests run 
in the same JVM), one provider can overwrite or clear another provider's token 
on that thread. Make `TOKEN_HOLDER` an instance field (non-static) so session 
state is isolated per client/provider.
   ```suggestion
     private final ThreadLocal<byte[]> TOKEN_HOLDER = new ThreadLocal<>();
   ```
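
   To illustrate the isolation this buys, here is a small JDK-only sketch. 
`IsolatedTokenProviderSketch` and its methods are hypothetical; only the 
non-static field mirrors the suggestion above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical provider; only the field layout follows the suggestion. */
final class IsolatedTokenProviderSketch {
  // Instance field: each provider owns its own per-thread slot.
  private final ThreadLocal<byte[]> tokenHolder = new ThreadLocal<>();

  /** Stores the token for the calling thread in this provider only. */
  void apply(String token) {
    tokenHolder.set(token.getBytes(StandardCharsets.UTF_8));
  }

  /** Clears this provider's slot for the calling thread. */
  void clear() {
    tokenHolder.remove();
  }

  /** True when the calling thread has a token stored in this provider. */
  boolean hasTokenData() {
    return tokenHolder.get() != null;
  }
}
```

   Clearing one instance leaves the other untouched on the same thread; with a 
`static` field both would share a single slot.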



##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/security/TrinoSessionAuthProvider.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.security;
+
+import io.trino.spi.connector.ConnectorSession;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.client.CustomTokenProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link CustomTokenProvider} that forwards the Trino session user's credentials to Gravitino on
+ * every request by storing them in a {@link ThreadLocal}.
+ *
+ * <p>There are two operating modes, selected at construction time:
+ *
+ * <ul>
+ *   <li><b>BEARER_TOKEN</b> – reads a Bearer token from the Trino session's extra credentials using
+ *       a configured key, and forwards it as {@code Authorization: Bearer <token>}. Used when
+ *       {@code authType=oauth2} and {@code forwardUser=true}.
+ *   <li><b>SIMPLE_SESSION</b> – encodes the Trino session username using the same Basic-auth format
+ *       as {@code SimpleTokenProvider}, so the Gravitino server sees the actual Trino user rather
+ *       than a shared service account. Used when {@code authType=simple} and {@code
+ *       forwardUser=true}.
+ * </ul>
+ *
+ * <p>When no session has been applied (e.g. during connector start-up), {@link #hasTokenData()}
+ * returns {@code false} so that the request is sent without an {@code Authorization} header.
+ *
+ * @since 1.3.0
+ */
+public class TrinoSessionAuthProvider extends CustomTokenProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TrinoSessionAuthProvider.class);
+
+  /** Distinguishes between the two operating modes. */
+  enum Mode {
+    BEARER_TOKEN,
+    SIMPLE_SESSION
+  }
+
+  private static final ThreadLocal<byte[]> TOKEN_HOLDER = new ThreadLocal<>();
+
+  /** Cache of encoded tokens per username to avoid repeated Base64 encoding. */
+  private static final ConcurrentHashMap<String, byte[]> TOKEN_CACHE = new ConcurrentHashMap<>();
+
+  private final Mode mode;
+
+  /** The key to look up in Trino extra credentials (only used in OAUTH2_TOKEN mode). */
+  @Nullable private final String credentialKey;
+
+  /**
+   * Creates a provider in BEARER_TOKEN mode.
+   *
+   * @param credentialKey the key whose value in the Trino session's extra credentials is the Bearer
+   *     token
+   */
+  TrinoSessionAuthProvider(String credentialKey) {
+    this.mode = Mode.BEARER_TOKEN;
+    this.credentialKey = credentialKey;
+    this.schemeName = "Bearer"; // required by parent abstract; not used because we override
+  }
+
+  /**
+   * Creates a provider in SIMPLE_SESSION mode. The Trino session user name is encoded using the
+   * same Basic-auth format as {@code SimpleTokenProvider}.
+   */
+  TrinoSessionAuthProvider() {
+    this.mode = Mode.SIMPLE_SESSION;
+    this.credentialKey = null;
+    this.schemeName = "Basic"; // required by parent abstract; not used because we override
+  }
+
+  /**
+   * Reads credentials from the current session and stores them in the per-thread holder.
+   *
+   * @param session the current Trino connector session
+   */
+  void applySession(ConnectorSession session) {
+    // Check if token is already set for this thread to avoid redundant operations
+    if (TOKEN_HOLDER.get() != null) {
+      return;
+    }
+

Review Comment:
   Early-returning when a token is already set can cause stale credentials to 
be used if `applySession()` is invoked again on the same thread for a different 
Trino session (or if some outer scope applies a session once per query). 
Instead of skipping, overwrite the thread-local value (or track/compare the 
current user/token to ensure correctness).
   ```suggestion
   
   ```
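
   One way the overwrite behavior could look, as a JDK-only sketch. 
`OverwritingHolderSketch`, `applySession(String)`, and `currentUser()` are 
hypothetical names, and a plain username stands in for the real 
`ConnectorSession`.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical holder that never keeps a previous session's value. */
final class OverwritingHolderSketch {
  private final ThreadLocal<byte[]> tokenHolder = new ThreadLocal<>();

  /** Always replaces the stored value; a missing user clears the slot. */
  void applySession(String user) {
    if (user == null || user.isEmpty()) {
      tokenHolder.remove(); // nothing to forward for this session
      return;
    }
    tokenHolder.set(user.getBytes(StandardCharsets.UTF_8));
  }

  /** Returns the user stored for the calling thread, or null. */
  String currentUser() {
    byte[] token = tokenHolder.get();
    return token == null ? null : new String(token, StandardCharsets.UTF_8);
  }
}
```

   Because the value is replaced on every call, a pooled thread that last 
served one user cannot carry that user's token into the next session.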



##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/security/TrinoSessionAuthProvider.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.security;
+
+import io.trino.spi.connector.ConnectorSession;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.client.CustomTokenProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link CustomTokenProvider} that forwards the Trino session user's credentials to Gravitino on
+ * every request by storing them in a {@link ThreadLocal}.
+ *
+ * <p>There are two operating modes, selected at construction time:
+ *
+ * <ul>
+ *   <li><b>BEARER_TOKEN</b> – reads a Bearer token from the Trino session's extra credentials using
+ *       a configured key, and forwards it as {@code Authorization: Bearer <token>}. Used when
+ *       {@code authType=oauth2} and {@code forwardUser=true}.
+ *   <li><b>SIMPLE_SESSION</b> – encodes the Trino session username using the same Basic-auth format
+ *       as {@code SimpleTokenProvider}, so the Gravitino server sees the actual Trino user rather
+ *       than a shared service account. Used when {@code authType=simple} and {@code
+ *       forwardUser=true}.
+ * </ul>
+ *
+ * <p>When no session has been applied (e.g. during connector start-up), {@link #hasTokenData()}
+ * returns {@code false} so that the request is sent without an {@code Authorization} header.
+ *
+ * @since 1.3.0
+ */
+public class TrinoSessionAuthProvider extends CustomTokenProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TrinoSessionAuthProvider.class);
+
+  /** Distinguishes between the two operating modes. */
+  enum Mode {
+    BEARER_TOKEN,
+    SIMPLE_SESSION
+  }
+
+  private static final ThreadLocal<byte[]> TOKEN_HOLDER = new ThreadLocal<>();
+
+  /** Cache of encoded tokens per username to avoid repeated Base64 encoding. */
+  private static final ConcurrentHashMap<String, byte[]> TOKEN_CACHE = new ConcurrentHashMap<>();

Review Comment:
   The username→token cache is unbounded and will grow for every distinct user 
seen by the coordinator/workers, which can be problematic in large multi-tenant 
clusters. Consider removing the cache (Base64 encoding is cheap), adding an 
upper bound/eviction (e.g., Caffeine), or scoping the cache to the 
provider/connector lifecycle with clear/close behavior.
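
   If a cache is kept, a bound can be added with the JDK alone. The sketch 
below uses an access-ordered `LinkedHashMap` as a small LRU cache. 
`BoundedTokenCacheSketch` is a hypothetical name, and the `user:dummy` payload 
is an assumed placeholder, not a confirmed description of what 
`SimpleTokenProvider` encodes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical instance-scoped cache with a fixed upper bound. */
final class BoundedTokenCacheSketch {
  private final Map<String, byte[]> cache;

  BoundedTokenCacheSketch(final int maxEntries) {
    // accessOrder=true: the least recently used entry is the eldest one.
    this.cache =
        Collections.synchronizedMap(
            new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
              @Override
              protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                return size() > maxEntries; // evict once the bound is exceeded
              }
            });
  }

  /** Returns the cached encoding for the user, computing it on a miss. */
  byte[] tokenFor(String user) {
    return cache.computeIfAbsent(user, BoundedTokenCacheSketch::encode);
  }

  /** Base64-encodes an assumed "user:dummy" payload. */
  static byte[] encode(String user) {
    byte[] raw = (user + ":dummy").getBytes(StandardCharsets.UTF_8);
    return Base64.getEncoder().encode(raw);
  }

  int size() {
    return cache.size();
  }
}
```

   Since the cache is an instance field, it is released together with the 
provider that owns it. Dropping the cache and calling `encode` directly is the 
simpler alternative the comment mentions.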



##########
trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java:
##########
@@ -102,6 +103,11 @@ public static void main(String[] args) throws Exception {
               + "The JAR file under ${trino_connector_dir} will be copied into the test image, "
               + "the default value is ${project_root}/trino-connector/trino-connector/build/libs.");
 
+      options.addOption(
+          "env_only",
+          false,
+          "Start the environment (Gravitino + Trino) and keep it running for manual testing. Press Ctrl+C to shutdown.");

Review Comment:
   This defines `env_only` as the short option via `addOption(String opt, 
...)`, which typically expects a single-character `-x` style flag. The script 
passes `--env_only`, which is parsed as a long option and may not be recognized 
here, causing the tool to ignore the flag. Define `env_only` as a proper long 
option (and optionally add a short alias) so `--env_only` works as intended.



##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/security/TrinoSessionAuthProvider.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.security;
+
+import io.trino.spi.connector.ConnectorSession;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.client.CustomTokenProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link CustomTokenProvider} that forwards the Trino session user's credentials to Gravitino on
+ * every request by storing them in a {@link ThreadLocal}.
+ *
+ * <p>There are two operating modes, selected at construction time:
+ *
+ * <ul>
+ *   <li><b>BEARER_TOKEN</b> – reads a Bearer token from the Trino session's extra credentials using
+ *       a configured key, and forwards it as {@code Authorization: Bearer <token>}. Used when
+ *       {@code authType=oauth2} and {@code forwardUser=true}.
+ *   <li><b>SIMPLE_SESSION</b> – encodes the Trino session username using the same Basic-auth format
+ *       as {@code SimpleTokenProvider}, so the Gravitino server sees the actual Trino user rather
+ *       than a shared service account. Used when {@code authType=simple} and {@code
+ *       forwardUser=true}.
+ * </ul>
+ *
+ * <p>When no session has been applied (e.g. during connector start-up), {@link #hasTokenData()}
+ * returns {@code false} so that the request is sent without an {@code Authorization} header.
+ *
+ * @since 1.3.0
+ */
+public class TrinoSessionAuthProvider extends CustomTokenProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TrinoSessionAuthProvider.class);
+
+  /** Distinguishes between the two operating modes. */
+  enum Mode {
+    BEARER_TOKEN,
+    SIMPLE_SESSION
+  }
+
+  private static final ThreadLocal<byte[]> TOKEN_HOLDER = new ThreadLocal<>();
+
+  /** Cache of encoded tokens per username to avoid repeated Base64 encoding. */
+  private static final ConcurrentHashMap<String, byte[]> TOKEN_CACHE = new ConcurrentHashMap<>();
+
+  private final Mode mode;
+
+  /** The key to look up in Trino extra credentials (only used in OAUTH2_TOKEN mode). */
+  @Nullable private final String credentialKey;
+
+  /**
+   * Creates a provider in BEARER_TOKEN mode.
+   *
+   * @param credentialKey the key whose value in the Trino session's extra credentials is the Bearer
+   *     token
+   */
+  TrinoSessionAuthProvider(String credentialKey) {
+    this.mode = Mode.BEARER_TOKEN;
+    this.credentialKey = credentialKey;
+    this.schemeName = "Bearer"; // required by parent abstract; not used because we override
+  }
+
+  /**
+   * Creates a provider in SIMPLE_SESSION mode. The Trino session user name is encoded using the
+   * same Basic-auth format as {@code SimpleTokenProvider}.
+   */
+  TrinoSessionAuthProvider() {
+    this.mode = Mode.SIMPLE_SESSION;
+    this.credentialKey = null;
+    this.schemeName = "Basic"; // required by parent abstract; not used because we override
+  }
+
+  /**
+   * Reads credentials from the current session and stores them in the per-thread holder.
+   *
+   * @param session the current Trino connector session
+   */
+  void applySession(ConnectorSession session) {
+    // Check if token is already set for this thread to avoid redundant operations
+    if (TOKEN_HOLDER.get() != null) {
+      return;
+    }
+

Review Comment:
   The new session-forwarding behavior is core to the feature, but the current 
tests only check whether `GravitinoAuthProvider.build(...)` returns a null or 
non-null session context. Consider adding unit tests for 
`TrinoSessionAuthProvider` covering: (1) `hasTokenData()` before and after 
`applySession()` and after `clearSession()`, (2) `SIMPLE_SESSION` mode encoding 
the username as a Basic header, (3) `BEARER_TOKEN` mode reading the token from 
extra credentials and setting the Bearer header, and (4) nested or overlapping 
`applySession()` calls neither leaking nor reusing stale credentials.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
