manisin opened a new issue, #16232:
URL: https://github.com/apache/iceberg/issues/16232
### Apache Iceberg version
main (also affects 1.5.x, 1.6.x, 1.7.x)
### Query engine
Any (tested with Java SDK directly via InMemoryCatalog)
### Please describe the bug 🐞
## Summary
`CREATE OR REPLACE TABLE` (via `replaceTransaction()`) silently overwrites
concurrent committed changes to the same table. Any modification that commits
between the start and end of a replace transaction — schema updates, property
writes, DML appends, snapshot expiration — is lost without error or conflict
detection.
This is specific to REPLACE transactions. Regular operations
(`updateProperties`, `updateSchema`, `expireSnapshots`, DML) are immune because
they use OCC with retry and additive re-apply on refreshed metadata.
## Root Cause
Two issues in `BaseTransaction.commitReplaceTransaction`:
1. **Stale metadata committed**: The method refreshes `base` (to detect
version conflicts) but commits the original `current` metadata snapshot built
at transaction creation time. It never calls `applyUpdates()` to incorporate
changes that happened since the transaction started. In contrast,
`commitSimpleTransaction` does call `applyUpdates()`, which re-applies pending
updates on the refreshed base.
2. **Conflict detection disabled**: `UpdateRequirements.forReplaceTable`
sets `isReplace=true`, which skips 5 categories of conflict assertions (schema,
partition spec, sort order, snapshot refs, default branch). This means the
server-side OCC check cannot detect that concurrent changes were overwritten.
The combination means: a REPLACE transaction will always succeed (no
`CommitFailedException`) regardless of what happened to the table concurrently,
and it will silently overwrite all concurrent changes.
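The difference between the two commit paths can be sketched with a toy model. Everything here is hypothetical: `ToyCatalog`, `commitStale`, and `commitReapplied` are illustrative names, not Iceberg APIs, and table metadata is reduced to a property map. It only aims to show why committing metadata built at transaction start drops a concurrent change, while re-applying pending updates on the refreshed base keeps it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Toy model only: metadata is an immutable property map and the catalog is a
// single atomic reference. None of these names exist in Iceberg.
class ToyCatalog {
  private final AtomicReference<Map<String, String>> metadata;

  ToyCatalog(Map<String, String> initial) {
    this.metadata = new AtomicReference<>(Map.copyOf(initial));
  }

  Map<String, String> load() {
    return metadata.get();
  }

  // Mimics the replace path described above: the base is refreshed, so the
  // compare-and-swap succeeds, but the metadata built at transaction start is
  // committed unchanged.
  void commitStale(Map<String, String> staleCurrent) {
    Map<String, String> refreshedBase = metadata.get();
    metadata.compareAndSet(refreshedBase, Map.copyOf(staleCurrent));
  }

  // Mimics the simple-transaction path: pending updates are re-applied on top
  // of the refreshed base before committing.
  void commitReapplied(UnaryOperator<Map<String, String>> pendingUpdates) {
    Map<String, String> refreshedBase = metadata.get();
    metadata.compareAndSet(refreshedBase, Map.copyOf(pendingUpdates.apply(refreshedBase)));
  }

  static Map<String, String> with(Map<String, String> base, String key, String value) {
    Map<String, String> copy = new HashMap<>(base);
    copy.put(key, value);
    return copy;
  }

  // One scenario: a transaction starts, a concurrent writer commits key2, then
  // the transaction commits using either strategy.
  static Map<String, String> scenario(boolean reapply) {
    ToyCatalog catalog = new ToyCatalog(Map.of("key1", "original"));
    UnaryOperator<Map<String, String>> update = base -> with(base, "key1", "replaced");
    Map<String, String> staleCurrent = update.apply(catalog.load()); // built at tx start

    // A concurrent writer commits successfully in between.
    catalog.commitReapplied(base -> with(base, "key2", "concurrent_value"));

    if (reapply) {
      catalog.commitReapplied(update);
    } else {
      catalog.commitStale(staleCurrent);
    }
    return catalog.load();
  }

  public static void main(String[] args) {
    System.out.println("stale commit: " + scenario(false));
    System.out.println("re-applied:   " + scenario(true));
  }
}
```

In this sketch the stale path ends without `key2`, while the re-applied path keeps both keys. Whether a real REPLACE should preserve every concurrent change is a design question for the fix; the model only illustrates the mechanism.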
## Scenarios That Break
I wrote a self-contained JUnit 5 test class (below) that demonstrates 6
distinct failure scenarios. Each test asserts the buggy outcome, so all of
them **pass** on current `main`, which confirms the bugs exist. Each test:
1. Creates a table with known initial state
2. Starts a `replaceTransaction()` (captures base metadata)
3. Performs a concurrent modification that commits successfully
4. Commits the replace transaction
5. Asserts the bug: the concurrent committed change was silently lost
### 1. Replace vs Snapshot Expiration — expired snapshot comes back
A snapshot that was successfully expired reappears after the replace
commits. This is table corruption: the metadata now references snapshot files
that may have been physically deleted by the expiration process.
### 2. Replace vs Schema Update — column addition silently lost
A concurrently-added column disappears after the replace commits. The schema
reverts to what it was when the replace transaction started.
### 3. Replace vs Property Write — property silently clobbered
A concurrently-set table property vanishes after the replace commits. The
properties map reverts to the replace transaction's snapshot.
### 4. Replace vs DML Append — committed data snapshot lost
A concurrent data append creates a new snapshot that disappears after the
replace commits. This is **silent data loss** — the data was committed
successfully but is no longer reachable.
### 5. Replace vs Replace — no conflict detection between two replaces
Two concurrent replace transactions both succeed without
`CommitFailedException`. The second one silently overwrites the first with no
awareness of the conflict. For regular operations (e.g., two concurrent
`updateProperties` setting different keys), OCC detects the conflict and the
second writer retries additively, preserving both changes.
### 6. Compound — multiple concurrent changes all lost simultaneously
Three concurrent modifications (expire snapshot + set property + add column)
are all committed successfully, then all silently overwritten when the replace
commits. Demonstrates that the bug affects all metadata dimensions
simultaneously.
## Additional Context for V3 Tables
On V3 tables with row lineage, this bug is even more severe.
`snapshot.first-row-id` is set from `base.nextRowId()` at transaction creation
time. If a concurrent commit advances the table's `next-row-id`, the replace's
snapshot will have a stale `first-row-id`, causing:
```
CommitFailedException: Cannot add a snapshot, first-row-id is behind table
next-row-id: X < Y
```
Since `commitReplaceTransaction` retries with the same stale `current`
(never re-applying updates), this failure is **permanent** — the replace can
never succeed after any concurrent snapshot change on a V3 table.
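Why retrying cannot help can be sketched with a toy model of the row-id check. The class and method names below are hypothetical, the check is a simplification of the validation quoted above, and a plain `IllegalStateException` stands in for the real exception type.

```java
// Toy model only: these names are not Iceberg APIs.
class ToyRowLineageTable {
  private long nextRowId;

  ToyRowLineageTable(long nextRowId) {
    this.nextRowId = nextRowId;
  }

  long nextRowId() {
    return nextRowId;
  }

  // Accepts a snapshot only if its first-row-id is not behind next-row-id.
  void addSnapshot(long firstRowId, long addedRows) {
    if (firstRowId < nextRowId) {
      throw new IllegalStateException(
          "Cannot add a snapshot, first-row-id is behind table next-row-id: "
              + firstRowId + " < " + nextRowId);
    }
    nextRowId += addedRows;
  }

  // Retries with the same stale first-row-id and returns how many attempts failed.
  static int failedRetries(ToyRowLineageTable table, long staleFirstRowId, int maxAttempts) {
    int failures = 0;
    for (int i = 0; i < maxAttempts; i++) {
      try {
        table.addSnapshot(staleFirstRowId, 1);
        break; // would only be reached if the stale id were still valid
      } catch (IllegalStateException e) {
        failures++;
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    ToyRowLineageTable table = new ToyRowLineageTable(0);
    long staleFirstRowId = table.nextRowId(); // captured at transaction start

    table.addSnapshot(table.nextRowId(), 10); // concurrent commit advances next-row-id

    System.out.println("failed attempts: " + failedRetries(table, staleFirstRowId, 4));

    // Recomputing first-row-id from the refreshed table succeeds.
    table.addSnapshot(table.nextRowId(), 1);
    System.out.println("next-row-id: " + table.nextRowId());
  }
}
```

Every retry in the sketch fails because the input never changes; only recomputing `first-row-id` from refreshed state lets the snapshot through.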
## Reproducing Test
<details>
<summary>TestReplaceTableSafety.java (self-contained, no external dependencies)</summary>
```java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
/**
* Tests demonstrating safety bugs in Iceberg's REPLACE TABLE transaction.
*
* <p>Each test PASSES on current Iceberg code, proving the bug exists. The bugs are specific to
* REPLACE and cannot occur with regular {@code updateTable} operations, which use OCC with retry:
* on conflict, base is refreshed and updates are re-applied additively on the latest metadata.
*
* <p>REPLACE bypasses this: {@code commitReplaceTransaction} commits a stale {@code current}
* metadata snapshot built at transaction creation time, silently overwriting any concurrent
* committed changes.
*
* <p>Root causes:
*
* <ul>
* <li>{@code BaseTransaction.commitReplaceTransaction} refreshes {@code base} but never calls
* {@code applyUpdates()}, so the stale {@code current} metadata from transaction creation
* time is committed wholesale.
* <li>{@code UpdateRequirements.forReplaceTable} uses {@code isReplace=true}, skipping conflict
* assertions (schema, spec, sort order, snapshot refs, branches).
* </ul>
*
* @see <a href="https://github.com/apache/iceberg/issues/13651">#13651 - Snapshot Revival</a>
* @see <a href="https://github.com/apache/iceberg/issues/12738">#12738 - Stale Partition Spec</a>
*/
public class TestReplaceTableSafety {
private static final Namespace NS = Namespace.of("db");
private static final TableIdentifier TABLE = TableIdentifier.of(NS, "t");
private static final Schema SCHEMA_ID_DATA =
new Schema(
required(1, "id", Types.IntegerType.get()),
required(2, "data", Types.StringType.get()));
private static final Schema SCHEMA_ID_DATA_EXTRA =
new Schema(
required(1, "id", Types.IntegerType.get()),
required(2, "data", Types.StringType.get()),
required(3, "extra", Types.StringType.get()));
@TempDir private Path tempDir;
private InMemoryCatalog catalog;
@BeforeEach
public void before() {
catalog = new InMemoryCatalog();
catalog.initialize("test", ImmutableMap.of("warehouse",
tempDir.toString()));
catalog.createNamespace(NS);
}
private DataFile dataFile(PartitionSpec spec, String path) {
return DataFiles.builder(spec)
.withPath(path)
.withFileSizeInBytes(10)
.withRecordCount(1)
.build();
}
// ---------------------------------------------------------------------------
// Bug 1: Replace vs snapshot expiration -- expired snapshot revived
//
// commitReplaceTransaction commits stale metadata that still contains the
// expired snapshot. The expiration commit is silently overwritten.
// ---------------------------------------------------------------------------
@Test
public void testReplaceVsExpiration_SnapshotRevival() {
Table table = catalog.buildTable(TABLE, SCHEMA_ID_DATA).create();
table.newFastAppend().appendFile(dataFile(table.spec(),
"/data/a.parquet")).commit();
long snapshot1 = table.currentSnapshot().snapshotId();
table.newFastAppend().appendFile(dataFile(table.spec(),
"/data/b.parquet")).commit();
// Start replace transaction (captures metadata with both snapshots)
Transaction replace = catalog.buildTable(TABLE,
SCHEMA_ID_DATA).replaceTransaction();
// Concurrently expire snapshot 1
table = catalog.loadTable(TABLE);
table.expireSnapshots().expireSnapshotId(snapshot1).commit();
// Verify snapshot 1 is gone after expiration
table = catalog.loadTable(TABLE);
assertThat(collectSnapshotIds(table))
.as("Snapshot 1 should be gone after expiration")
.doesNotContain(snapshot1);
// Commit the replace
replace.commitTransaction();
// BUG: snapshot 1 is back -- revived after expiration
table = catalog.loadTable(TABLE);
assertThat(collectSnapshotIds(table))
.as("Expired snapshot should be revived by replace (BUG: stale snapshot revival)")
.contains(snapshot1);
}
// ---------------------------------------------------------------------------
// Bug 2: Replace vs concurrent schema update -- change silently lost
//
// The concurrent addColumn commit is silently overwritten because REPLACE
// commits the stale schema from transaction creation time.
// ---------------------------------------------------------------------------
@Test
public void testReplaceVsSchemaUpdate_SchemaChangeLost() {
catalog.buildTable(TABLE, SCHEMA_ID_DATA).create();
// Start replace (captures metadata with original schema)
Transaction replace = catalog.buildTable(TABLE,
SCHEMA_ID_DATA).replaceTransaction();
// Concurrently add a column
Table table = catalog.loadTable(TABLE);
table.updateSchema().addColumn("replacement",
Types.StringType.get()).commit();
// Commit the replace -- overwrites the schema update
replace.commitTransaction();
// BUG: the concurrent schema evolution is silently overwritten
Table result = catalog.loadTable(TABLE);
assertThat(result.schema().findField("replacement"))
.as("Column added by concurrent schema update should be gone (BUG: silent overwrite)")
.isNull();
}
// ---------------------------------------------------------------------------
// Bug 3: Replace vs concurrent property write -- property clobbered
//
// This bug is REPLACE-specific. With regular updateProperties, two concurrent
// writers setting DIFFERENT keys both survive via OCC retry + additive re-apply.
// With REPLACE, the entire stale properties map is committed wholesale.
// ---------------------------------------------------------------------------
@Test
public void testReplaceVsPropertyWrite_PropertyClobbered() {
catalog.buildTable(TABLE, SCHEMA_ID_DATA).withProperty("key1",
"original").create();
// Start replace with new properties
Transaction replace =
catalog
.buildTable(TABLE, SCHEMA_ID_DATA)
.withProperty("key1", "replaced")
.replaceTransaction();
// Concurrently set a different property
Table table = catalog.loadTable(TABLE);
table.updateProperties().set("key2", "concurrent_value").commit();
// Verify concurrent property exists before replace commit
table = catalog.loadTable(TABLE);
assertThat(table.properties()).containsEntry("key2", "concurrent_value");
// Commit the replace
replace.commitTransaction();
// BUG: the concurrent property write is silently lost
Table result = catalog.loadTable(TABLE);
assertThat(result.properties())
.as("Concurrent property 'key2' should be gone (BUG: property clobbered by replace)")
.doesNotContainKey("key2");
assertThat(result.properties()).containsEntry("key1", "replaced");
}
// ---------------------------------------------------------------------------
// Bug 4: Replace vs concurrent DML -- snapshot lost
//
// A concurrent data append creates a new snapshot, but REPLACE commits
// stale metadata from before the append, so the DML snapshot vanishes.
// This is silent data loss.
// ---------------------------------------------------------------------------
@Test
public void testReplaceVsDataCommit_SnapshotLost() {
Table table = catalog.buildTable(TABLE, SCHEMA_ID_DATA).create();
table.newFastAppend().appendFile(dataFile(table.spec(),
"/data/a.parquet")).commit();
// Start replace (captures metadata with 1 snapshot)
Transaction replace = catalog.buildTable(TABLE,
SCHEMA_ID_DATA).replaceTransaction();
// Concurrently append data (creates snapshot 2)
table = catalog.loadTable(TABLE);
table.newFastAppend().appendFile(dataFile(table.spec(),
"/data/b.parquet")).commit();
table = catalog.loadTable(TABLE);
long dmlSnapshotId = table.currentSnapshot().snapshotId();
// Commit the replace
replace.commitTransaction();
// BUG: the DML snapshot is lost -- overwritten by stale replace metadata
Table result = catalog.loadTable(TABLE);
assertThat(collectSnapshotIds(result))
.as("DML snapshot should be lost (BUG: replace overwrites concurrent DML)")
.doesNotContain(dmlSnapshotId);
}
// ---------------------------------------------------------------------------
// Bug 5: Replace vs concurrent replace -- no conflict detection
//
// Two concurrent REPLACE transactions both succeed without
// CommitFailedException. The second silently overwrites the first.
// For regular operations, OCC detects the conflict and triggers retry.
// ---------------------------------------------------------------------------
@Test
public void testReplaceVsReplace_LastWriterWinsNoConflict() {
catalog.buildTable(TABLE, SCHEMA_ID_DATA).withProperty("key1",
"v1").create();
// Start two concurrent replace transactions
Transaction replaceA =
catalog
.buildTable(TABLE, SCHEMA_ID_DATA)
.withProperty("key1", "from_A")
.replaceTransaction();
Transaction replaceB =
catalog
.buildTable(TABLE, SCHEMA_ID_DATA)
.withProperty("key1", "from_B")
.replaceTransaction();
// Commit A first
replaceA.commitTransaction();
Table afterA = catalog.loadTable(TABLE);
assertThat(afterA.properties()).containsEntry("key1", "from_A");
// Commit B -- succeeds without CommitFailedException (no conflict detection)
replaceB.commitTransaction();
// BUG: B silently overwrites A with no conflict detection
Table result = catalog.loadTable(TABLE);
assertThat(result.properties())
.as("Replace B should silently overwrite A (BUG: no conflict detection between replaces)")
.containsEntry("key1", "from_B");
}
// ---------------------------------------------------------------------------
// Bug 6: Compound -- replace vs expiration + property + schema update
//
// Three concurrent committed changes are all silently lost when REPLACE
// commits stale metadata. Demonstrates the bug affects all metadata
// dimensions simultaneously.
// ---------------------------------------------------------------------------
@Test
public void testReplaceCompound_MultipleViolations() {
PartitionSpec specOnData =
PartitionSpec.builderFor(SCHEMA_ID_DATA_EXTRA).identity("data").build();
Table table =
catalog
.buildTable(TABLE, SCHEMA_ID_DATA_EXTRA)
.withPartitionSpec(specOnData)
.withProperty("marker", "original")
.create();
table.newFastAppend().appendFile(dataFile(table.spec(),
"/data/a.parquet")).commit();
long snapshot1 = table.currentSnapshot().snapshotId();
table.newFastAppend().appendFile(dataFile(table.spec(),
"/data/b.parquet")).commit();
// Start replace with new spec and property
PartitionSpec specOnExtra =
PartitionSpec.builderFor(SCHEMA_ID_DATA_EXTRA).identity("extra").build();
Transaction replace =
catalog
.buildTable(TABLE, SCHEMA_ID_DATA_EXTRA)
.withPartitionSpec(specOnExtra)
.withProperty("marker", "from_replace")
.replaceTransaction();
// Concurrent modification 1: expire snapshot 1
table = catalog.loadTable(TABLE);
table.expireSnapshots().expireSnapshotId(snapshot1).commit();
// Concurrent modification 2: set property
table = catalog.loadTable(TABLE);
table.updateProperties().set("marker", "from_concurrent").commit();
// Concurrent modification 3: add a column
table = catalog.loadTable(TABLE);
table.updateSchema().addColumn("added_col",
Types.StringType.get()).commit();
// Commit replace -- overwrites all three concurrent changes
replace.commitTransaction();
Table result = catalog.loadTable(TABLE);
// Violation 1: expired snapshot is back
assertThat(collectSnapshotIds(result))
.as("Expired snapshot should be revived (BUG)")
.contains(snapshot1);
// Violation 2: concurrent property change is lost
assertThat(result.properties().get("marker"))
.as("Concurrent property update should be lost (BUG)")
.isEqualTo("from_replace");
// Violation 3: concurrent schema evolution is lost
assertThat(result.schema().findField("added_col"))
.as("Concurrent column addition should be lost (BUG)")
.isNull();
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
private Set<Long> collectSnapshotIds(Table table) {
return
org.apache.iceberg.relocated.com.google.common.collect.Streams.stream(table.snapshots())
.map(Snapshot::snapshotId)
.collect(Collectors.toSet());
}
}
```
</details>
### Running the test
```bash
# Copy into an Iceberg clone and run:
cp TestReplaceTableSafety.java /path/to/iceberg/core/src/test/java/org/apache/iceberg/
cd /path/to/iceberg
./gradlew :iceberg-core:test --tests "org.apache.iceberg.TestReplaceTableSafety"
```
All 6 tests should **PASS** (proving the bugs exist). When the fix lands,
these tests will start failing.
## Related Issues and Discussions
- **Mailing list**: [DISCUSS: Replace table transaction in REST
Catalog](https://lists.apache.org/thread/d4hzd4cfvopvckcfw50orqksjzymd4lm)
- **Mailing list**: [DISCUSS: Allow Commit Conflicts in REPLACE TABLE
transactions](https://lists.apache.org/thread/f7zd1dvlf6l1gw6wmnh9g404trcrkg06)
- #13651 — Corrupted table history due to CREATE OR REPLACE TABLE & Snapshot
Expiration at the same time
- #12738 — Unexpected behavior with CREATE OR REPLACE AS SELECT (stale
partition spec)
- #11524 — Unable to roll back to previous version after CREATE OR REPLACE
with REST Catalog
- https://github.com/apache/iceberg/pull/15904 — [WIP] Replace transactions
rebase onto refreshed metadata (active fix)
- https://github.com/apache/iceberg/pull/15905 — Repro: Concurrent replace
V3 failure
- https://github.com/apache/iceberg/pull/15914 — Repro: REST vs non-REST
concurrent replace behaviour
## Why This Is REPLACE-Specific
Regular `updateTable` operations cannot silently overwrite concurrent
changes because they have three layers of defense:
1. **OCC conflict detection**: `base != current()` catches any concurrent
commit
2. **Retry with refresh**: on conflict, base is refreshed to the latest
metadata
3. **Additive re-apply**: the update is re-applied on refreshed state,
preserving concurrent changes to other fields
For example, two concurrent `updateProperties` calls setting *different*
keys both survive: the second writer detects the conflict, refreshes, and
re-applies its key additively. REPLACE bypasses all three layers.
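The three layers can be sketched as a compare-and-swap retry loop. This is a toy model with hypothetical names (`ToyOccTable`, `setProperty`, and the `interference` hook are assumptions for illustration), not the code Iceberg uses; the hook simply lets a second writer commit between the first writer's read and its commit attempt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Toy model only: these names are not Iceberg APIs.
class ToyOccTable {
  private final AtomicReference<Map<String, String>> current =
      new AtomicReference<>(Map.of());

  Map<String, String> current() {
    return current.get();
  }

  // Sets one property with optimistic concurrency and returns the number of
  // attempts needed. The interference hook runs once, after the first read and
  // before the first commit attempt, to simulate a concurrent writer.
  int setProperty(String key, String value, Runnable interference) {
    int attempts = 0;
    while (true) {
      attempts++;
      Map<String, String> base = current.get(); // layer 2: (re)load latest metadata
      Map<String, String> updated = new HashMap<>(base);
      updated.put(key, value); // layer 3: re-apply only this writer's change
      if (attempts == 1) {
        interference.run();
      }
      // layer 1: the commit succeeds only if nobody else committed since the read
      if (current.compareAndSet(base, Map.copyOf(updated))) {
        return attempts;
      }
    }
  }

  public static void main(String[] args) {
    ToyOccTable table = new ToyOccTable();
    // Writer B sets "b"; writer A commits "a" while B is in flight.
    int attempts = table.setProperty("b", "2", () -> table.setProperty("a", "1", () -> {}));
    System.out.println("attempts by B: " + attempts);
    System.out.println("final properties: " + table.current());
  }
}
```

In the sketch, writer B needs a second attempt and the final map holds both keys, which is the behavior the replace path currently lacks.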
### Willingness to contribute
- [X] I can contribute a fix for this bug independently
- [X] I would be willing to contribute a fix for this bug with guidance from
the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time