flyrain commented on code in PR #3360:
URL: https://github.com/apache/polaris/pull/3360#discussion_r2893505462


##########
integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java:
##########
@@ -1189,6 +1189,101 @@ public void 
testMultipleConflictingCommitsToSingleTableInTransaction() {
     
assertThat(latestCommittedSchema.asStruct()).isEqualTo(originalSchema.asStruct());
   }
 
+  @Test
+  public void testCoalescedConflictOnOneTableRollsBackEntireTransaction() {
+    Namespace namespace = Namespace.of("coalescingAtomicNs");
+    TableIdentifier goodId = TableIdentifier.of(namespace, "goodTable");
+    TableIdentifier conflictId = TableIdentifier.of(namespace, 
"conflictTable");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(namespace);
+    }
+
+    catalog().createTable(goodId, SCHEMA);
+    catalog().createTable(conflictId, SCHEMA);
+
+    Schema originalGoodSchema = catalog().loadTable(goodId).schema();
+    Schema originalConflictSchema = catalog().loadTable(conflictId).schema();
+
+    // goodTable: a single non-conflicting schema change
+    Transaction goodTx = catalog().loadTable(goodId).newTransaction();
+    goodTx.updateSchema().addColumn("new_col", Types.LongType.get()).commit();
+
+    // conflictTable: two independent transactions that both rename the same 
column,
+    // producing two TableCommits for the same table. The first rename 
succeeds,
+    // but the second conflicts because the schema has already changed.
+    Table conflictTable = catalog().loadTable(conflictId);
+    Transaction conflictTx1 = conflictTable.newTransaction();
+    Transaction conflictTx2 = conflictTable.newTransaction();
+    conflictTx1.updateSchema().renameColumn("data", "renamed-col1").commit();
+    conflictTx2.updateSchema().renameColumn("data", "renamed-col2").commit();
+
+    TableCommit goodCommit =
+        TableCommit.create(
+            goodId,
+            ((BaseTransaction) goodTx).startMetadata(),
+            ((BaseTransaction) goodTx).currentMetadata());
+    TableCommit conflictCommit1 =
+        TableCommit.create(
+            conflictId,
+            ((BaseTransaction) conflictTx1).startMetadata(),
+            ((BaseTransaction) conflictTx1).currentMetadata());
+    TableCommit conflictCommit2 =
+        TableCommit.create(
+            conflictId,
+            ((BaseTransaction) conflictTx2).startMetadata(),
+            ((BaseTransaction) conflictTx2).currentMetadata());
+
+    // The coalescing logic groups conflictCommit1 and conflictCommit2 
together.
+    // The first rename succeeds, but the second fails requirement validation
+    // (schema ID changed). This should fail the entire transaction atomically.
+    assertThatThrownBy(
+            () -> restCatalog.commitTransaction(goodCommit, conflictCommit1, 
conflictCommit2))
+        .isInstanceOf(CommitFailedException.class);
+
+    // Verify atomicity: neither table should have changed.
+    assertThat(catalog().loadTable(goodId).schema().asStruct())
+        .isEqualTo(originalGoodSchema.asStruct());
+    assertThat(catalog().loadTable(conflictId).schema().asStruct())
+        .isEqualTo(originalConflictSchema.asStruct());
+  }
+
+  @Test
+  public void 
testMultipleNonConflictingUpdatesToSameTableWithSchemaAndProperties() {
+    Namespace namespace = Namespace.of("coalescingMixedNs");
+    TableIdentifier identifier = TableIdentifier.of(namespace, 
"coalescingMixedTable");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(namespace);
+    }
+
+    // Use a single Iceberg transaction that performs both a schema change and 
a property update.
+    // This produces a single TableCommit containing multiple metadata 
updates, verifying
+    // that the server correctly handles multiple update types within a single 
commit request.
+    Table table = catalog().buildTable(identifier, SCHEMA).create();
+    Transaction transaction = table.newTransaction();
+
+    UpdateSchema updateSchema =
+        transaction.updateSchema().addColumn("new_col", Types.LongType.get());
+    Schema expectedSchema = updateSchema.apply();
+    updateSchema.commit();
+
+    transaction.updateProperties().set("prop-key", "prop-val").commit();

