manisin opened a new issue, #16232:
URL: https://github.com/apache/iceberg/issues/16232

   ### Apache Iceberg version
   
   main (also affects 1.5.x, 1.6.x, 1.7.x)
   
   ### Query engine
   
   Any (tested with Java SDK directly via InMemoryCatalog)
   
   ### Please describe the bug 🐞
   
   ## Summary
   
   `CREATE OR REPLACE TABLE` (via `replaceTransaction()`) silently overwrites 
concurrent committed changes to the same table. Any modification that commits 
between the start and end of a replace transaction — schema updates, property 
writes, DML appends, snapshot expiration — is lost without error or conflict 
detection.
   
   This is specific to REPLACE transactions. Regular operations 
(`updateProperties`, `updateSchema`, `expireSnapshots`, DML) are immune because 
they use OCC with retry and additive re-apply on refreshed metadata.
   
   ## Root Cause
   
   Two issues in `BaseTransaction.commitReplaceTransaction`:
   
   1. **Stale metadata committed**: The method refreshes `base` before each attempt, so the commit's base check passes, but it commits the original `current` metadata built at transaction creation time. It never calls `applyUpdates()` to incorporate changes that happened since the transaction started. In contrast, `commitSimpleTransaction` does call `applyUpdates()`, which re-applies pending updates on the refreshed base.
   
   2. **Conflict detection disabled**: `UpdateRequirements.forReplaceTable` 
sets `isReplace=true`, which skips 5 categories of conflict assertions (schema, 
partition spec, sort order, snapshot refs, default branch). This means the 
server-side OCC check cannot detect that concurrent changes were overwritten.
   
   The combination means that a REPLACE transaction always succeeds (no `CommitFailedException`) regardless of what happened to the table concurrently, and silently overwrites every concurrent change. The one exception is V3 tables with row lineage, where the replace fails instead (see below).
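
   The combined effect can be illustrated with a minimal, dependency-free Java sketch. This is a toy model, not Iceberg's API: `ReplaceCommitModel`, `Meta`, `Catalog`, and `race` are hypothetical names, metadata is reduced to a properties map, and the catalog commit is a compare-and-swap against the writer's base. Refreshing `base` and then committing the pre-built `current` (the behavior described above for `commitReplaceTransaction`) always passes the swap and drops the concurrent write. Re-applying the change on the refreshed base (what `applyUpdates()` does for simple transactions) keeps it. Because the swap against a freshly refreshed base is the model's only conflict check, it approximates a commit whose other conflict assertions are skipped.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.function.UnaryOperator;

   // Toy model, NOT Iceberg's API: metadata is an immutable properties map and the
   // catalog commits by compare-and-swap against the base the writer last refreshed.
   final class ReplaceCommitModel {

     // Immutable stand-in for table metadata.
     static final class Meta {
       final Map<String, String> props;

       Meta(Map<String, String> props) {
         this.props = Map.copyOf(props);
       }

       Meta with(String key, String value) {
         Map<String, String> next = new HashMap<>(props);
         next.put(key, value);
         return new Meta(next);
       }
     }

     // Stand-in for a catalog holding a single table pointer.
     static final class Catalog {
       Meta current = new Meta(Map.of("key1", "original"));

       // Succeeds only if nobody committed since `base` was read.
       boolean commit(Meta base, Meta updated) {
         if (current != base) {
           return false;
         }
         current = updated;
         return true;
       }
     }

     // Races one replace against one concurrent property write and returns the final
     // properties. `rebase` selects whether the replace re-applies its change on retry.
     static Map<String, String> race(boolean rebase) {
       Catalog catalog = new Catalog();
       UnaryOperator<Meta> replaceChange = m -> m.with("key1", "replaced");

       // Replace transaction starts: capture base and build `current` once.
       Meta base = catalog.current;
       Meta current = replaceChange.apply(base);

       // A concurrent writer commits key2 before the replace commits.
       catalog.commit(base, base.with("key2", "concurrent_value"));

       while (true) {
         base = catalog.current; // refresh before each attempt
         if (rebase) {
           current = replaceChange.apply(base); // re-apply on refreshed metadata
         }
         // Without rebase, the stale `current` is swapped in against the fresh base,
         // so the check passes and the concurrent write is silently dropped.
         if (catalog.commit(base, current)) {
           return catalog.current.props;
         }
       }
     }

     public static void main(String[] args) {
       System.out.println("stale : " + new TreeMap<>(race(false)));
       System.out.println("rebase: " + new TreeMap<>(race(true)));
     }
   }
   ```

   Running `main` prints `stale : {key1=replaced}` followed by `rebase: {key1=replaced, key2=concurrent_value}`.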
   
   ## Scenarios That Break
   
   I wrote a self-contained JUnit 5 test class (below) that demonstrates 6 
distinct failure scenarios. All tests **pass** on current `main`, proving the 
bugs exist. Each test:
   1. Creates a table with known initial state
   2. Starts a `replaceTransaction()` (captures base metadata)
   3. Performs a concurrent modification that commits successfully
   4. Commits the replace transaction
   5. Asserts the bug: the concurrent committed change was silently lost
   
   ### 1. Replace vs Snapshot Expiration — expired snapshot comes back
   
   A snapshot that was successfully expired reappears after the replace commits. This is table corruption: the metadata again references that snapshot's manifest list (and possibly manifests and data files), which the expiration may already have physically deleted.
   
   ### 2. Replace vs Schema Update — column addition silently lost
   
   A concurrently-added column disappears after the replace commits. The schema 
reverts to what it was when the replace transaction started.
   
   ### 3. Replace vs Property Write — property silently clobbered
   
   A concurrently-set table property vanishes after the replace commits. The properties map reverts to the one the replace transaction built from the metadata it captured at creation time.
   
   ### 4. Replace vs DML Append — committed data snapshot lost
   
   A concurrent data append creates a new snapshot that disappears after the 
replace commits. This is **silent data loss** — the data was committed 
successfully but is no longer reachable.
   
   ### 5. Replace vs Replace — no conflict detection between two replaces
   
   Two concurrent replace transactions both succeed without 
`CommitFailedException`. The second one silently overwrites the first with no 
awareness of the conflict. For regular operations (e.g., two concurrent 
`updateProperties` setting different keys), OCC detects the conflict and the 
second writer retries additively, preserving both changes.
   
   ### 6. Compound — multiple concurrent changes all lost simultaneously
   
   Three concurrent modifications (expire snapshot + set property + add column) all commit successfully, then are all silently overwritten when the replace commits. This shows that the bug affects every metadata dimension at once.
   
   ## Additional Context for V3 Tables
   
   On V3 tables with row lineage, this bug is even more severe. 
`snapshot.first-row-id` is set from `base.nextRowId()` at transaction creation 
time. If a concurrent commit advances the table's `next-row-id`, the replace's 
snapshot will have a stale `first-row-id`, causing:
   
   ```
   CommitFailedException: Cannot add a snapshot, first-row-id is behind table next-row-id: X < Y
   ```
   
   Since `commitReplaceTransaction` retries with the same stale `current` 
(never re-applying updates), this failure is **permanent** — the replace can 
never succeed after any concurrent snapshot change on a V3 table.
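
   The row-lineage failure can be sketched the same way. Again this is a toy model with hypothetical names (`RowLineageModel`, `replaceCommits`), assuming only that the table rejects a snapshot whose `first-row-id` is below `next-row-id` and otherwise advances `next-row-id` by the rows the snapshot adds. A `first-row-id` captured once at transaction creation can never pass the check after a concurrent append, however many times the commit is retried, while one re-derived from refreshed metadata passes immediately.

   ```java
   // Toy model, NOT Iceberg's API: only next-row-id and the first-row-id check are modeled.
   final class RowLineageModel {

     static final class Table {
       long nextRowId = 0;

       // Rejects a snapshot whose first-row-id is behind next-row-id; otherwise accepts it
       // and advances next-row-id past the rows it adds (assumed semantics).
       boolean addSnapshot(long firstRowId, long addedRows) {
         if (firstRowId < nextRowId) {
           return false;
         }
         nextRowId = firstRowId + addedRows;
         return true;
       }
     }

     // Returns true if the replace commits within maxAttempts.
     static boolean replaceCommits(boolean rebase, int maxAttempts) {
       Table table = new Table();
       long firstRowId = table.nextRowId; // captured at transaction creation (0)

       // A concurrent append commits 10 rows, advancing next-row-id to 10.
       table.addSnapshot(table.nextRowId, 10);

       for (int attempt = 0; attempt < maxAttempts; attempt++) {
         if (rebase) {
           firstRowId = table.nextRowId; // re-derive from refreshed metadata
         }
         if (table.addSnapshot(firstRowId, 5)) {
           return true;
         }
       }
       return false; // a stale first-row-id never catches up, however many retries
     }

     public static void main(String[] args) {
       System.out.println("stale : " + replaceCommits(false, 4));
       System.out.println("rebase: " + replaceCommits(true, 4));
     }
   }
   ```

   Here `replaceCommits(false, 4)` returns `false` after four failed attempts, and `replaceCommits(true, 4)` returns `true` on the first attempt.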
   
   ## Reproducing Test
   
   <details>
   <summary>TestReplaceTableSafety.java (self-contained, no external 
dependencies)</summary>
   
   ```java
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *   http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
   package org.apache.iceberg;
   
   import static org.apache.iceberg.types.Types.NestedField.required;
   import static org.assertj.core.api.Assertions.assertThat;
   
   import java.nio.file.Path;
   import java.util.Set;
   import java.util.stream.Collectors;
   import org.apache.iceberg.catalog.Namespace;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.inmemory.InMemoryCatalog;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
   import org.apache.iceberg.types.Types;
   import org.junit.jupiter.api.BeforeEach;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;
   
   /**
    * Tests demonstrating safety bugs in Iceberg's REPLACE TABLE transaction.
    *
    * <p>Each test PASSES on current Iceberg code, proving the bug exists. The bugs are specific
    * to REPLACE and cannot occur with regular {@code updateTable} operations, which use OCC with
    * retry: on conflict, base is refreshed and updates are re-applied additively on the latest
    * metadata.
    *
    * <p>REPLACE bypasses this: {@code commitReplaceTransaction} commits a stale {@code current}
    * metadata snapshot built at transaction creation time, silently overwriting any concurrent
    * committed changes.
    *
    * <p>Root causes:
    *
    * <ul>
    *   <li>{@code BaseTransaction.commitReplaceTransaction} refreshes {@code base} but never calls
    *       {@code applyUpdates()}, so the stale {@code current} metadata from transaction creation
    *       time is committed wholesale.
    *   <li>{@code UpdateRequirements.forReplaceTable} uses {@code isReplace=true}, skipping
    *       conflict assertions (schema, spec, sort order, snapshot refs, branches).
    * </ul>
    *
    * @see <a href="https://github.com/apache/iceberg/issues/13651">#13651 - Snapshot Revival</a>
    * @see <a href="https://github.com/apache/iceberg/issues/12738">#12738 - Stale Partition Spec</a>
    */
   public class TestReplaceTableSafety {
   
     private static final Namespace NS = Namespace.of("db");
     private static final TableIdentifier TABLE = TableIdentifier.of(NS, "t");
   
     private static final Schema SCHEMA_ID_DATA =
         new Schema(
             required(1, "id", Types.IntegerType.get()),
             required(2, "data", Types.StringType.get()));
   
     private static final Schema SCHEMA_ID_DATA_EXTRA =
         new Schema(
             required(1, "id", Types.IntegerType.get()),
             required(2, "data", Types.StringType.get()),
             required(3, "extra", Types.StringType.get()));
   
     @TempDir private Path tempDir;
     private InMemoryCatalog catalog;
   
     @BeforeEach
     public void before() {
       catalog = new InMemoryCatalog();
       catalog.initialize("test", ImmutableMap.of("warehouse", tempDir.toString()));
       catalog.createNamespace(NS);
     }
   
     private DataFile dataFile(PartitionSpec spec, String path) {
       return DataFiles.builder(spec)
           .withPath(path)
           .withFileSizeInBytes(10)
           .withRecordCount(1)
           .build();
     }
   
     // ---------------------------------------------------------------------------
     // Bug 1: Replace vs snapshot expiration -- expired snapshot revived
     //
     // commitReplaceTransaction commits stale metadata that still contains the
     // expired snapshot. The expiration commit is silently overwritten.
     // ---------------------------------------------------------------------------

     @Test
     public void testReplaceVsExpiration_SnapshotRevival() {
       Table table = catalog.buildTable(TABLE, SCHEMA_ID_DATA).create();

       table.newFastAppend().appendFile(dataFile(table.spec(), "/data/a.parquet")).commit();
       long snapshot1 = table.currentSnapshot().snapshotId();

       table.newFastAppend().appendFile(dataFile(table.spec(), "/data/b.parquet")).commit();

       // Start replace transaction (captures metadata with both snapshots)
       Transaction replace = catalog.buildTable(TABLE, SCHEMA_ID_DATA).replaceTransaction();

       // Concurrently expire snapshot 1
       table = catalog.loadTable(TABLE);
       table.expireSnapshots().expireSnapshotId(snapshot1).commit();

       // Verify snapshot 1 is gone after expiration
       table = catalog.loadTable(TABLE);
       assertThat(collectSnapshotIds(table))
           .as("Snapshot 1 should be gone after expiration")
           .doesNotContain(snapshot1);

       // Commit the replace
       replace.commitTransaction();

       // BUG: snapshot 1 is back -- revived after expiration
       table = catalog.loadTable(TABLE);
       assertThat(collectSnapshotIds(table))
           .as("Expired snapshot should be revived by replace (BUG: stale snapshot revival)")
           .contains(snapshot1);
     }
   
     // ---------------------------------------------------------------------------
     // Bug 2: Replace vs concurrent schema update -- change silently lost
     //
     // The concurrent addColumn commit is silently overwritten because REPLACE
     // commits the stale schema from transaction creation time.
     // ---------------------------------------------------------------------------

     @Test
     public void testReplaceVsSchemaUpdate_SchemaChangeLost() {
       catalog.buildTable(TABLE, SCHEMA_ID_DATA).create();

       // Start replace (captures metadata with original schema)
       Transaction replace = catalog.buildTable(TABLE, SCHEMA_ID_DATA).replaceTransaction();

       // Concurrently add a column
       Table table = catalog.loadTable(TABLE);
       table.updateSchema().addColumn("replacement", Types.StringType.get()).commit();

       // Commit the replace -- overwrites the schema update
       replace.commitTransaction();

       // BUG: the concurrent schema evolution is silently overwritten
       Table result = catalog.loadTable(TABLE);
       assertThat(result.schema().findField("replacement"))
           .as("Column added by concurrent schema update should be gone (BUG: silent overwrite)")
           .isNull();
     }
   
     // ---------------------------------------------------------------------------
     // Bug 3: Replace vs concurrent property write -- property clobbered
     //
     // This bug is REPLACE-specific. With regular updateProperties, two concurrent
     // writers setting DIFFERENT keys both survive via OCC retry + additive re-apply.
     // With REPLACE, the entire stale properties map is committed wholesale.
     // ---------------------------------------------------------------------------

     @Test
     public void testReplaceVsPropertyWrite_PropertyClobbered() {
       catalog.buildTable(TABLE, SCHEMA_ID_DATA).withProperty("key1", "original").create();

       // Start replace with new properties
       Transaction replace =
           catalog
               .buildTable(TABLE, SCHEMA_ID_DATA)
               .withProperty("key1", "replaced")
               .replaceTransaction();

       // Concurrently set a different property
       Table table = catalog.loadTable(TABLE);
       table.updateProperties().set("key2", "concurrent_value").commit();

       // Verify concurrent property exists before replace commit
       table = catalog.loadTable(TABLE);
       assertThat(table.properties()).containsEntry("key2", "concurrent_value");

       // Commit the replace
       replace.commitTransaction();

       // BUG: the concurrent property write is silently lost
       Table result = catalog.loadTable(TABLE);
       assertThat(result.properties())
           .as("Concurrent property 'key2' should be gone (BUG: property clobbered by replace)")
           .doesNotContainKey("key2");
       assertThat(result.properties()).containsEntry("key1", "replaced");
     }
   
     // ---------------------------------------------------------------------------
     // Bug 4: Replace vs concurrent DML -- snapshot lost
     //
     // A concurrent data append creates a new snapshot, but REPLACE commits
     // stale metadata from before the append, so the DML snapshot vanishes.
     // This is silent data loss.
     // ---------------------------------------------------------------------------

     @Test
     public void testReplaceVsDataCommit_SnapshotLost() {
       Table table = catalog.buildTable(TABLE, SCHEMA_ID_DATA).create();

       table.newFastAppend().appendFile(dataFile(table.spec(), "/data/a.parquet")).commit();

       // Start replace (captures metadata with 1 snapshot)
       Transaction replace = catalog.buildTable(TABLE, SCHEMA_ID_DATA).replaceTransaction();

       // Concurrently append data (creates snapshot 2)
       table = catalog.loadTable(TABLE);
       table.newFastAppend().appendFile(dataFile(table.spec(), "/data/b.parquet")).commit();
       table = catalog.loadTable(TABLE);
       long dmlSnapshotId = table.currentSnapshot().snapshotId();

       // Commit the replace
       replace.commitTransaction();

       // BUG: the DML snapshot is lost -- overwritten by stale replace metadata
       Table result = catalog.loadTable(TABLE);
       assertThat(collectSnapshotIds(result))
           .as("DML snapshot should be lost (BUG: replace overwrites concurrent DML)")
           .doesNotContain(dmlSnapshotId);
     }
   
     // ---------------------------------------------------------------------------
     // Bug 5: Replace vs concurrent replace -- no conflict detection
     //
     // Two concurrent REPLACE transactions both succeed without
     // CommitFailedException. The second silently overwrites the first.
     // For regular operations, OCC detects the conflict and triggers retry.
     // ---------------------------------------------------------------------------

     @Test
     public void testReplaceVsReplace_LastWriterWinsNoConflict() {
       catalog.buildTable(TABLE, SCHEMA_ID_DATA).withProperty("key1", "v1").create();

       // Start two concurrent replace transactions
       Transaction replaceA =
           catalog
               .buildTable(TABLE, SCHEMA_ID_DATA)
               .withProperty("key1", "from_A")
               .replaceTransaction();

       Transaction replaceB =
           catalog
               .buildTable(TABLE, SCHEMA_ID_DATA)
               .withProperty("key1", "from_B")
               .replaceTransaction();

       // Commit A first
       replaceA.commitTransaction();
       Table afterA = catalog.loadTable(TABLE);
       assertThat(afterA.properties()).containsEntry("key1", "from_A");

       // Commit B -- succeeds without CommitFailedException (no conflict detection)
       replaceB.commitTransaction();

       // BUG: B silently overwrites A with no conflict detection
       Table result = catalog.loadTable(TABLE);
       assertThat(result.properties())
           .as("Replace B should silently overwrite A (BUG: no conflict detection between replaces)")
           .containsEntry("key1", "from_B");
     }
   
     // ---------------------------------------------------------------------------
     // Bug 6: Compound -- replace vs expiration + property + schema update
     //
     // Three concurrent committed changes are all silently lost when REPLACE
     // commits stale metadata. Demonstrates the bug affects all metadata
     // dimensions simultaneously.
     // ---------------------------------------------------------------------------

     @Test
     public void testReplaceCompound_MultipleViolations() {
       PartitionSpec specOnData =
           PartitionSpec.builderFor(SCHEMA_ID_DATA_EXTRA).identity("data").build();
       Table table =
           catalog
               .buildTable(TABLE, SCHEMA_ID_DATA_EXTRA)
               .withPartitionSpec(specOnData)
               .withProperty("marker", "original")
               .create();

       table.newFastAppend().appendFile(dataFile(table.spec(), "/data/a.parquet")).commit();
       long snapshot1 = table.currentSnapshot().snapshotId();
       table.newFastAppend().appendFile(dataFile(table.spec(), "/data/b.parquet")).commit();

       // Start replace with new spec and property
       PartitionSpec specOnExtra =
           PartitionSpec.builderFor(SCHEMA_ID_DATA_EXTRA).identity("extra").build();
       Transaction replace =
           catalog
               .buildTable(TABLE, SCHEMA_ID_DATA_EXTRA)
               .withPartitionSpec(specOnExtra)
               .withProperty("marker", "from_replace")
               .replaceTransaction();

       // Concurrent modification 1: expire snapshot 1
       table = catalog.loadTable(TABLE);
       table.expireSnapshots().expireSnapshotId(snapshot1).commit();

       // Concurrent modification 2: set property
       table = catalog.loadTable(TABLE);
       table.updateProperties().set("marker", "from_concurrent").commit();

       // Concurrent modification 3: add a column
       table = catalog.loadTable(TABLE);
       table.updateSchema().addColumn("added_col", Types.StringType.get()).commit();
   
       // Commit replace -- overwrites all three concurrent changes
       replace.commitTransaction();
   
       Table result = catalog.loadTable(TABLE);
   
       // Violation 1: expired snapshot is back
       assertThat(collectSnapshotIds(result))
           .as("Expired snapshot should be revived (BUG)")
           .contains(snapshot1);
   
       // Violation 2: concurrent property change is lost
       assertThat(result.properties().get("marker"))
           .as("Concurrent property update should be lost (BUG)")
           .isEqualTo("from_replace");
   
       // Violation 3: concurrent schema evolution is lost
       assertThat(result.schema().findField("added_col"))
           .as("Concurrent column addition should be lost (BUG)")
           .isNull();
     }
   
     // ---------------------------------------------------------------------------
     // Helpers
     // ---------------------------------------------------------------------------

     private Set<Long> collectSnapshotIds(Table table) {
       return org.apache.iceberg.relocated.com.google.common.collect.Streams.stream(table.snapshots())
           .map(Snapshot::snapshotId)
           .collect(Collectors.toSet());
     }
   }
   
   ```
   
   </details>
   
   ### Running the test
   
   ```bash
   # Copy into an Iceberg clone and run:
   cp TestReplaceTableSafety.java /path/to/iceberg/core/src/test/java/org/apache/iceberg/
   cd /path/to/iceberg
   ./gradlew :iceberg-core:test --tests "org.apache.iceberg.TestReplaceTableSafety"
   ```
   
   All 6 tests should **PASS** (proving the bugs exist). When the fix lands, 
these tests will start failing.
   
   ## Related Issues and Discussions
   
   - **Mailing list**: [DISCUSS: Replace table transaction in REST 
Catalog](https://lists.apache.org/thread/d4hzd4cfvopvckcfw50orqksjzymd4lm)
   - **Mailing list**: [DISCUSS: Allow Commit Conflicts in REPLACE TABLE 
transactions](https://lists.apache.org/thread/f7zd1dvlf6l1gw6wmnh9g404trcrkg06)
   - #13651 — Corrupted table history due to CREATE OR REPLACE TABLE & Snapshot 
Expiration at the same time
   - #12738 — Unexpected behavior with CREATE OR REPLACE AS SELECT (stale 
partition spec)
   - #11524 — Unable to roll back to previous version after CREATE OR REPLACE 
with REST Catalog
   - https://github.com/apache/iceberg/pull/15904 — [WIP] Replace transactions 
rebase onto refreshed metadata (active fix)
   - https://github.com/apache/iceberg/pull/15905 — Repro: Concurrent replace 
V3 failure
   - https://github.com/apache/iceberg/pull/15914 — Repro: REST vs non-REST 
concurrent replace behaviour
   
   ## Why This Is REPLACE-Specific
   
   Regular `updateTable` operations cannot silently overwrite concurrent 
changes because they have three layers of defense:
   
   1. **OCC conflict detection**: `base != current()` catches any concurrent 
commit
   2. **Retry with refresh**: on conflict, base is refreshed to the latest 
metadata
   3. **Additive re-apply**: the update is re-applied on refreshed state, 
preserving concurrent changes to other fields
   
   For example, two concurrent `updateProperties` calls setting *different* 
keys both survive: the second writer detects the conflict, refreshes, and 
re-applies its key additively. REPLACE bypasses all three layers.
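
   The three layers can be sketched as a toy optimistic-concurrency loop. `PropertyOccModel` and its methods are hypothetical, not Iceberg classes. The point is only that the pending change (set one key) is re-applied to whatever base the writer last refreshed, so a writer that loses the race still preserves the winner's key.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;

   // Toy model, NOT Iceberg's API: a regular property update protected by OCC.
   final class PropertyOccModel {

     // Catalog pointer to an immutable properties map.
     private Map<String, String> current = Map.of();

     Map<String, String> current() {
       return current;
     }

     // Compare-and-swap: fails if anyone committed since `base` was read.
     private boolean commit(Map<String, String> base, Map<String, String> updated) {
       if (current != base) {
         return false; // 1. OCC conflict detection
       }
       current = updated;
       return true;
     }

     // The pending change: set one key on top of the given base.
     private static Map<String, String> apply(
         Map<String, String> base, String key, String value) {
       Map<String, String> next = new HashMap<>(base);
       next.put(key, value);
       return Map.copyOf(next);
     }

     // Sets one property and returns how many commit attempts were needed.
     int setProperty(Map<String, String> base, String key, String value) {
       int attempts = 1;
       // 3. Each attempt re-applies the change on the current `base`.
       while (!commit(base, apply(base, key, value))) {
         base = current; // 2. retry with refreshed metadata
         attempts++;
       }
       return attempts;
     }

     public static void main(String[] args) {
       PropertyOccModel table = new PropertyOccModel();
       // Both writers load the same base before either commits.
       Map<String, String> baseA = table.current();
       Map<String, String> baseB = table.current();

       int attemptsA = table.setProperty(baseA, "keyA", "a");
       int attemptsB = table.setProperty(baseB, "keyB", "b");

       System.out.println(new TreeMap<>(table.current()) + " " + attemptsA + " " + attemptsB);
     }
   }
   ```

   Writer A commits on its first attempt. Writer B's first swap fails, it refreshes, re-applies `keyB`, and succeeds on attempt 2, so `main` prints `{keyA=a, keyB=b} 1 2`.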
   
   ### Willingness to contribute
   
   - [X] I can contribute a fix for this bug independently
   - [X] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time
   


-- 
This is an automated message from the Apache Git Service.