This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.2 by this push:
new fbba09e1cd [Cherry-pick to branch-1.2] [#10766] fix(iceberg): skip
table import for staged creates in IcebergTableHookDispatcher (#10767) (#10802)
fbba09e1cd is described below
commit fbba09e1cd9a5e566818c5a953979b492f30dc0a
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Apr 17 13:47:43 2026 +0800
[Cherry-pick to branch-1.2] [#10766] fix(iceberg): skip table import for
staged creates in IcebergTableHookDispatcher (#10767) (#10802)
**Cherry-pick Information:**
- Original commit: 12d878fadf3a811d93b3096867cee76d54b07c10
- Target branch: `branch-1.2`
- Status: ⚠️ **Has conflicts - manual resolution required**
Please review and resolve the conflicts before merging.
---------
Co-authored-by: Bharath Krishna <[email protected]>
---
.../dispatcher/IcebergTableHookDispatcher.java | 39 ++++++++----
.../dispatcher/TestIcebergTableHookDispatcher.java | 69 +++++++++++++++++++++-
2 files changed, 95 insertions(+), 13 deletions(-)
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index 0f498db38d..46c918a5b9 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -32,6 +32,7 @@ import
org.apache.gravitino.listener.api.event.IcebergRequestContext;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
@@ -57,14 +58,11 @@ public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatch
public LoadTableResponse createTable(
IcebergRequestContext context, Namespace namespace, CreateTableRequest
createTableRequest) {
LoadTableResponse response = dispatcher.createTable(context, namespace,
createTableRequest);
- importTable(context.catalogName(), namespace, createTableRequest.name());
- IcebergOwnershipUtils.setTableOwner(
- metalake,
- context.catalogName(),
- namespace,
- createTableRequest.name(),
- context.userName(),
- GravitinoEnv.getInstance().ownerDispatcher());
+ // Skip import and ownership for staged creates (credential vending).
+ // The table will be imported when the staged table is committed via updateTable.
+ if (!createTableRequest.stageCreate()) {
+ importTableAndSetOwner(context, namespace, createTableRequest.name());
+ }
return response;
}
@@ -74,7 +72,18 @@ public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatch
IcebergRequestContext context,
TableIdentifier tableIdentifier,
UpdateTableRequest updateTableRequest) {
- return dispatcher.updateTable(context, tableIdentifier,
updateTableRequest);
+ LoadTableResponse response =
+ dispatcher.updateTable(context, tableIdentifier, updateTableRequest);
+ // Import the table and set ownership only when committing a staged table create.
+ // A staged create commit is identified by the AssertTableDoesNotExist requirement,
+ // which is set by UpdateRequirements.forCreateTable().
+ boolean isStagedCreateCommit =
+ updateTableRequest.requirements().stream()
+
.anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance);
+ if (isStagedCreateCommit) {
+ importTableAndSetOwner(context, tableIdentifier.namespace(),
tableIdentifier.name());
+ }
+ return response;
}
@Override
@@ -179,12 +188,20 @@ public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatch
return dispatcher.planTableScan(context, tableIdentifier, scanRequest);
}
- private void importTable(String catalogName, Namespace namespace, String
tableName) {
+ private void importTableAndSetOwner(
+ IcebergRequestContext context, Namespace namespace, String tableName) {
TableDispatcher tableDispatcher =
GravitinoEnv.getInstance().tableDispatcher();
if (tableDispatcher != null) {
tableDispatcher.loadTable(
IcebergIdentifierUtils.toGravitinoTableIdentifier(
- metalake, catalogName, TableIdentifier.of(namespace,
tableName)));
+ metalake, context.catalogName(), TableIdentifier.of(namespace,
tableName)));
}
+ IcebergOwnershipUtils.setTableOwner(
+ metalake,
+ context.catalogName(),
+ namespace,
+ tableName,
+ context.userName(),
+ GravitinoEnv.getInstance().ownerDispatcher());
}
}
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
index 904489cb06..ef5f25f832 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
@@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -29,6 +30,7 @@ import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
@@ -45,6 +47,7 @@ import
org.apache.gravitino.listener.api.event.IcebergRequestContext;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.TableEntity;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
@@ -251,9 +254,65 @@ public class TestIcebergTableHookDispatcher {
}
@Test
- public void testUpdateTablePassesThrough() {
+ public void testCreateTableSkipsImportAndOwnershipForStageCreate() {
+ Namespace namespace = Namespace.of("test_schema");
+ CreateTableRequest request =
+ CreateTableRequest.builder()
+ .withName("test_table")
+ .withSchema(TABLE_SCHEMA)
+ .stageCreate()
+ .build();
+
+ LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+ when(mockDispatcher.createTable(mockContext, namespace,
request)).thenReturn(mockResponse);
+
+ LoadTableResponse result = hookDispatcher.createTable(mockContext,
namespace, request);
+
+ Assertions.assertEquals(mockResponse, result);
+ verify(mockDispatcher).createTable(mockContext, namespace, request);
+
+ // Verify table import was NOT called for staged create
+ verify(mockTableDispatcher, never()).loadTable(any());
+
+ // Verify ownership was NOT set for staged create
+ verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
+ }
+
+ @Test
+ public void testUpdateTableImportsAndSetsOwnershipForStagedCommit() {
+ TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table");
+ // A staged create commit carries AssertTableDoesNotExist as its requirement.
+ UpdateTableRequest request =
+ new UpdateTableRequest(
+ List.of(new UpdateRequirement.AssertTableDoesNotExist()),
Collections.emptyList());
+ LoadTableResponse mockResponse = mock(LoadTableResponse.class);
+
+ when(mockDispatcher.updateTable(mockContext, tableId,
request)).thenReturn(mockResponse);
+
+ LoadTableResponse result = hookDispatcher.updateTable(mockContext,
tableId, request);
+
+ Assertions.assertEquals(mockResponse, result);
+ verify(mockDispatcher).updateTable(mockContext, tableId, request);
+
+ // Verify table import was called
+ NameIdentifier gravitinoTableId =
+ IcebergIdentifierUtils.toGravitinoTableIdentifier(TEST_METALAKE,
TEST_CATALOG, tableId);
+ verify(mockTableDispatcher).loadTable(gravitinoTableId);
+
+ // Verify ownership was set
+ ArgumentCaptor<String> userCaptor = ArgumentCaptor.forClass(String.class);
+ verify(mockOwnerDispatcher)
+ .setOwner(eq(TEST_METALAKE), any(), userCaptor.capture(),
eq(Owner.Type.USER));
+ Assertions.assertEquals(TEST_USER, userCaptor.getValue());
+ }
+
+ @Test
+ public void testUpdateTableSkipsImportForRegularUpdate() {
TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table");
- UpdateTableRequest request = mock(UpdateTableRequest.class);
+ // Regular table update (e.g. property change): no AssertTableDoesNotExist requirement.
+ // Import and ownership must NOT be triggered regardless of entity store state.
+ UpdateTableRequest request =
+ new UpdateTableRequest(Collections.emptyList(),
Collections.emptyList());
LoadTableResponse mockResponse = mock(LoadTableResponse.class);
when(mockDispatcher.updateTable(mockContext, tableId,
request)).thenReturn(mockResponse);
@@ -262,6 +321,12 @@ public class TestIcebergTableHookDispatcher {
Assertions.assertEquals(mockResponse, result);
verify(mockDispatcher).updateTable(mockContext, tableId, request);
+
+ // Verify table import was NOT called for a regular update
+ verify(mockTableDispatcher, never()).loadTable(any());
+
+ // Verify ownership was NOT set
+ verify(mockOwnerDispatcher, never()).setOwner(any(), any(), any(), any());
}
@Test