Review Comment:
Is the behavior deterministic when we have two property updates like this?
   1.  transaction.updateProperties().set("prop-key", "prop-val1").commit();
   2. transaction.updateProperties().set("prop-key", "prop-val2").commit();
   
   If not, we may check with the Iceberg community to clarify the behavior. Not 
a blocker. 



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1033,60 +1034,82 @@ public void commitTransaction(CommitTransactionRequest 
commitTransactionRequest)
         new TransactionWorkspaceMetaStoreManager(diagnostics(), 
metaStoreManager());
     ((IcebergCatalog) 
baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
 
+    // Group all changes by table identifier to handle them atomically.
+    // This prevents conflicts when multiple changes target the same table 
entity.
+    // LinkedHashMap preserves insertion order for deterministic processing.
+    Map<TableIdentifier, List<UpdateTableRequest>> changesByTable = new 
LinkedHashMap<>();
+    for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
+      if (CatalogHandlerUtils.isCreate(change)) {
+        throw new BadRequestException(
+            "Unsupported operation: commitTranaction with 
updateForStagedCreate: %s", change);
+      }
+      changesByTable.computeIfAbsent(change.identifier(), k -> new 
ArrayList<>()).add(change);
+    }
+
+    // Process each table's changes in order.
+    // Note: All UpdateTableRequests for a given table are coalesced into a 
single metadata
+    // update and a single tableOps.commit(), which results in one Polaris 
entity update per
+    // table. This is subtly different from applying each UpdateTableRequest 
as an independent
+    // commit (as if each were under a lock). Requirements are still validated 
sequentially
+    // against the evolving metadata, so conflicts are detected correctly.
+    // See also the TODO in TransactionWorkspaceMetaStoreManager for a more 
general (but more
+    // complex) alternative that would intercept at the MetaStoreManager layer.
     List<TableMetadata> tableMetadataObjs = new ArrayList<>();
-    commitTransactionRequest.tableChanges().stream()
-        .forEach(
-            change -> {
-              Table table = baseCatalog.loadTable(change.identifier());
-              if (!(table instanceof BaseTable baseTable)) {
-                throw new IllegalStateException(
-                    "Cannot wrap catalog that does not produce BaseTable");
-              }
-              if (CatalogHandlerUtils.isCreate(change)) {
-                throw new BadRequestException(
-                    "Unsupported operation: commitTranaction with 
updateForStagedCreate: %s",
-                    change);
+    changesByTable.forEach(
+        (tableIdentifier, changes) -> {
+          Table table = baseCatalog.loadTable(tableIdentifier);
+          if (!(table instanceof BaseTable baseTable)) {
+            throw new IllegalStateException("Cannot wrap catalog that does not 
produce BaseTable");
+          }
+
+          TableOperations tableOps = baseTable.operations();
+          TableMetadata baseMetadata = tableOps.current();
+
+          // Apply each change sequentially: validate requirements against 
current state,
+          // then apply updates. This ensures conflicts are detected (e.g., if 
two changes
+          // both expect schema ID 0, the second will fail after the first 
increments it).
+          TableMetadata currentMetadata = baseMetadata;
+          for (UpdateTableRequest change : changes) {
+            // Validate requirements against the current metadata state
+            final TableMetadata metadataForValidation = currentMetadata;
+            change
+                .requirements()
+                .forEach(requirement -> 
requirement.validate(metadataForValidation));
+
+            // TODO: Refactor to share/reconcile the update-application logic 
below with
+            // CatalogHandlerUtils to avoid divergence as complexity grows.
+            TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(currentMetadata);
+            for (MetadataUpdate singleUpdate : change.updates()) {
+              // Note: If location-overlap checking is refactored to be 
atomic, we could
+              // support validation within a single multi-table transaction as 
well, but
+              // will need to update the TransactionWorkspaceMetaStoreManager 
to better
+              // expose the concept of being able to read uncommitted updates.
+              if (singleUpdate instanceof MetadataUpdate.SetLocation 
setLocation) {
+                if (!currentMetadata.location().equals(setLocation.location())
+                    && !realmConfig()
+                        
.getConfig(FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {

Review Comment:
   This check allows the operation `setLocation` when 
ALLOW_NAMESPACE_LOCATION_OVERLAP is true.  I think the behavior isn't correct.  
cc @dennishuo @collado-mike 
   However, it isn't a blocker for me as this PR doesn't change the logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to