This is an automated email from the ASF dual-hosted git repository.

bharos92 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 12d878fadf [#10766] fix(iceberg): skip table import for staged creates 
in IcebergTableHookDispatcher (#10767)
12d878fadf is described below

commit 12d878fadf3a811d93b3096867cee76d54b07c10
Author: Bharath Krishna <[email protected]>
AuthorDate: Thu Apr 16 08:09:58 2026 -0700

    [#10766] fix(iceberg): skip table import for staged creates in 
IcebergTableHookDispatcher (#10767)
    
    <!--
    1. Title: [#<issue>] <type>(<scope>): <subject>
       Examples:
         - "[#123] feat(operator): Support xxx"
         - "[#233] fix: Check null before access result in xxx"
         - "[MINOR] refactor: Fix typo in variable name"
         - "[MINOR] docs: Fix typo in README"
         - "[#255] test: Fix flaky test NameOfTheTest"
       Reference: https://www.conventionalcommits.org/en/v1.0.0/
    2. If the PR is unfinished, please mark this PR as draft.
    -->
    
    ### What changes were proposed in this pull request?
    
    When credential vending is enabled,
    `IcebergTableHookDispatcher.createTable()` now
    skips `importTable()` and `setTableOwner()` for staged creates
    (`stageCreate=true`).
    Instead, the import and ownership assignment are deferred to
    `updateTable()`, which
    identifies a staged create being committed by the presence of an
    `AssertTableDoesNotExist` requirement on the update request and, in
    that case, performs the import and ownership setup after the
    underlying update succeeds.
    
    
    ### Why are the changes needed?
    
    With credential vending enabled, Trino sends all CREATE TABLE requests
    with
    `stageCreate=true`. The Iceberg REST protocol stages the table (builds
    metadata
    in memory without committing) and returns vended credentials. The table
    is only
    committed to the catalog when the client later calls `updateTable`.
    
    `IcebergTableHookDispatcher.createTable()` unconditionally called
    `importTable()`,
    which tried to load the non-existent table from the catalog and threw
    `NoSuchTableException`, failing every table create when credential
    vending is active.
    
    Fix: #10766
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This fixes an internal error that prevented table creation when
    credential
    vending was enabled. No API or property changes.
    
    ### How was this patch tested?
    
    - Unit tests
    - Also tested this with a dev Gravitino deployment
---
 .../dispatcher/IcebergTableHookDispatcher.java     | 39 ++++++++----
 .../dispatcher/TestIcebergTableHookDispatcher.java | 69 +++++++++++++++++++++-
 2 files changed, 95 insertions(+), 13 deletions(-)

diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index 9775b36b5c..3a52e2428d 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -33,6 +33,7 @@ import 
org.apache.gravitino.listener.api.event.IcebergRequestContext;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.UpdateRequirement;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
@@ -58,14 +59,11 @@ public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatch
   public LoadTableResponse createTable(
       IcebergRequestContext context, Namespace namespace, CreateTableRequest 
createTableRequest) {
     LoadTableResponse response = dispatcher.createTable(context, namespace, 
createTableRequest);
-    importTable(context.catalogName(), namespace, createTableRequest.name());
-    IcebergOwnershipUtils.setTableOwner(
-        metalake,
-        context.catalogName(),
-        namespace,
-        createTableRequest.name(),
-        context.userName(),
-        GravitinoEnv.getInstance().ownerDispatcher());
+    // Skip import and ownership for staged creates (credential vending).
+    // The table will be imported when the staged table is committed via 
updateTable.
+    if (!createTableRequest.stageCreate()) {
+      importTableAndSetOwner(context, namespace, createTableRequest.name());
+    }
 
     return response;
   }
@@ -75,7 +73,18 @@ public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatch
       IcebergRequestContext context,
       TableIdentifier tableIdentifier,
       UpdateTableRequest updateTableRequest) {
-    return dispatcher.updateTable(context, tableIdentifier, 
updateTableRequest);
+    LoadTableResponse response =
+        dispatcher.updateTable(context, tableIdentifier, updateTableRequest);
+    // Import the table and set ownership only when committing a staged table 
create.
+    // A staged create commit is identified by the AssertTableDoesNotExist 
requirement,
+    // which is set by UpdateRequirements.forCreateTable().
+    boolean isStagedCreateCommit =
+        updateTableRequest.requirements().stream()
+            
.anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance);
+    if (isStagedCreateCommit) {
+      importTableAndSetOwner(context, tableIdentifier.namespace(), 
tableIdentifier.name());
+    }
+    return response;
   }
 
   @Override
@@ -186,12 +195,20 @@ public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatch
     return dispatcher.getTableMetadataLocation(context, tableIdentifier);
   }
 
-  private void importTable(String catalogName, Namespace namespace, String 
tableName) {
+  private void importTableAndSetOwner(
+      IcebergRequestContext context, Namespace namespace, String tableName) {
     TableDispatcher tableDispatcher = 
GravitinoEnv.getInstance().tableDispatcher();
     if (tableDispatcher != null) {
       tableDispatcher.loadTable(
           IcebergIdentifierUtils.toGravitinoTableIdentifier(
-              metalake, catalogName, TableIdentifier.of(namespace, 
tableName)));
+              metalake, context.catalogName(), TableIdentifier.of(namespace, 
tableName)));
     }
+    IcebergOwnershipUtils.setTableOwner(
+        metalake,
+        context.catalogName(),
+        namespace,
+        tableName,
+        context.userName(),
+        GravitinoEnv.getInstance().ownerDispatcher());
   }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
index 904489cb06..ef5f25f832 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
@@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
@@ -45,6 +47,7 @@ import 
org.apache.gravitino.listener.api.event.IcebergRequestContext;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateRequirement;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
@@ -251,9 +254,65 @@ public class TestIcebergTableHookDispatcher {
   }
 
   @Test
-  public void testUpdateTablePassesThrough() {
+  public void testCreateTableSkipsImportAndOwnershipForStageCreate() {
+    Namespace namespace = Namespace.of("test_schema");
+    CreateTableRequest request =
+        CreateTableRequest.builder()
+            .withName("test_table")
+            .withSchema(TABLE_SCHEMA)
+            .stageCreate()
+            .build();
+
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+    when(mockDispatcher.createTable(mockContext, namespace, 
request)).thenReturn(mockResponse);
+
+    LoadTableResponse result = hookDispatcher.createTable(mockContext, 
namespace, request);
+
+    Assertions.assertEquals(mockResponse, result);
+    verify(mockDispatcher).createTable(mockContext, namespace, request);
+
+    // Verify table import was NOT called for staged create
+    verify(mockTableDispatcher, never()).loadTable(any());
+
+    // Verify ownership was NOT set for staged create
+    verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
+  }
+
+  @Test
+  public void testUpdateTableImportsAndSetsOwnershipForStagedCommit() {
+    TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table");
+    // A staged create commit carries AssertTableDoesNotExist as its 
requirement.
+    UpdateTableRequest request =
+        new UpdateTableRequest(
+            List.of(new UpdateRequirement.AssertTableDoesNotExist()), 
Collections.emptyList());
+    LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+
+    when(mockDispatcher.updateTable(mockContext, tableId, 
request)).thenReturn(mockResponse);
+
+    LoadTableResponse result = hookDispatcher.updateTable(mockContext, 
tableId, request);
+
+    Assertions.assertEquals(mockResponse, result);
+    verify(mockDispatcher).updateTable(mockContext, tableId, request);
+
+    // Verify table import was called
+    NameIdentifier gravitinoTableId =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(TEST_METALAKE, 
TEST_CATALOG, tableId);
+    verify(mockTableDispatcher).loadTable(gravitinoTableId);
+
+    // Verify ownership was set
+    ArgumentCaptor<String> userCaptor = ArgumentCaptor.forClass(String.class);
+    verify(mockOwnerDispatcher)
+        .setOwner(eq(TEST_METALAKE), any(), userCaptor.capture(), 
eq(Owner.Type.USER));
+    Assertions.assertEquals(TEST_USER, userCaptor.getValue());
+  }
+
+  @Test
+  public void testUpdateTableSkipsImportForRegularUpdate() {
     TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table");
-    UpdateTableRequest request = mock(UpdateTableRequest.class);
+    // Regular table update (e.g. property change): no AssertTableDoesNotExist 
requirement.
+    // Import and ownership must NOT be triggered regardless of entity store 
state.
+    UpdateTableRequest request =
+        new UpdateTableRequest(Collections.emptyList(), 
Collections.emptyList());
     LoadTableResponse mockResponse = mock(LoadTableResponse.class);
 
     when(mockDispatcher.updateTable(mockContext, tableId, 
request)).thenReturn(mockResponse);
@@ -262,6 +321,12 @@ public class TestIcebergTableHookDispatcher {
 
     Assertions.assertEquals(mockResponse, result);
     verify(mockDispatcher).updateTable(mockContext, tableId, request);
+
+    // Verify table import was NOT called for a regular update
+    verify(mockTableDispatcher, never()).loadTable(any());
+
+    // Verify ownership was NOT set
+    verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
   }
 
   @Test

Reply via email to