nssalian commented on code in PR #16289:
URL: https://github.com/apache/iceberg/pull/16289#discussion_r3238987445
##########
core/src/main/java/org/apache/iceberg/BaseTransaction.java:
##########
@@ -320,10 +320,13 @@ private void commitReplaceTransaction(boolean orCreate) {
}
}
- // because this is a replace table, it will always completely
replace the table
- // metadata. even if it was just updated.
- if (base != underlyingOps.current()) {
- this.base = underlyingOps.current(); // just refreshed
+ // Replace transactions must not silently overwrite concurrent
commits. If the table
+ // metadata has changed since the transaction started, fail
instead of rebasing and
+ // merging staged updates.
+ if (base != null && underlyingOps.current() != base) {
Review Comment:
Since the `base` assignment was removed, `base` would never be updated and the
commit would keep retrying. `applyUpdates` has a better approach here: it uses
`PendingUpdateFailedException` to break the retry loop.
##########
core/src/main/java/org/apache/iceberg/UpdateRequirements.java:
##########
@@ -41,7 +41,9 @@ public static List<UpdateRequirement> forReplaceTable(
TableMetadata base, List<MetadataUpdate> metadataUpdates) {
Preconditions.checkArgument(null != base, "Invalid table metadata: null");
Preconditions.checkArgument(null != metadataUpdates, "Invalid metadata
updates: null");
- Builder builder = new Builder(base, true);
+ // use the same optimistic concurrency checks as ordinary commits; replace
commits must not
+ // silently drop concurrent schema, partition spec, sort order, or ref
changes.
+ Builder builder = new Builder(base, false);
Review Comment:
Can you add a test for this change?
##########
core/src/test/java/org/apache/iceberg/TestReplaceTableSafety.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Regression tests for GH-16232.
+ *
+ * <p>Replace transactions must fail on concurrent table metadata changes
rather than silently
+ * overwriting committed updates.
+ */
+public class TestReplaceTableSafety {
+
+ private static final Namespace NS = Namespace.of("db");
+ private static final TableIdentifier TABLE = TableIdentifier.of(NS, "tbl");
+
+ private static final Schema SCHEMA =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()), required(2, "data",
Types.StringType.get()));
+
+ private static final Schema SCHEMA_WITH_EXTRA_COL =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()),
+ NestedField.optional(3, "extra", Types.StringType.get()));
+
+ private static final DataFile FILE_A =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(2)
+ .build();
+
+ private static final DataFile FILE_B =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(2)
+ .build();
+
+ private InMemoryCatalog catalog;
+
+ @BeforeEach
+ public void before(@TempDir Path temp) {
+ catalog = new InMemoryCatalog();
+ catalog.initialize(
+ "in-memory",
+ ImmutableMap.of(
+ CatalogProperties.WAREHOUSE_LOCATION,
+ temp.toAbsolutePath().toString(),
+ CatalogProperties.FILE_IO_IMPL,
+ "org.apache.iceberg.inmemory.InMemoryFileIO"));
+ catalog.createNamespace(NS);
+ }
+
+ @Test
+ public void replaceVsSchemaUpdateFailsAndPreservesSchema() {
+ Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+ Transaction replace = catalog.buildTable(TABLE,
SCHEMA).replaceTransaction();
+ replace.newFastAppend().appendFile(FILE_A).commit();
+
+ // concurrent schema update
+ table.updateSchema().addColumn("extra", Types.StringType.get()).commit();
+
+ assertThatThrownBy(replace::commitTransaction)
+
.isInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class)
+ .hasMessageContaining("replace transaction");
+
+ Table after = catalog.loadTable(TABLE);
+
assertThat(after.schema().asStruct()).isEqualTo(SCHEMA_WITH_EXTRA_COL.asStruct());
+ }
+
+ @Test
+ public void replaceVsPropertyWriteFailsAndPreservesProperty() {
+ Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+ Transaction replace = catalog.buildTable(TABLE,
SCHEMA).replaceTransaction();
+ replace.newFastAppend().appendFile(FILE_A).commit();
+
+ // concurrent property update
+ table.updateProperties().set("k1", "v1").commit();
+
+ assertThatThrownBy(replace::commitTransaction)
+
.isInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class)
+ .hasMessageContaining("replace transaction");
+
+ Table after = catalog.loadTable(TABLE);
+ assertThat(after.properties()).containsEntry("k1", "v1");
+ }
+
+ @Test
+ public void replaceVsAppendFailsAndPreservesCommittedData() {
+ catalog.buildTable(TABLE, SCHEMA).create();
+
+ Transaction replace = catalog.buildTable(TABLE,
SCHEMA).replaceTransaction();
+ replace.newFastAppend().appendFile(FILE_A).commit();
+
+ // concurrent append
+ catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit();
+
+ assertThatThrownBy(replace::commitTransaction)
+
.isInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class)
+ .hasMessageContaining("replace transaction");
+
+ Table after = catalog.loadTable(TABLE);
+ // concurrent data remains reachable
+ assertThat(after.currentSnapshot()).isNotNull();
+ assertThat(after.newScan().planFiles()).hasSize(1);
+ }
+
+ @Test
+ public void replaceVsReplaceFailsSecondCommitAndPreservesFirst() {
+ catalog.buildTable(TABLE, SCHEMA).create();
+
+ Transaction secondReplace = catalog.buildTable(TABLE,
SCHEMA).replaceTransaction();
+ secondReplace.newFastAppend().appendFile(FILE_A).commit();
+
+ Transaction firstReplace = catalog.buildTable(TABLE,
SCHEMA).replaceTransaction();
+ firstReplace.newFastAppend().appendFile(FILE_B).commit();
+ firstReplace.commitTransaction();
+
+ assertThatThrownBy(secondReplace::commitTransaction)
+
.isInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class)
+ .hasMessageContaining("replace transaction");
+ }
+
+ @Test
+ public void
replaceVsExpireSnapshotsFailsAndDoesNotResurrectExpiredSnapshot() {
+ Table table = catalog.buildTable(TABLE, SCHEMA).create();
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotId = table.currentSnapshot().snapshotId();
+ table.newFastAppend().appendFile(FILE_B).commit();
+
+ Transaction replace = catalog.buildTable(TABLE,
SCHEMA).replaceTransaction();
+ replace.newFastAppend().appendFile(FILE_B).commit();
+
+ // concurrent expire
+ table.expireSnapshots().expireSnapshotId(snapshotId).commit();
+
+ assertThatThrownBy(replace::commitTransaction)
+
.isInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class)
+ .hasMessageContaining("replace transaction");
+
+ Table after = catalog.loadTable(TABLE);
+ assertThat(after.snapshot(snapshotId)).isNull();
+ }
+
+ @Test
+ public void replaceVsMultipleConcurrentChangesFailsAndPreservesAll() {
+ Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+ Transaction replace = catalog.buildTable(TABLE,
SCHEMA).replaceTransaction();
+ replace.newFastAppend().appendFile(FILE_A).commit();
+
+ // concurrent changes
+ table.updateSchema().addColumn("extra", Types.StringType.get()).commit();
+ table.updateProperties().set("k1", "v1").commit();
+
+ assertThatThrownBy(replace::commitTransaction)
+
.isInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class)
+ .hasMessageContaining("replace transaction");
+
+ Table after = catalog.loadTable(TABLE);
+
assertThat(after.schema().asStruct()).isEqualTo(SCHEMA_WITH_EXTRA_COL.asStruct());
+ assertThat(after.properties()).containsEntry("k1", "v1");
+ }
+}
Review Comment:
A few more tests would help:
1. For `createOrReplaceTransaction` (`orCreate=true`).
2. v3 row-lineage tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]