smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3269526794


##########
tests/catalog/test_catalog_behaviors.py:
##########
@@ -387,6 +387,314 @@ def test_load_table_from_self_identifier(
     assert table.metadata == loaded_table.metadata
 
 
+_SIMPLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+)
+
+
+def _create_simple_table(
+    catalog: Catalog,
+    identifier: Identifier,
+    *,
+    schema: Schema = _SIMPLE_SCHEMA,
+    format_version: int = 2,
+    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+    properties: dict[str, str] | None = None,
+) -> tuple[Identifier, Schema]:
+    namespace = Catalog.namespace_from(identifier)
+    catalog.create_namespace_if_not_exists(namespace)
+    merged_properties = {"format-version": str(format_version), **(properties 
or {})}
+    catalog.create_table(identifier, schema=schema, 
partition_spec=partition_spec, properties=merged_properties)
+    return identifier, schema
+
+
+def _simple_data(num_rows: int = 2) -> pa.Table:
+    return pa.Table.from_pydict(
+        {"id": list(range(num_rows)), "data": [chr(ord("a") + i) for i in 
range(num_rows)]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", 
pa.large_string())]),
+    )
+
+
+_REPLACE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+    NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+)
+
+
+def test_replace_transaction(catalog: Catalog, test_table_identifier: 
Identifier) -> None:
+    _, original_schema = _create_simple_table(catalog, test_table_identifier)
+    original = catalog.load_table(test_table_identifier)
+    original.append(_simple_data())
+    original = catalog.load_table(test_table_identifier)
+    old_snapshot_id = original.current_snapshot().snapshot_id  # type: 
ignore[union-attr]
+    snapshot_log_before = list(original.metadata.snapshot_log)
+    assert len(snapshot_log_before) == 1
+
+    catalog.replace_table_transaction(test_table_identifier, 
schema=_REPLACE_SCHEMA).commit_transaction()
+    replaced = catalog.load_table(test_table_identifier)
+
+    # UUID + history preserved, current snapshot cleared, current schema 
swapped.
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.metadata.current_snapshot_id is None
+    assert {f.name for f in replaced.schema().fields} == {"id", "data", 
"extra"}
+    # Old snapshot kept by identity (not just count), and snapshot_log entries 
from before survive.
+    assert any(s.snapshot_id == old_snapshot_id for s in 
replaced.metadata.snapshots)
+    assert all(entry in replaced.metadata.snapshot_log for entry in 
snapshot_log_before)
+    # Old schema is still in the schemas list alongside the new one.
+    schema_ids = sorted(s.schema_id for s in replaced.metadata.schemas)
+    assert schema_ids == [0, 1]
+    assert replaced.metadata.current_schema_id == 1
+    # Time-travel back to the pre-replace snapshot returns the rows that were 
there before.
+    assert 
replaced.scan(snapshot_id=old_snapshot_id).to_arrow().equals(_simple_data())
+
+
+def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: 
Identifier, tmp_path: Path) -> None:
+    _create_simple_table(catalog, test_table_identifier, properties={"keep": 
"yes", "override": "old"})
+    catalog.load_table(test_table_identifier).append(_simple_data())
+    original = catalog.load_table(test_table_identifier)
+    old_snapshot_id = original.current_snapshot().snapshot_id  # type: 
ignore[union-attr]
+    original_data = original.scan().to_arrow()
+
+    new_location = f"file://{tmp_path}/replaced"
+    new_schema = Schema(
+        NestedField(field_id=1, name="id", field_type=LongType(), 
required=False),
+        NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+        NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+    )
+    new_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
name="id_part", transform=IdentityTransform()))
+    new_sort = SortOrder(SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC))
+    new_data = pa.Table.from_pydict(
+        {"id": [10, 20], "data": ["alice", "bob"], "extra": [True, False]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", 
pa.large_string()), pa.field("extra", pa.bool_())]),
+    )
+
+    with catalog.replace_table_transaction(
+        test_table_identifier,
+        schema=new_schema,
+        partition_spec=new_spec,
+        sort_order=new_sort,
+        location=new_location,
+        properties={"override": "new", "added": "v"},
+    ) as txn:
+        txn.append(new_data)
+
+    replaced = catalog.load_table(test_table_identifier)
+
+    # Identity invariants.
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.metadata.location == new_location
+
+    # New schema / spec / sort applied; old entries retained in history.
+    assert {f.name for f in replaced.schema().fields} == {"id", "data", 
"extra"}
+    assert sorted(s.schema_id for s in replaced.metadata.schemas) == [0, 1]
+    assert replaced.spec().fields[0].source_id == 1
+    assert isinstance(replaced.spec().fields[0].transform, IdentityTransform)
+    assert {s.spec_id for s in replaced.metadata.partition_specs} == {0, 1}
+    assert replaced.sort_order().fields == new_sort.fields
+    assert {s.order_id for s in replaced.metadata.sort_orders} == {0, 
replaced.metadata.default_sort_order_id}
+
+    # Property merge: kept, overridden, added — and `format-version` does not 
leak.
+    assert replaced.properties["keep"] == "yes"
+    assert replaced.properties["override"] == "new"
+    assert replaced.properties["added"] == "v"
+    assert "format-version" not in replaced.properties
+
+    # RTAS atomicity: new snapshot exists, has no parent (fresh start), old 
snapshot is still
+    # in the snapshot list, and time-travel reads return the original rows.
+    new_snapshot = replaced.current_snapshot()
+    assert new_snapshot is not None
+    assert new_snapshot.snapshot_id != old_snapshot_id
+    assert new_snapshot.parent_snapshot_id is None
+    assert any(s.snapshot_id == old_snapshot_id for s in 
replaced.metadata.snapshots)
+    assert replaced.scan().to_arrow().num_rows == 2
+    # Time-travel back to before the replace returns the original rows from 
the old schema.
+    time_travel = replaced.scan(snapshot_id=old_snapshot_id).to_arrow()
+    assert time_travel.num_rows == original_data.num_rows
+    assert time_travel.column("id").to_pylist() == 
original_data.column("id").to_pylist()
+
+
+def test_replace_transaction_requires_table_exists(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), 
required=False))
+    with pytest.raises(NoSuchTableError):
+        catalog.replace_table_transaction(test_table_identifier, schema=schema)
+
+
+def test_replace_table_reuses_schema_id_when_identical(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _, base_schema = _create_simple_table(catalog, test_table_identifier)
+    catalog.replace_table_transaction(test_table_identifier, 
schema=base_schema).commit_transaction()
+    replaced = catalog.load_table(test_table_identifier)
+    # Identical shape -> no new schema appended, current points back at id 0.
+    assert [s.schema_id for s in replaced.metadata.schemas] == [0]
+    assert replaced.metadata.current_schema_id == 0
+    assert replaced.metadata.last_column_id == 2
+
+
+def test_replace_table_reuses_partition_spec_and_sort_order_when_identical(
+    catalog: Catalog, test_table_identifier: Identifier
+) -> None:
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
name="id_part", transform=IdentityTransform()))
+    sort = SortOrder(SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC))
+    _, schema = _create_simple_table(catalog, test_table_identifier, 
partition_spec=spec)
+    # Introduce a sort order then replay both spec and sort — neither should 
append a new entry.
+    catalog.replace_table_transaction(
+        test_table_identifier, schema=schema, partition_spec=spec, 
sort_order=sort
+    ).commit_transaction()
+    sorted_first = catalog.load_table(test_table_identifier)
+    sorted_order_id = sorted_first.metadata.default_sort_order_id
+    assert sorted_order_id != 0
+
+    catalog.replace_table_transaction(
+        test_table_identifier, schema=schema, partition_spec=spec, 
sort_order=sort
+    ).commit_transaction()
+    replayed = catalog.load_table(test_table_identifier)
+    assert [s.spec_id for s in replayed.metadata.partition_specs] == [0]
+    assert replayed.metadata.default_spec_id == 0
+    assert replayed.metadata.default_sort_order_id == sorted_order_id
+
+    # Dropping the sort order falls back to the unsorted order_id 0 (also 
reused, not appended).
+    catalog.replace_table_transaction(test_table_identifier, schema=schema, 
partition_spec=spec).commit_transaction()
+    unsorted = catalog.load_table(test_table_identifier)
+    assert unsorted.sort_order().is_unsorted
+    assert unsorted.metadata.default_sort_order_id == 0
+
+
+@pytest.mark.parametrize("keep_identifier", [True, False], ids=["preserves", 
"drops"])
+def test_replace_table_identifier_field_ids(catalog: Catalog, 
test_table_identifier: Identifier, keep_identifier: bool) -> None:
+    schema = Schema(
+        NestedField(field_id=1, name="id", field_type=LongType(), 
required=True),
+        NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+        identifier_field_ids=[1],
+    )
+    _create_simple_table(catalog, test_table_identifier, schema=schema)
+    new_schema = (
+        Schema(
+            NestedField(field_id=1, name="id", field_type=LongType(), 
required=True),
+            NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+            NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+            identifier_field_ids=[1],
+        )
+        if keep_identifier
+        else Schema(
+            NestedField(field_id=1, name="id", field_type=LongType(), 
required=False),
+            NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+        )
+    )
+    catalog.replace_table_transaction(test_table_identifier, 
schema=new_schema).commit_transaction()
+    replaced = catalog.load_table(test_table_identifier)
+    expected = [1] if keep_identifier else []
+    assert list(replaced.schema().identifier_field_ids) == expected
+
+
+@pytest.mark.parametrize(
+    "format_version, expect_void_carry_forward",
+    [(1, True), (2, False)],
+    ids=["v1-carries-forward", "v2-drops"],
+)
+def test_replace_table_partition_field_carry_forward(
+    catalog: Catalog,
+    test_table_identifier: Identifier,
+    format_version: int,
+    expect_void_carry_forward: bool,
+) -> None:
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
name="id_part", transform=IdentityTransform()))
+    _, schema = _create_simple_table(catalog, test_table_identifier, 
partition_spec=spec, format_version=format_version)
+    catalog.replace_table_transaction(test_table_identifier, 
schema=schema).commit_transaction()
+    replaced = catalog.load_table(test_table_identifier)
+    new_spec = replaced.spec()
+    if expect_void_carry_forward:
+        void_field = next(f for f in new_spec.fields if f.field_id == 1000)
+        assert isinstance(void_field.transform, VoidTransform)
+        assert void_field.source_id == 1
+        assert void_field.name == "id_part"
+    else:
+        assert new_spec.is_unpartitioned()
+
+
+def test_replace_table_upgrades_format_version(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _, schema = _create_simple_table(catalog, test_table_identifier, 
format_version=1)
+    assert catalog.load_table(test_table_identifier).format_version == 1
+
+    catalog.replace_table_transaction(
+        test_table_identifier, schema=schema, properties={"format-version": 
"2"}
+    ).commit_transaction()
+    upgraded = catalog.load_table(test_table_identifier)
+    assert upgraded.format_version == 2
+    # `format-version` is a control input, not a persisted property.
+    assert "format-version" not in upgraded.properties
+
+    # A follow-up replace stays at the upgraded version.
+    new_schema = Schema(*schema.fields, NestedField(field_id=3, name="extra", 
field_type=BooleanType(), required=False))
+    catalog.replace_table_transaction(test_table_identifier, 
schema=new_schema).commit_transaction()
+    replayed = catalog.load_table(test_table_identifier)
+    assert replayed.format_version == 2
+    assert {f.name for f in replayed.schema().fields} == {"id", "data", 
"extra"}
+
+
+def test_replace_table_rejects_format_version_downgrade(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _, schema = _create_simple_table(catalog, test_table_identifier, 
format_version=2)
+    with pytest.raises(ValueError, match="Cannot downgrade format-version"):
+        catalog.replace_table_transaction(test_table_identifier, 
schema=schema, properties={"format-version": "1"})
+
+
+@pytest.mark.parametrize("location_kind", ["inherit", "explicit", 
"trailing-slash"])
+def test_replace_table_location(catalog: Catalog, test_table_identifier: 
Identifier, tmp_path: Path, location_kind: str) -> None:
+    _, schema = _create_simple_table(catalog, test_table_identifier)
+    existing = catalog.load_table(test_table_identifier).metadata.location
+
+    if location_kind == "inherit":
+        catalog.replace_table_transaction(test_table_identifier, 
schema=schema).commit_transaction()
+        replaced = catalog.load_table(test_table_identifier)
+        assert replaced.metadata.location == existing
+    else:
+        bare = f"file://{tmp_path}/relocated"
+        arg = bare + "/" if location_kind == "trailing-slash" else bare
+        catalog.replace_table_transaction(test_table_identifier, 
schema=schema, location=arg).commit_transaction()
+        replaced = catalog.load_table(test_table_identifier)
+        assert replaced.metadata.location == bare
+
+
+def test_replace_table_transaction_rolls_back_on_failure(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _create_simple_table(catalog, test_table_identifier)
+    catalog.load_table(test_table_identifier).append(_simple_data())
+    before = catalog.load_table(test_table_identifier).metadata
+
+    def run_failing_replace() -> None:
+        with catalog.replace_table_transaction(test_table_identifier, 
schema=_REPLACE_SCHEMA):
+            raise RuntimeError("simulated failure inside replace transaction")
+
+    with pytest.raises(RuntimeError, match="simulated failure inside replace 
transaction"):
+        run_failing_replace()
+
+    after = catalog.load_table(test_table_identifier).metadata
+    assert after.table_uuid == before.table_uuid
+    assert after.current_snapshot_id == before.current_snapshot_id
+    assert after.current_schema_id == before.current_schema_id
+    assert len(after.schemas) == len(before.schemas)
+
+
+def test_concurrent_replace_transaction_schema_conflict(catalog: Catalog, 
test_table_identifier: Identifier) -> None:

Review Comment:
   [AI Reviewer Aid] Mirrors Java's 
[`testConcurrentReplaceTransactionSchemaConflict`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2866-L2903).
 The non-conflict variants (`testConcurrentReplaceTransactions`, 
`testConcurrentReplaceTransactionSchema`) are deliberately not ported — 
PyIceberg emits `AssertLastAssignedFieldId` unconditionally (see the divergence 
note on `commit_transaction`), so those happy-path concurrent flows would 
fail fast here rather than succeed.



##########
tests/catalog/test_catalog_behaviors.py:
##########
@@ -387,6 +387,314 @@ def test_load_table_from_self_identifier(
     assert table.metadata == loaded_table.metadata
 
 
+_SIMPLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+)
+
+
+def _create_simple_table(
+    catalog: Catalog,
+    identifier: Identifier,
+    *,
+    schema: Schema = _SIMPLE_SCHEMA,
+    format_version: int = 2,
+    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+    properties: dict[str, str] | None = None,
+) -> tuple[Identifier, Schema]:
+    namespace = Catalog.namespace_from(identifier)
+    catalog.create_namespace_if_not_exists(namespace)
+    merged_properties = {"format-version": str(format_version), **(properties 
or {})}
+    catalog.create_table(identifier, schema=schema, 
partition_spec=partition_spec, properties=merged_properties)
+    return identifier, schema
+
+
+def _simple_data(num_rows: int = 2) -> pa.Table:
+    return pa.Table.from_pydict(
+        {"id": list(range(num_rows)), "data": [chr(ord("a") + i) for i in 
range(num_rows)]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", 
pa.large_string())]),
+    )
+
+
+_REPLACE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+    NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+)
+
+
+def test_replace_transaction(catalog: Catalog, test_table_identifier: 
Identifier) -> None:
+    _, original_schema = _create_simple_table(catalog, test_table_identifier)
+    original = catalog.load_table(test_table_identifier)
+    original.append(_simple_data())
+    original = catalog.load_table(test_table_identifier)
+    old_snapshot_id = original.current_snapshot().snapshot_id  # type: 
ignore[union-attr]
+    snapshot_log_before = list(original.metadata.snapshot_log)
+    assert len(snapshot_log_before) == 1
+
+    catalog.replace_table_transaction(test_table_identifier, 
schema=_REPLACE_SCHEMA).commit_transaction()
+    replaced = catalog.load_table(test_table_identifier)
+
+    # UUID + history preserved, current snapshot cleared, current schema 
swapped.
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.metadata.current_snapshot_id is None
+    assert {f.name for f in replaced.schema().fields} == {"id", "data", 
"extra"}
+    # Old snapshot kept by identity (not just count), and snapshot_log entries 
from before survive.
+    assert any(s.snapshot_id == old_snapshot_id for s in 
replaced.metadata.snapshots)
+    assert all(entry in replaced.metadata.snapshot_log for entry in 
snapshot_log_before)
+    # Old schema is still in the schemas list alongside the new one.
+    schema_ids = sorted(s.schema_id for s in replaced.metadata.schemas)
+    assert schema_ids == [0, 1]
+    assert replaced.metadata.current_schema_id == 1
+    # Time-travel back to the pre-replace snapshot returns the rows that were 
there before.
+    assert 
replaced.scan(snapshot_id=old_snapshot_id).to_arrow().equals(_simple_data())
+
+
+def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: 
Identifier, tmp_path: Path) -> None:
+    _create_simple_table(catalog, test_table_identifier, properties={"keep": 
"yes", "override": "old"})
+    catalog.load_table(test_table_identifier).append(_simple_data())
+    original = catalog.load_table(test_table_identifier)
+    old_snapshot_id = original.current_snapshot().snapshot_id  # type: 
ignore[union-attr]
+    original_data = original.scan().to_arrow()
+
+    new_location = f"file://{tmp_path}/replaced"
+    new_schema = Schema(
+        NestedField(field_id=1, name="id", field_type=LongType(), 
required=False),
+        NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+        NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+    )
+    new_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
name="id_part", transform=IdentityTransform()))
+    new_sort = SortOrder(SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC))
+    new_data = pa.Table.from_pydict(
+        {"id": [10, 20], "data": ["alice", "bob"], "extra": [True, False]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", 
pa.large_string()), pa.field("extra", pa.bool_())]),
+    )
+
+    with catalog.replace_table_transaction(
+        test_table_identifier,
+        schema=new_schema,
+        partition_spec=new_spec,
+        sort_order=new_sort,
+        location=new_location,
+        properties={"override": "new", "added": "v"},
+    ) as txn:
+        txn.append(new_data)
+
+    replaced = catalog.load_table(test_table_identifier)
+
+    # Identity invariants.
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.metadata.location == new_location
+
+    # New schema / spec / sort applied; old entries retained in history.
+    assert {f.name for f in replaced.schema().fields} == {"id", "data", 
"extra"}
+    assert sorted(s.schema_id for s in replaced.metadata.schemas) == [0, 1]
+    assert replaced.spec().fields[0].source_id == 1
+    assert isinstance(replaced.spec().fields[0].transform, IdentityTransform)
+    assert {s.spec_id for s in replaced.metadata.partition_specs} == {0, 1}
+    assert replaced.sort_order().fields == new_sort.fields
+    assert {s.order_id for s in replaced.metadata.sort_orders} == {0, 
replaced.metadata.default_sort_order_id}
+
+    # Property merge: kept, overridden, added — and `format-version` does not 
leak.
+    assert replaced.properties["keep"] == "yes"
+    assert replaced.properties["override"] == "new"
+    assert replaced.properties["added"] == "v"
+    assert "format-version" not in replaced.properties
+
+    # RTAS atomicity: new snapshot exists, has no parent (fresh start), old 
snapshot is still
+    # in the snapshot list, and time-travel reads return the original rows.
+    new_snapshot = replaced.current_snapshot()
+    assert new_snapshot is not None
+    assert new_snapshot.snapshot_id != old_snapshot_id
+    assert new_snapshot.parent_snapshot_id is None
+    assert any(s.snapshot_id == old_snapshot_id for s in 
replaced.metadata.snapshots)
+    assert replaced.scan().to_arrow().num_rows == 2
+    # Time-travel back to before the replace returns the original rows from 
the old schema.
+    time_travel = replaced.scan(snapshot_id=old_snapshot_id).to_arrow()
+    assert time_travel.num_rows == original_data.num_rows
+    assert time_travel.column("id").to_pylist() == 
original_data.column("id").to_pylist()
+
+
+def test_replace_transaction_requires_table_exists(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), 
required=False))
+    with pytest.raises(NoSuchTableError):
+        catalog.replace_table_transaction(test_table_identifier, schema=schema)
+
+
+def test_replace_table_reuses_schema_id_when_identical(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _, base_schema = _create_simple_table(catalog, test_table_identifier)
+    catalog.replace_table_transaction(test_table_identifier, 
schema=base_schema).commit_transaction()
+    replaced = catalog.load_table(test_table_identifier)
+    # Identical shape -> no new schema appended, current points back at id 0.
+    assert [s.schema_id for s in replaced.metadata.schemas] == [0]
+    assert replaced.metadata.current_schema_id == 0
+    assert replaced.metadata.last_column_id == 2
+
+
+def test_replace_table_reuses_partition_spec_and_sort_order_when_identical(
+    catalog: Catalog, test_table_identifier: Identifier
+) -> None:
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
name="id_part", transform=IdentityTransform()))
+    sort = SortOrder(SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC))
+    _, schema = _create_simple_table(catalog, test_table_identifier, 
partition_spec=spec)
+    # Introduce a sort order then replay both spec and sort — neither should 
append a new entry.
+    catalog.replace_table_transaction(
+        test_table_identifier, schema=schema, partition_spec=spec, 
sort_order=sort
+    ).commit_transaction()
+    sorted_first = catalog.load_table(test_table_identifier)
+    sorted_order_id = sorted_first.metadata.default_sort_order_id
+    assert sorted_order_id != 0
+
+    catalog.replace_table_transaction(
+        test_table_identifier, schema=schema, partition_spec=spec, 
sort_order=sort
+    ).commit_transaction()
+    replayed = catalog.load_table(test_table_identifier)
+    assert [s.spec_id for s in replayed.metadata.partition_specs] == [0]
+    assert replayed.metadata.default_spec_id == 0
+    assert replayed.metadata.default_sort_order_id == sorted_order_id
+
+    # Dropping the sort order falls back to the unsorted order_id 0 (also 
reused, not appended).
+    catalog.replace_table_transaction(test_table_identifier, schema=schema, 
partition_spec=spec).commit_transaction()
+    unsorted = catalog.load_table(test_table_identifier)
+    assert unsorted.sort_order().is_unsorted
+    assert unsorted.metadata.default_sort_order_id == 0
+
+
+@pytest.mark.parametrize("keep_identifier", [True, False], ids=["preserves", 
"drops"])
+def test_replace_table_identifier_field_ids(catalog: Catalog, 
test_table_identifier: Identifier, keep_identifier: bool) -> None:
+    schema = Schema(
+        NestedField(field_id=1, name="id", field_type=LongType(), 
required=True),
+        NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+        identifier_field_ids=[1],
+    )
+    _create_simple_table(catalog, test_table_identifier, schema=schema)
+    new_schema = (
+        Schema(
+            NestedField(field_id=1, name="id", field_type=LongType(), 
required=True),
+            NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+            NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+            identifier_field_ids=[1],
+        )
+        if keep_identifier
+        else Schema(
+            NestedField(field_id=1, name="id", field_type=LongType(), 
required=False),
+            NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+        )
+    )
+    catalog.replace_table_transaction(test_table_identifier, 
schema=new_schema).commit_transaction()
+    replaced = catalog.load_table(test_table_identifier)
+    expected = [1] if keep_identifier else []
+    assert list(replaced.schema().identifier_field_ids) == expected
+
+
+@pytest.mark.parametrize(
+    "format_version, expect_void_carry_forward",
+    [(1, True), (2, False)],
+    ids=["v1-carries-forward", "v2-drops"],
+)
+def test_replace_table_partition_field_carry_forward(
+    catalog: Catalog,
+    test_table_identifier: Identifier,
+    format_version: int,
+    expect_void_carry_forward: bool,
+) -> None:
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
name="id_part", transform=IdentityTransform()))
+    _, schema = _create_simple_table(catalog, test_table_identifier, 
partition_spec=spec, format_version=format_version)
+    catalog.replace_table_transaction(test_table_identifier, 
schema=schema).commit_transaction()
+    replaced = catalog.load_table(test_table_identifier)
+    new_spec = replaced.spec()
+    if expect_void_carry_forward:
+        void_field = next(f for f in new_spec.fields if f.field_id == 1000)
+        assert isinstance(void_field.transform, VoidTransform)
+        assert void_field.source_id == 1
+        assert void_field.name == "id_part"
+    else:
+        assert new_spec.is_unpartitioned()
+
+
+def test_replace_table_upgrades_format_version(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _, schema = _create_simple_table(catalog, test_table_identifier, 
format_version=1)
+    assert catalog.load_table(test_table_identifier).format_version == 1
+
+    catalog.replace_table_transaction(
+        test_table_identifier, schema=schema, properties={"format-version": 
"2"}
+    ).commit_transaction()
+    upgraded = catalog.load_table(test_table_identifier)
+    assert upgraded.format_version == 2
+    # `format-version` is a control input, not a persisted property.
+    assert "format-version" not in upgraded.properties
+
+    # A follow-up replace stays at the upgraded version.
+    new_schema = Schema(*schema.fields, NestedField(field_id=3, name="extra", 
field_type=BooleanType(), required=False))
+    catalog.replace_table_transaction(test_table_identifier, 
schema=new_schema).commit_transaction()
+    replayed = catalog.load_table(test_table_identifier)
+    assert replayed.format_version == 2
+    assert {f.name for f in replayed.schema().fields} == {"id", "data", 
"extra"}
+
+
+def test_replace_table_rejects_format_version_downgrade(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _, schema = _create_simple_table(catalog, test_table_identifier, 
format_version=2)
+    with pytest.raises(ValueError, match="Cannot downgrade format-version"):
+        catalog.replace_table_transaction(test_table_identifier, 
schema=schema, properties={"format-version": "1"})
+
+
+@pytest.mark.parametrize("location_kind", ["inherit", "explicit", 
"trailing-slash"])
+def test_replace_table_location(catalog: Catalog, test_table_identifier: 
Identifier, tmp_path: Path, location_kind: str) -> None:
+    _, schema = _create_simple_table(catalog, test_table_identifier)
+    existing = catalog.load_table(test_table_identifier).metadata.location
+
+    if location_kind == "inherit":
+        catalog.replace_table_transaction(test_table_identifier, 
schema=schema).commit_transaction()
+        replaced = catalog.load_table(test_table_identifier)
+        assert replaced.metadata.location == existing
+    else:
+        bare = f"file://{tmp_path}/relocated"
+        arg = bare + "/" if location_kind == "trailing-slash" else bare
+        catalog.replace_table_transaction(test_table_identifier, 
schema=schema, location=arg).commit_transaction()
+        replaced = catalog.load_table(test_table_identifier)
+        assert replaced.metadata.location == bare
+
+
+def test_replace_table_transaction_rolls_back_on_failure(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _create_simple_table(catalog, test_table_identifier)
+    catalog.load_table(test_table_identifier).append(_simple_data())
+    before = catalog.load_table(test_table_identifier).metadata
+
+    def run_failing_replace() -> None:
+        with catalog.replace_table_transaction(test_table_identifier, 
schema=_REPLACE_SCHEMA):
+            raise RuntimeError("simulated failure inside replace transaction")
+
+    with pytest.raises(RuntimeError, match="simulated failure inside replace 
transaction"):
+        run_failing_replace()
+
+    after = catalog.load_table(test_table_identifier).metadata
+    assert after.table_uuid == before.table_uuid
+    assert after.current_snapshot_id == before.current_snapshot_id
+    assert after.current_schema_id == before.current_schema_id
+    assert len(after.schemas) == len(before.schemas)
+
+
+def test_concurrent_replace_transaction_schema_conflict(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    _create_simple_table(catalog, test_table_identifier)
+    txn_a = catalog.replace_table_transaction(test_table_identifier, 
schema=_REPLACE_SCHEMA)
+    txn_b = catalog.replace_table_transaction(test_table_identifier, 
schema=_REPLACE_SCHEMA)
+
+    txn_a.commit_transaction()
+    with pytest.raises(CommitFailedException, match="last assigned field id"):
+        txn_b.commit_transaction()
+
+
+def test_concurrent_replace_transaction_partition_spec_conflict(catalog: 
Catalog, test_table_identifier: Identifier) -> None:

Review Comment:
   [AI Reviewer Aid] Mirrors Java's 
[`testConcurrentReplaceTransactionPartitionSpecConflict`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2986-L3023).
 The non-conflict spec variants are deliberately not ported, for the same 
reason as the schema case above.



##########
tests/integration/test_catalog.py:
##########
@@ -866,3 +875,36 @@ def test_load_missing_table(test_catalog: Catalog, 
database_name: str, table_nam
 
     with pytest.raises(NoSuchTableError):
         test_catalog.load_table(identifier)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_replace_table_transaction(test_catalog: Catalog, database_name: str, 
table_name: str) -> None:

Review Comment:
   [AI Reviewer Aid] This test exercises only the RTAS path. The DDL-only 
invariant — that after a replace with no follow-up write, `current_snapshot_id` 
is `None` because the `main` branch ref was removed — is covered against 
InMemoryCatalog + SqlCatalog by 
[`test_replace_transaction`](https://github.com/apache/iceberg-python/blob/a810a15025c6aad691e2cb5903cdfe910da5ded5/tests/catalog/test_catalog_behaviors.py#L426)
 in the catalog-behavior suite.



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,150 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort 
order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and 
history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, 
new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the 
replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / 
`SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body 
unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = 
new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += 
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else 
add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, 
new_schema)
+        if existing_schema_id is not None:
+            self._updates += 
(SetCurrentSchemaUpdate(schema_id=existing_schema_id),)

Review Comment:
   [AI Reviewer Aid] Per Java's 
[`reuseOrCreateNewSchemaId`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1642-L1653)
 — walks all historical schemas, reuses the id if structurally identical, 
otherwise `max(id) + 1`. `SetCurrentSchemaUpdate` is emitted even on the reuse 
branch (this line), mirroring Java's 
[`RESTSessionCatalog.replaceTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L1060-L1064)
 which always ensures a `SetCurrentSchema` change is in the request.



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,150 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort 
order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and 
history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, 
new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the 
replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / 
`SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body 
unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = 
new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += 
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else 
add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, 
new_schema)
+        if existing_schema_id is not None:
+            self._updates += 
(SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+        else:
+            next_schema_id = max((s.schema_id for s in 
table_metadata.schemas), default=-1) + 1
+            schema_with_fresh_id = new_schema.model_copy(update={"schema_id": 
next_schema_id})
+            self._updates += (
+                AddSchemaUpdate(schema_=schema_with_fresh_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+            )
+
+        # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on 
add to avoid
+        # collisions with existing specs (AddPartitionSpecUpdate refuses 
duplicate IDs).
+        effective_spec = UNPARTITIONED_PARTITION_SPEC if 
new_spec.is_unpartitioned() else new_spec
+        existing_spec_id = self._find_matching_spec_id(table_metadata, 
effective_spec)
+        if existing_spec_id is not None:
+            self._updates += (SetDefaultSpecUpdate(spec_id=existing_spec_id),)

Review Comment:
   [AI Reviewer Aid] Per Java's 
[`reuseOrCreateNewSpecId`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1688-L1700).
 `SetDefaultSpecUpdate` is emitted even on the reuse branch (this line), also 
per the [`RESTSessionCatalog.replaceTransaction` 
block](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L1066-L1070).



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,150 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort 
order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and 
history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, 
new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the 
replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / 
`SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body 
unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = 
new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += 
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else 
add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, 
new_schema)
+        if existing_schema_id is not None:
+            self._updates += 
(SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+        else:
+            next_schema_id = max((s.schema_id for s in 
table_metadata.schemas), default=-1) + 1
+            schema_with_fresh_id = new_schema.model_copy(update={"schema_id": 
next_schema_id})
+            self._updates += (
+                AddSchemaUpdate(schema_=schema_with_fresh_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+            )
+
+        # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on 
add to avoid
+        # collisions with existing specs (AddPartitionSpecUpdate refuses 
duplicate IDs).
+        effective_spec = UNPARTITIONED_PARTITION_SPEC if 
new_spec.is_unpartitioned() else new_spec
+        existing_spec_id = self._find_matching_spec_id(table_metadata, 
effective_spec)
+        if existing_spec_id is not None:
+            self._updates += (SetDefaultSpecUpdate(spec_id=existing_spec_id),)
+        else:
+            next_spec_id = max((s.spec_id for s in 
table_metadata.partition_specs), default=-1) + 1
+            spec_with_fresh_id = PartitionSpec(*effective_spec.fields, 
spec_id=next_spec_id)
+            self._updates += (
+                AddPartitionSpecUpdate(spec=spec_with_fresh_id),
+                SetDefaultSpecUpdate(spec_id=-1),
+            )
+
+        # Sort order: same reuse-or-add pattern with fresh order_id on add.
+        effective_sort_order = UNSORTED_SORT_ORDER if 
new_sort_order.is_unsorted else new_sort_order

Review Comment:
   [AI Reviewer Aid] Per Java's 
[`reuseOrCreateNewSortOrderId`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1736-L1752).
 Unsorted reuses id 0 in Java; the `effective_sort_order` substitution on this 
line achieves the same — a caller-supplied unsorted order folds back to the 
canonical `UNSORTED_SORT_ORDER` so the matching lookup below hits the 
always-present order_id 0.



##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,117 @@ def create_table_if_not_exists(
         except TableAlreadyExistsError:
             return self.load_table(identifier)
 
+    def replace_table_transaction(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> ReplaceTableTransaction:
+        """Create a ReplaceTableTransaction.
+
+        The transaction can be used to stage additional changes (schema 
evolution,
+        partition evolution, etc.) before committing.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): New table schema.
+            location (str | None): New table location. Defaults to the 
existing location.
+            partition_spec (PartitionSpec): New partition spec.
+            sort_order (SortOrder): New sort order.
+            properties (Properties): Properties to apply. Merged on top of the 
existing
+                table properties: keys present here override existing values; 
existing keys
+                not present here are preserved. To remove a property, follow 
up with a
+                transaction that removes it explicitly.
+
+        Returns:
+            ReplaceTableTransaction: A transaction for the replace operation.
+
+        Raises:
+            NoSuchTableError: If the table does not exist.
+        """
+        staged_table, fresh_schema, fresh_spec, fresh_sort_order, 
resolved_location = self._replace_staged_table(
+            identifier, schema, location, partition_spec, sort_order, 
properties
+        )
+        return ReplaceTableTransaction(
+            table=staged_table,
+            new_schema=fresh_schema,
+            new_spec=fresh_spec,
+            new_sort_order=fresh_sort_order,
+            new_location=resolved_location,
+            new_properties=properties,
+        )
+
+    def _replace_staged_table(

Review Comment:
   [AI Reviewer Aid] Structured to parallel 
[`_create_staged_table`](https://github.com/apache/iceberg-python/blob/fc4fedf8827ebafdf28dc4c5f93025cc7fb70056/pyiceberg/catalog/__init__.py#L1099)
 — both isolate fresh-id / spec / location resolution + `StagedTable` 
construction off the public catalog method so per-catalog overrides stay thin.
   
   Note that `replace_table_transaction` is concrete on `Catalog` (commit 
fc4fedf8) while `create_table_transaction` is `@abstractmethod`. The asymmetry 
is deliberate: REST uses a [server-side `stage_create=True` 
endpoint](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java)
 for create-table-transaction (so per-catalog impls genuinely differ); there is 
no `stage_replace` analog, so both backends collapse to the same body around 
this shared helper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to