This is an automated email from the ASF dual-hosted git repository.
bharos92 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 12d878fadf [#10766] fix(iceberg): skip table import for staged creates
in IcebergTableHookDispatcher (#10767)
12d878fadf is described below
commit 12d878fadf3a811d93b3096867cee76d54b07c10
Author: Bharath Krishna <[email protected]>
AuthorDate: Thu Apr 16 08:09:58 2026 -0700
[#10766] fix(iceberg): skip table import for staged creates in
IcebergTableHookDispatcher (#10767)
<!--
1. Title: [#<issue>] <type>(<scope>): <subject>
Examples:
- "[#123] feat(operator): Support xxx"
- "[#233] fix: Check null before access result in xxx"
- "[MINOR] refactor: Fix typo in variable name"
- "[MINOR] docs: Fix typo in README"
- "[#255] test: Fix flaky test NameOfTheTest"
Reference: https://www.conventionalcommits.org/en/v1.0.0/
2. If the PR is unfinished, please mark this PR as draft.
-->
### What changes were proposed in this pull request?
`IcebergTableHookDispatcher.createTable()` now
skips `importTable()` and `setTableOwner()` for staged creates
(`stageCreate=true`), which is how clients such as Trino create tables
when credential vending is enabled.
Instead, the import and ownership assignment are deferred to
`updateTable()`, which
treats a request carrying the `AssertTableDoesNotExist` requirement
(set by `UpdateRequirements.forCreateTable()`) as the commit of a
staged create and performs the import and ownership setup
at that point. Regular updates without that requirement pass through unchanged.
### Why are the changes needed?
With credential vending enabled, Trino sends all CREATE TABLE requests
with
`stageCreate=true`. The Iceberg REST protocol stages the table (builds
metadata
in memory without committing) and returns vended credentials. The table
is only
committed to the catalog when the client later calls `updateTable`.
`IcebergTableHookDispatcher.createTable()` unconditionally called
`importTable()`,
which tried to load the non-existent table from the catalog and threw
`NoSuchTableException`, failing every table create when credential
vending is active.
Fix: #10766
### Does this PR introduce _any_ user-facing change?
No. This fixes an internal error that prevented table creation when
credential
vending was enabled. No API or property changes.
### How was this patch tested?
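- Unit tests in `TestIcebergTableHookDispatcher` covering the staged create
(no import/ownership), the staged-create commit via `updateTable`
(import + ownership), and a regular update (no import/ownership)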
- Also tested this with a dev Gravitino deployment
---
.../dispatcher/IcebergTableHookDispatcher.java | 39 ++++++++----
.../dispatcher/TestIcebergTableHookDispatcher.java | 69 +++++++++++++++++++++-
2 files changed, 95 insertions(+), 13 deletions(-)
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index 9775b36b5c..3a52e2428d 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -33,6 +33,7 @@ import
org.apache.gravitino.listener.api.event.IcebergRequestContext;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
@@ -58,14 +59,11 @@ public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatch
public LoadTableResponse createTable(
IcebergRequestContext context, Namespace namespace, CreateTableRequest
createTableRequest) {
LoadTableResponse response = dispatcher.createTable(context, namespace,
createTableRequest);
- importTable(context.catalogName(), namespace, createTableRequest.name());
- IcebergOwnershipUtils.setTableOwner(
- metalake,
- context.catalogName(),
- namespace,
- createTableRequest.name(),
- context.userName(),
- GravitinoEnv.getInstance().ownerDispatcher());
+ // Skip import and ownership for staged creates (credential vending).
+ // The table will be imported when the staged table is committed via
updateTable.
+ if (!createTableRequest.stageCreate()) {
+ importTableAndSetOwner(context, namespace, createTableRequest.name());
+ }
return response;
}
@@ -75,7 +73,18 @@ public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatch
IcebergRequestContext context,
TableIdentifier tableIdentifier,
UpdateTableRequest updateTableRequest) {
- return dispatcher.updateTable(context, tableIdentifier,
updateTableRequest);
+ LoadTableResponse response =
+ dispatcher.updateTable(context, tableIdentifier, updateTableRequest);
+ // Import the table and set ownership only when committing a staged table
create.
+ // A staged create commit is identified by the AssertTableDoesNotExist
requirement,
+ // which is set by UpdateRequirements.forCreateTable().
+ boolean isStagedCreateCommit =
+ updateTableRequest.requirements().stream()
+
.anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance);
+ if (isStagedCreateCommit) {
+ importTableAndSetOwner(context, tableIdentifier.namespace(),
tableIdentifier.name());
+ }
+ return response;
}
@Override
@@ -186,12 +195,20 @@ public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatch
return dispatcher.getTableMetadataLocation(context, tableIdentifier);
}
- private void importTable(String catalogName, Namespace namespace, String
tableName) {
+ private void importTableAndSetOwner(
+ IcebergRequestContext context, Namespace namespace, String tableName) {
TableDispatcher tableDispatcher =
GravitinoEnv.getInstance().tableDispatcher();
if (tableDispatcher != null) {
tableDispatcher.loadTable(
IcebergIdentifierUtils.toGravitinoTableIdentifier(
- metalake, catalogName, TableIdentifier.of(namespace,
tableName)));
+ metalake, context.catalogName(), TableIdentifier.of(namespace,
tableName)));
}
+ IcebergOwnershipUtils.setTableOwner(
+ metalake,
+ context.catalogName(),
+ namespace,
+ tableName,
+ context.userName(),
+ GravitinoEnv.getInstance().ownerDispatcher());
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
index 904489cb06..ef5f25f832 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
@@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -29,6 +30,7 @@ import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
@@ -45,6 +47,7 @@ import
org.apache.gravitino.listener.api.event.IcebergRequestContext;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.TableEntity;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
@@ -251,9 +254,65 @@ public class TestIcebergTableHookDispatcher {
}
@Test
- public void testUpdateTablePassesThrough() {
+ public void testCreateTableSkipsImportAndOwnershipForStageCreate() {
+ Namespace namespace = Namespace.of("test_schema");
+ CreateTableRequest request =
+ CreateTableRequest.builder()
+ .withName("test_table")
+ .withSchema(TABLE_SCHEMA)
+ .stageCreate()
+ .build();
+
+ LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+ when(mockDispatcher.createTable(mockContext, namespace,
request)).thenReturn(mockResponse);
+
+ LoadTableResponse result = hookDispatcher.createTable(mockContext,
namespace, request);
+
+ Assertions.assertEquals(mockResponse, result);
+ verify(mockDispatcher).createTable(mockContext, namespace, request);
+
+ // Verify table import was NOT called for staged create
+ verify(mockTableDispatcher, never()).loadTable(any());
+
+ // Verify ownership was NOT set for staged create
+ verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
+ }
+
+ @Test
+ public void testUpdateTableImportsAndSetsOwnershipForStagedCommit() {
+ TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table");
+ // A staged create commit carries AssertTableDoesNotExist as its
requirement.
+ UpdateTableRequest request =
+ new UpdateTableRequest(
+ List.of(new UpdateRequirement.AssertTableDoesNotExist()),
Collections.emptyList());
+ LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+
+ when(mockDispatcher.updateTable(mockContext, tableId,
request)).thenReturn(mockResponse);
+
+ LoadTableResponse result = hookDispatcher.updateTable(mockContext,
tableId, request);
+
+ Assertions.assertEquals(mockResponse, result);
+ verify(mockDispatcher).updateTable(mockContext, tableId, request);
+
+ // Verify table import was called
+ NameIdentifier gravitinoTableId =
+ IcebergIdentifierUtils.toGravitinoTableIdentifier(TEST_METALAKE,
TEST_CATALOG, tableId);
+ verify(mockTableDispatcher).loadTable(gravitinoTableId);
+
+ // Verify ownership was set
+ ArgumentCaptor<String> userCaptor = ArgumentCaptor.forClass(String.class);
+ verify(mockOwnerDispatcher)
+ .setOwner(eq(TEST_METALAKE), any(), userCaptor.capture(),
eq(Owner.Type.USER));
+ Assertions.assertEquals(TEST_USER, userCaptor.getValue());
+ }
+
+ @Test
+ public void testUpdateTableSkipsImportForRegularUpdate() {
TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table");
- UpdateTableRequest request = mock(UpdateTableRequest.class);
+ // Regular table update (e.g. property change): no AssertTableDoesNotExist
requirement.
+ // Import and ownership must NOT be triggered regardless of entity store
state.
+ UpdateTableRequest request =
+ new UpdateTableRequest(Collections.emptyList(),
Collections.emptyList());
LoadTableResponse mockResponse = mock(LoadTableResponse.class);
when(mockDispatcher.updateTable(mockContext, tableId,
request)).thenReturn(mockResponse);
@@ -262,6 +321,12 @@ public class TestIcebergTableHookDispatcher {
Assertions.assertEquals(mockResponse, result);
verify(mockDispatcher).updateTable(mockContext, tableId, request);
+
+ // Verify table import was NOT called for a regular update
+ verify(mockTableDispatcher, never()).loadTable(any());
+
+ // Verify ownership was NOT set
+ verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
}
@Test