smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3261424963


##########
pyiceberg/partitioning.py:
##########
@@ -335,6 +335,175 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
     return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
 
 
+def assign_fresh_partition_spec_ids_for_replace(
+    spec: PartitionSpec,
+    old_schema: Schema,
+    fresh_schema: Schema,
+    existing_specs: list[PartitionSpec],
+    last_partition_id: int | None,
+    format_version: int = 2,
+    current_spec: PartitionSpec | None = None,
+) -> tuple[PartitionSpec, int]:
+    """Assign partition field IDs for a replace operation, reusing IDs from existing specs.
+
+    - For v2+, reuse partition field IDs by `(source_id, transform)` across all existing specs.
+      New fields get IDs starting from `last_partition_id + 1`.
+    - For v1, the current spec's fields must be preserved (v1 specs are append-only). Fields
+      absent from the new spec are carried forward with a `VoidTransform`. Matching new fields
+      reuse the existing partition field ID; remaining new fields are appended with fresh IDs.
+
+    Args:
+        spec: The new partition spec to assign IDs to. Its `source_id`s reference `old_schema`.
+        old_schema: The schema that the new spec's `source_id`s reference.
+        fresh_schema: The schema with freshly assigned field IDs.
+        existing_specs: All partition specs from the existing table metadata.
+        last_partition_id: The current table's `last_partition_id`.
+        format_version: Table format version. Required to be set to 1 for v1 carry-forward.
+        current_spec: The current default partition spec. Required when `format_version <= 1`.
+
+    Returns:
+        A tuple of `(fresh_spec, new_last_partition_id)`.
+    """
+    effective_last_partition_id = last_partition_id if last_partition_id is not None else PARTITION_FIELD_ID_START - 1
+
+    if format_version <= 1:
+        if current_spec is None:
+            raise ValueError("current_spec is required for v1 replace_table")
+        return _assign_fresh_partition_spec_ids_for_replace_v1(
+            spec, old_schema, fresh_schema, current_spec, effective_last_partition_id
+        )
+
+    # v2+: reuse field IDs by (source_id, transform) across all specs. When the same
+    # (source_id, transform) appears in multiple specs, prefer the highest field_id.
+    transform_to_field_id: dict[tuple[int, str], int] = {}
+    for existing_spec in existing_specs:
+        for field in existing_spec.fields:
+            key = (field.source_id, str(field.transform))
+            if key not in transform_to_field_id or field.field_id > transform_to_field_id[key]:
+                transform_to_field_id[key] = field.field_id
+
+    next_id = effective_last_partition_id
+    partition_fields = []
+    for field in spec.fields:
+        original_column_name = old_schema.find_column_name(field.source_id)
+        if original_column_name is None:
+            raise ValueError(f"Could not find in old schema: {field}")
+        fresh_field = fresh_schema.find_field(original_column_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not find field in fresh schema: {original_column_name}")
+
+        validate_partition_name(field.name, field.transform, fresh_field.field_id, fresh_schema, set())
+
+        key = (fresh_field.field_id, str(field.transform))
+        if key in transform_to_field_id:
+            partition_field_id = transform_to_field_id[key]
+        else:
+            next_id += 1
+            partition_field_id = next_id
+            transform_to_field_id[key] = partition_field_id
+
+        partition_fields.append(
+            PartitionField(
+                name=field.name,
+                source_id=fresh_field.field_id,
+                field_id=partition_field_id,
+                transform=field.transform,
+            )
+        )
+
+    # `next_id` starts at `effective_last_partition_id` and only increments, so it is the
+    # new last partition id.
+    return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), next_id
+
+
+def _assign_fresh_partition_spec_ids_for_replace_v1(

Review Comment:
   [AI Reviewer Aid] V1 partition specs are append-only under the table spec, so a 
replace that drops a partition field would produce an invalid v1 spec without 
this carry-forward. This mirrors Java's [v1 branch in 
`reassignPartitionIds`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L668-L700) 
and is covered by `test_replace_table_v1_carries_forward_partition_fields_as_void` 
and the helper-level v1 tests.
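   
   A minimal sketch of the v1 carry-forward idea, using hypothetical stand-in 
   types rather than PyIceberg's `PartitionField` / `VoidTransform`. The names 
   `Field` and `carry_forward_v1` and the `"void"` string are illustrative 
   assumptions, and name-collision handling is left out:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for a partition field (not PyIceberg's PartitionField)."""

    source_id: int
    field_id: int
    name: str
    transform: str  # e.g. "identity", "bucket[16]", or "void"


def carry_forward_v1(current: list[Field], new: list[Field], last_id: int) -> tuple[list[Field], int]:
    """Keep every current field in place, void the dropped ones, append the new ones."""
    wanted = {(f.source_id, f.transform): f for f in new}
    result: list[Field] = []
    for f in current:
        match = wanted.pop((f.source_id, f.transform), None)
        if match is not None:
            # Same (source_id, transform): reuse the existing partition field ID.
            result.append(Field(f.source_id, f.field_id, match.name, f.transform))
        else:
            # Dropped by the replace: keep the slot, but as a void transform.
            result.append(Field(f.source_id, f.field_id, f.name, "void"))
    for f in wanted.values():
        # Fields with no match in the current spec are appended with fresh IDs.
        last_id += 1
        result.append(Field(f.source_id, last_id, f.name, f.transform))
    return result, last_id


current = [Field(1, 1000, "id", "identity"), Field(2, 1001, "data_bucket", "bucket[16]")]
new = [Field(1, 0, "id", "identity"), Field(3, 0, "extra", "identity")]
fields, last_id = carry_forward_v1(current, new, last_id=1001)
```

   Here `data_bucket` keeps ID 1001 but becomes a void field, `id` keeps 1000, 
   and `extra` is appended with the next free ID.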



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,148 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / `SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += (UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, new_schema)
+        if existing_schema_id is not None:
+            self._updates += (SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+        else:
+            next_schema_id = max((s.schema_id for s in table_metadata.schemas), default=-1) + 1
+            schema_with_fresh_id = new_schema.model_copy(update={"schema_id": next_schema_id})
+            self._updates += (
+                AddSchemaUpdate(schema_=schema_with_fresh_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+            )
+
+        # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on add to avoid
+        # collisions with existing specs (AddPartitionSpecUpdate refuses duplicate IDs).
+        effective_spec = UNPARTITIONED_PARTITION_SPEC if new_spec.is_unpartitioned() else new_spec
+        existing_spec_id = self._find_matching_spec_id(table_metadata, effective_spec)
+        if existing_spec_id is not None:
+            self._updates += (SetDefaultSpecUpdate(spec_id=existing_spec_id),)
+        else:
+            next_spec_id = max((s.spec_id for s in table_metadata.partition_specs), default=-1) + 1
+            spec_with_fresh_id = PartitionSpec(*effective_spec.fields, spec_id=next_spec_id)
+            self._updates += (
+                AddPartitionSpecUpdate(spec=spec_with_fresh_id),
+                SetDefaultSpecUpdate(spec_id=-1),
+            )
+
+        # Sort order: same reuse-or-add pattern with fresh order_id on add.
+        effective_sort_order = UNSORTED_SORT_ORDER if new_sort_order.is_unsorted else new_sort_order
+        existing_order_id = self._find_matching_sort_order_id(table_metadata, effective_sort_order)
+        if existing_order_id is not None:
+            self._updates += (SetDefaultSortOrderUpdate(sort_order_id=existing_order_id),)
+        else:
+            next_order_id = max((o.order_id for o in table_metadata.sort_orders), default=-1) + 1
+            sort_order_with_fresh_id = SortOrder(*effective_sort_order.fields, order_id=next_order_id)
+            self._updates += (
+                AddSortOrderUpdate(sort_order=sort_order_with_fresh_id),
+                SetDefaultSortOrderUpdate(sort_order_id=-1),
+            )
+
+        # Set location if changed.
+        if new_location != table_metadata.location:
+            self._updates += (SetLocationUpdate(location=new_location),)
+
+        # Merge properties (SetPropertiesUpdate merges onto existing properties).
+        if new_properties:
+            self._updates += (SetPropertiesUpdate(updates=new_properties),)
+
+    @staticmethod
+    def _find_matching_schema_id(table_metadata: TableMetadata, schema: Schema) -> int | None:
+        """Find an existing schema structurally equal to the given one, returning its schema_id or None."""
+        for existing in table_metadata.schemas:
+            if existing == schema:
+                return existing.schema_id
+        return None
+
+    @staticmethod
+    def _find_matching_spec_id(table_metadata: TableMetadata, spec: PartitionSpec) -> int | None:
+        """Find an existing partition spec with the same fields, returning its spec_id or None."""
+        for existing in table_metadata.partition_specs:
+            if existing.fields == spec.fields:
+                return existing.spec_id
+        return None
+
+    @staticmethod
+    def _find_matching_sort_order_id(table_metadata: TableMetadata, sort_order: SortOrder) -> int | None:
+        """Find an existing sort order with the same fields, returning its order_id or None."""
+        for existing in table_metadata.sort_orders:
+            if existing.fields == sort_order.fields:
+                return existing.order_id
+        return None
+
+    def commit_transaction(self) -> Table:

Review Comment:
   [AI Reviewer Aid] The requirements built here follow Java's 
[`UpdateRequirements.forReplaceTable`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/UpdateRequirements.java#L40-L48) 
plus the per-update additions for 
[`AddSchema`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/UpdateRequirements.java#L131-L138) 
/ 
[`AddPartitionSpec`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/UpdateRequirements.java#L144-L152). 
`AssertCurrentSchemaID` / `AssertDefaultSpecID` / `AssertDefaultSortOrderID` are 
deliberately *not* emitted, because Java suppresses them via the `!isReplace` 
guards.
   
   One deliberate divergence: Python emits `AssertLastAssignedFieldId` / 
`AssertLastAssignedPartitionId` unconditionally; Java emits them only when the 
change set actually contains an `AddSchema` / `AddPartitionSpec` (via the 
[`Builder.update`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/UpdateRequirements.java#L131-L152)
 dispatch).
   
   The divergence only kicks in when the replace "replays a historical shape" — 
the new schema/spec is structurally identical to an entry already in history, 
so PyIceberg emits `SetCurrentSchema` / `SetDefaultSpec` reusing the old id 
rather than `Add*`. In that case Java emits no `AssertLastAssigned*Id`; Python 
still does.
   
   Why keep the stricter check: it guards against silently losing a concurrent 
column-add. Consider replaying `schema_id=0` while another writer concurrently 
adds a column (bumping `last_column_id`). Java accepts the commit and the new 
column drops out of the current schema; Python rejects with an 
`assert-last-assigned-field-id` mismatch, forcing an explicit reload + retry. 
Exercised by `test_concurrent_replace_transaction_schema_conflict` / 
`..._partition_spec_conflict`.
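   
   A rough illustration of the concurrent column-add scenario, with a 
   hypothetical `Metadata` stand-in and a hand-rolled check. The names 
   `Metadata`, `CommitFailed` and `check_last_assigned_field_id` are assumptions 
   for this sketch, not PyIceberg's requirement classes:

```python
from dataclasses import dataclass


@dataclass
class Metadata:
    """Hypothetical stand-in carrying only the fields this scenario needs."""

    current_schema_id: int
    last_column_id: int


class CommitFailed(Exception):
    """Raised when a requirement does not hold against the live metadata."""


def check_last_assigned_field_id(expected: int, live: Metadata) -> None:
    """Sketch of an assert-last-assigned-field-id style check."""
    if live.last_column_id != expected:
        raise CommitFailed(
            f"last assigned field id changed: expected {expected}, found {live.last_column_id}"
        )


# The replace is staged against metadata where last_column_id == 2 and
# intends to replay schema_id=0 via a plain SetCurrentSchema.
base = Metadata(current_schema_id=1, last_column_id=2)
# Meanwhile another writer adds a column, bumping last_column_id to 3.
live = Metadata(current_schema_id=2, last_column_id=3)

try:
    check_last_assigned_field_id(expected=base.last_column_id, live=live)
    outcome = "committed"
except CommitFailed:
    outcome = "rejected"  # the caller has to reload the table and retry
```

   Without the unconditional check, nothing in the change set would reference 
   `last_column_id`, so the stale replace would go through.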



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,148 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / `SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += (UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, new_schema)
+        if existing_schema_id is not None:
+            self._updates += (SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+        else:
+            next_schema_id = max((s.schema_id for s in table_metadata.schemas), default=-1) + 1
+            schema_with_fresh_id = new_schema.model_copy(update={"schema_id": next_schema_id})
+            self._updates += (
+                AddSchemaUpdate(schema_=schema_with_fresh_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+            )
+
+        # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on add to avoid
+        # collisions with existing specs (AddPartitionSpecUpdate refuses duplicate IDs).
+        effective_spec = UNPARTITIONED_PARTITION_SPEC if new_spec.is_unpartitioned() else new_spec
+        existing_spec_id = self._find_matching_spec_id(table_metadata, effective_spec)
+        if existing_spec_id is not None:
+            self._updates += (SetDefaultSpecUpdate(spec_id=existing_spec_id),)
+        else:
+            next_spec_id = max((s.spec_id for s in table_metadata.partition_specs), default=-1) + 1
+            spec_with_fresh_id = PartitionSpec(*effective_spec.fields, spec_id=next_spec_id)
+            self._updates += (
+                AddPartitionSpecUpdate(spec=spec_with_fresh_id),
+                SetDefaultSpecUpdate(spec_id=-1),
+            )
+
+        # Sort order: same reuse-or-add pattern with fresh order_id on add.
+        effective_sort_order = UNSORTED_SORT_ORDER if new_sort_order.is_unsorted else new_sort_order
+        existing_order_id = self._find_matching_sort_order_id(table_metadata, effective_sort_order)
+        if existing_order_id is not None:
+            self._updates += (SetDefaultSortOrderUpdate(sort_order_id=existing_order_id),)
+        else:
+            next_order_id = max((o.order_id for o in table_metadata.sort_orders), default=-1) + 1
+            sort_order_with_fresh_id = SortOrder(*effective_sort_order.fields, order_id=next_order_id)
+            self._updates += (
+                AddSortOrderUpdate(sort_order=sort_order_with_fresh_id),
+                SetDefaultSortOrderUpdate(sort_order_id=-1),
+            )
+
+        # Set location if changed.
+        if new_location != table_metadata.location:
+            self._updates += (SetLocationUpdate(location=new_location),)
+
+        # Merge properties (SetPropertiesUpdate merges onto existing properties).
+        if new_properties:
+            self._updates += (SetPropertiesUpdate(updates=new_properties),)
+
+    @staticmethod
+    def _find_matching_schema_id(table_metadata: TableMetadata, schema: Schema) -> int | None:

Review Comment:
   [AI Reviewer Aid] Mirrors Java's 
[`reuseOrCreateNewSchemaId`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1642-L1653)
 (and the 
[spec](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1688-L1700)
 / 
[sort-order](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1736-L1752)
 siblings) — walk all historical entries, return the existing id on structural 
match, otherwise generate a fresh one. Covers the case Fokko walked through in 
[#433 
(comment)](https://github.com/apache/iceberg-python/pull/433#discussion_r1524529502):
 `CREATE OR REPLACE` back to a previously-seen schema reuses its `schema_id` 
and does not append a duplicate.
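   
   The reuse-or-create pattern can be sketched in isolation as follows. This is 
   an illustrative helper only: `reuse_or_create_id` is a hypothetical name, and 
   schemas are modelled as plain tuples rather than PyIceberg `Schema` objects:

```python
def reuse_or_create_id(existing: dict[int, tuple], candidate: tuple) -> tuple[int, bool]:
    """Return (id, is_new): the id of a structurally equal entry, else max id + 1."""
    for entry_id, entry in existing.items():
        if entry == candidate:
            # Structural match: reuse the historical id, no new entry needed.
            return entry_id, False
    # No match: allocate the next id after the highest one seen so far.
    return max(existing, default=-1) + 1, True


# Schemas modelled as tuples of (field_id, name, type), purely for illustration.
schemas = {
    0: ((1, "id", "long"), (2, "data", "string")),
    1: ((1, "id", "long"), (2, "data", "string"), (3, "extra", "boolean")),
}

# Replacing back to the original shape reuses schema id 0.
replayed = reuse_or_create_id(schemas, ((1, "id", "long"), (2, "data", "string")))
# A shape never seen before gets a fresh id.
fresh = reuse_or_create_id(schemas, ((1, "id", "long"),))
```

   In the reuse case only a "set current" update is needed; in the fresh case an 
   "add" update has to precede it.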



##########
tests/catalog/test_catalog_behaviors.py:
##########
@@ -387,6 +387,298 @@ def test_load_table_from_self_identifier(
     assert table.metadata == loaded_table.metadata
 
 
+_SIMPLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), required=False),
+)
+
+
+def _create_simple_table(
+    catalog: Catalog,
+    identifier: Identifier,
+    *,
+    schema: Schema = _SIMPLE_SCHEMA,
+    format_version: int = 2,
+    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+    properties: dict[str, str] | None = None,
+) -> tuple[Identifier, Schema]:
+    namespace = Catalog.namespace_from(identifier)
+    catalog.create_namespace_if_not_exists(namespace)
+    merged_properties = {"format-version": str(format_version), **(properties or {})}
+    catalog.create_table(identifier, schema=schema, partition_spec=partition_spec, properties=merged_properties)
+    return identifier, schema
+
+
+def _simple_data(num_rows: int = 2) -> pa.Table:
+    return pa.Table.from_pydict(
+        {"id": list(range(num_rows)), "data": [chr(ord("a") + i) for i in range(num_rows)]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", pa.large_string())]),
+    )
+
+
+_REPLACE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), required=False),
+    NestedField(field_id=3, name="extra", field_type=BooleanType(), required=False),
+)
+
+
+def test_replace_transaction(catalog: Catalog, test_table_identifier: Identifier) -> None:

Review Comment:
   [AI Reviewer Aid] Mirrors Java's 
[`testReplaceTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2571-L2615)
 (UUID + schema swap + time-travel-readable old snapshot) and folds in the 
snapshot-log invariant from 
[`testReplaceTableKeepsSnapshotLog`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2708-L2738)
 (pre-replace `snapshot_log` entries must survive).



##########
tests/catalog/test_catalog_behaviors.py:
##########
@@ -387,6 +387,298 @@ def test_load_table_from_self_identifier(
     assert table.metadata == loaded_table.metadata
 
 
+_SIMPLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), required=False),
+)
+
+
+def _create_simple_table(
+    catalog: Catalog,
+    identifier: Identifier,
+    *,
+    schema: Schema = _SIMPLE_SCHEMA,
+    format_version: int = 2,
+    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+    properties: dict[str, str] | None = None,
+) -> tuple[Identifier, Schema]:
+    namespace = Catalog.namespace_from(identifier)
+    catalog.create_namespace_if_not_exists(namespace)
+    merged_properties = {"format-version": str(format_version), **(properties or {})}
+    catalog.create_table(identifier, schema=schema, partition_spec=partition_spec, properties=merged_properties)
+    return identifier, schema
+
+
+def _simple_data(num_rows: int = 2) -> pa.Table:
+    return pa.Table.from_pydict(
+        {"id": list(range(num_rows)), "data": [chr(ord("a") + i) for i in range(num_rows)]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", pa.large_string())]),
+    )
+
+
+_REPLACE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), required=False),
+    NestedField(field_id=3, name="extra", field_type=BooleanType(), required=False),
+)
+
+
+def test_replace_transaction(catalog: Catalog, test_table_identifier: Identifier) -> None:
+    _, original_schema = _create_simple_table(catalog, test_table_identifier)
+    original = catalog.load_table(test_table_identifier)
+    original.append(_simple_data())
+    original = catalog.load_table(test_table_identifier)
+    old_snapshot_id = original.current_snapshot().snapshot_id  # type: ignore[union-attr]
+    snapshot_log_before = list(original.metadata.snapshot_log)
+    assert len(snapshot_log_before) == 1
+
+    catalog.replace_table(test_table_identifier, schema=_REPLACE_SCHEMA)
+    replaced = catalog.load_table(test_table_identifier)
+
+    # UUID + history preserved, current snapshot cleared, current schema swapped.
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.metadata.current_snapshot_id is None
+    assert {f.name for f in replaced.schema().fields} == {"id", "data", "extra"}
+    # Old snapshot kept by identity (not just count), and snapshot_log entries from before survive.
+    assert any(s.snapshot_id == old_snapshot_id for s in replaced.metadata.snapshots)
+    assert all(entry in replaced.metadata.snapshot_log for entry in snapshot_log_before)
+    # Old schema is still in the schemas list alongside the new one.
+    schema_ids = sorted(s.schema_id for s in replaced.metadata.schemas)
+    assert schema_ids == [0, 1]
+    assert replaced.metadata.current_schema_id == 1
+    # Time-travel back to the pre-replace snapshot returns the rows that were there before.
+    assert replaced.scan(snapshot_id=old_snapshot_id).to_arrow().equals(_simple_data())
+
+
+def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: Identifier, tmp_path: Path) -> None:

Review Comment:
   [AI Reviewer Aid] Mirrors Java's 
[`testCompleteReplaceTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2618-L2692): 
it exercises every `replace_table` argument (schema + spec + sort + location + 
properties) in one transaction with an RTAS append, and asserts that history 
accumulates and the new snapshot has no parent. It also asserts property-merge 
semantics (keep / override / add), which Java tests separately under 
`testReplaceTransactionProperties*` in the same file.
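   
   The keep / override / add semantics amount to a plain dictionary merge, with 
   the requested properties winning on conflict. A small sketch using the same 
   keys as the test (the variable names are illustrative, and this is not how 
   `SetPropertiesUpdate` is implemented):

```python
# Properties on the table before the replace.
existing = {"keep": "yes", "override": "old"}
# Properties passed to the replace call.
requested = {"override": "new", "added": "v"}

# Later keys win, so requested values override existing ones and
# untouched existing keys are kept.
merged = {**existing, **requested}
```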



##########
tests/catalog/test_catalog_behaviors.py:
##########
@@ -387,6 +387,298 @@ def test_load_table_from_self_identifier(
     assert table.metadata == loaded_table.metadata
 
 
+_SIMPLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+)
+
+
+def _create_simple_table(
+    catalog: Catalog,
+    identifier: Identifier,
+    *,
+    schema: Schema = _SIMPLE_SCHEMA,
+    format_version: int = 2,
+    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+    properties: dict[str, str] | None = None,
+) -> tuple[Identifier, Schema]:
+    namespace = Catalog.namespace_from(identifier)
+    catalog.create_namespace_if_not_exists(namespace)
+    merged_properties = {"format-version": str(format_version), **(properties 
or {})}
+    catalog.create_table(identifier, schema=schema, 
partition_spec=partition_spec, properties=merged_properties)
+    return identifier, schema
+
+
+def _simple_data(num_rows: int = 2) -> pa.Table:
+    return pa.Table.from_pydict(
+        {"id": list(range(num_rows)), "data": [chr(ord("a") + i) for i in 
range(num_rows)]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", 
pa.large_string())]),
+    )
+
+
+_REPLACE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+    NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+)
+
+
+def test_replace_transaction(catalog: Catalog, test_table_identifier: 
Identifier) -> None:
+    _, original_schema = _create_simple_table(catalog, test_table_identifier)
+    original = catalog.load_table(test_table_identifier)
+    original.append(_simple_data())
+    original = catalog.load_table(test_table_identifier)
+    old_snapshot_id = original.current_snapshot().snapshot_id  # type: 
ignore[union-attr]
+    snapshot_log_before = list(original.metadata.snapshot_log)
+    assert len(snapshot_log_before) == 1
+
+    catalog.replace_table(test_table_identifier, schema=_REPLACE_SCHEMA)
+    replaced = catalog.load_table(test_table_identifier)
+
+    # UUID + history preserved, current snapshot cleared, current schema 
swapped.
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.metadata.current_snapshot_id is None
+    assert {f.name for f in replaced.schema().fields} == {"id", "data", 
"extra"}
+    # Old snapshot kept by identity (not just count), and snapshot_log entries 
from before survive.
+    assert any(s.snapshot_id == old_snapshot_id for s in 
replaced.metadata.snapshots)
+    assert all(entry in replaced.metadata.snapshot_log for entry in 
snapshot_log_before)
+    # Old schema is still in the schemas list alongside the new one.
+    schema_ids = sorted(s.schema_id for s in replaced.metadata.schemas)
+    assert schema_ids == [0, 1]
+    assert replaced.metadata.current_schema_id == 1
+    # Time-travel back to the pre-replace snapshot returns the rows that were 
there before.
+    assert 
replaced.scan(snapshot_id=old_snapshot_id).to_arrow().equals(_simple_data())
+
+
+def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: 
Identifier, tmp_path: Path) -> None:
+    _create_simple_table(catalog, test_table_identifier, properties={"keep": 
"yes", "override": "old"})
+    catalog.load_table(test_table_identifier).append(_simple_data())
+    original = catalog.load_table(test_table_identifier)
+    old_snapshot_id = original.current_snapshot().snapshot_id  # type: 
ignore[union-attr]
+    original_data = original.scan().to_arrow()
+
+    new_location = f"file://{tmp_path}/replaced"
+    new_schema = Schema(
+        NestedField(field_id=1, name="id", field_type=LongType(), 
required=False),
+        NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+        NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+    )
+    new_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
name="id_part", transform=IdentityTransform()))
+    new_sort = SortOrder(SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC))
+    new_data = pa.Table.from_pydict(
+        {"id": [10, 20], "data": ["alice", "bob"], "extra": [True, False]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", 
pa.large_string()), pa.field("extra", pa.bool_())]),
+    )
+
+    with catalog.replace_table_transaction(
+        test_table_identifier,
+        schema=new_schema,
+        partition_spec=new_spec,
+        sort_order=new_sort,
+        location=new_location,
+        properties={"override": "new", "added": "v"},
+    ) as txn:
+        txn.append(new_data)
+
+    replaced = catalog.load_table(test_table_identifier)
+
+    # Identity invariants.
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.metadata.location == new_location
+
+    # New schema / spec / sort applied; old entries retained in history.
+    assert {f.name for f in replaced.schema().fields} == {"id", "data", 
"extra"}
+    assert sorted(s.schema_id for s in replaced.metadata.schemas) == [0, 1]
+    assert replaced.spec().fields[0].source_id == 1
+    assert isinstance(replaced.spec().fields[0].transform, IdentityTransform)
+    assert {s.spec_id for s in replaced.metadata.partition_specs} == {0, 1}
+    assert replaced.sort_order().fields == new_sort.fields
+    assert {s.order_id for s in replaced.metadata.sort_orders} == {0, 
replaced.metadata.default_sort_order_id}
+
+    # Property merge: kept, overridden, added — and `format-version` does not 
leak.
+    assert replaced.properties["keep"] == "yes"
+    assert replaced.properties["override"] == "new"
+    assert replaced.properties["added"] == "v"
+    assert "format-version" not in replaced.properties
+
+    # RTAS atomicity: new snapshot exists, has no parent (fresh start), old 
snapshot is still
+    # in the snapshot list, and time-travel reads return the original rows.
+    new_snapshot = replaced.current_snapshot()
+    assert new_snapshot is not None
+    assert new_snapshot.snapshot_id != old_snapshot_id
+    assert new_snapshot.parent_snapshot_id is None
+    assert any(s.snapshot_id == old_snapshot_id for s in replaced.metadata.snapshots)
+    assert replaced.scan().to_arrow().num_rows == 2
+    # Time-travel back to before the replace returns the original rows from the old schema.
+    time_travel = replaced.scan(snapshot_id=old_snapshot_id).to_arrow()
+    assert time_travel.num_rows == original_data.num_rows
+    assert time_travel.column("id").to_pylist() == original_data.column("id").to_pylist()
+
+
+def test_replace_transaction_requires_table_exists(catalog: Catalog, test_table_identifier: Identifier) -> None:

Review Comment:
   [AI Reviewer Aid] Mirrors Java's [`testReplaceTransactionRequiresTableExists`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2695-L2705).



##########
tests/catalog/test_catalog_behaviors.py:
##########
@@ -387,6 +387,298 @@ def test_load_table_from_self_identifier(
     assert table.metadata == loaded_table.metadata
 
 
+_SIMPLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), required=False),
+)
+
+
+def _create_simple_table(
+    catalog: Catalog,
+    identifier: Identifier,
+    *,
+    schema: Schema = _SIMPLE_SCHEMA,
+    format_version: int = 2,
+    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+    properties: dict[str, str] | None = None,
+) -> tuple[Identifier, Schema]:
+    namespace = Catalog.namespace_from(identifier)
+    catalog.create_namespace_if_not_exists(namespace)
+    merged_properties = {"format-version": str(format_version), **(properties or {})}
+    catalog.create_table(identifier, schema=schema, partition_spec=partition_spec, properties=merged_properties)
+    return identifier, schema
+
+
+def _simple_data(num_rows: int = 2) -> pa.Table:
+    return pa.Table.from_pydict(
+        {"id": list(range(num_rows)), "data": [chr(ord("a") + i) for i in range(num_rows)]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", pa.large_string())]),
+    )
+
+
+_REPLACE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), required=False),
+    NestedField(field_id=3, name="extra", field_type=BooleanType(), required=False),
+)
+
+
+def test_replace_transaction(catalog: Catalog, test_table_identifier: Identifier) -> None:

Review Comment:
   [AI Reviewer Aid] Mirrors Java's [`testReplaceTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2571-L2615) (UUID + schema swap + time-travel-readable old snapshot) and folds in the snapshot-log invariant from [`testReplaceTableKeepsSnapshotLog`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2708-L2738) (pre-replace `snapshot_log` entries must survive).



##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,100 @@ def create_table_if_not_exists(
         except TableAlreadyExistsError:
             return self.load_table(identifier)
 
+    @abstractmethod
+    def replace_table_transaction(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> ReplaceTableTransaction:
+        """Create a ReplaceTableTransaction.
+
+        The transaction can be used to stage additional changes (schema evolution,
+        partition evolution, etc.) before committing.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): New table schema.
+            location (str | None): New table location. Defaults to the existing location.
+            partition_spec (PartitionSpec): New partition spec.
+            sort_order (SortOrder): New sort order.
+            properties (Properties): Properties to apply. Merged on top of the existing
+                table properties: keys present here override existing values; existing keys
+                not present here are preserved. To remove a property, follow up with a
+                transaction that removes it explicitly.
+
+        Returns:
+            ReplaceTableTransaction: A transaction for the replace operation.
+
+        Raises:
+            NoSuchTableError: If the table does not exist.
+        """
+
+    def _replace_staged_table(

Review Comment:
   [AI Reviewer Aid] Maps to Java's [`TableMetadata.buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L706-L746). All the bookkeeping (fresh schema, partition spec, sort order, location resolution, `StagedTable` construction) lives here so `MetastoreCatalog` and `RestCatalog` share it, analogous to how `_create_staged_table` is factored.
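
The property-merge contract described in the `replace_table_transaction` docstring can be illustrated with a minimal standalone sketch. `merge_replace_properties` is a hypothetical helper, not a pyiceberg function, and dropping `format-version` from the result is an assumption based on the test's assertion that the key does not appear in the replaced table's properties.

```python
# Hypothetical helper (not part of pyiceberg): illustrates the merge semantics only.
def merge_replace_properties(existing: dict[str, str], requested: dict[str, str]) -> dict[str, str]:
    # Requested keys override existing ones; existing keys not mentioned are preserved.
    merged = {**existing, **requested}
    # Assumption: `format-version` is consumed as a metadata setting and is not
    # stored back as an ordinary table property.
    merged.pop("format-version", None)
    return merged


existing = {"keep": "yes", "override": "old"}
requested = {"override": "new", "added": "v", "format-version": "2"}
merged = merge_replace_properties(existing, requested)
```

Under these assumptions `merged` keeps `keep`, takes the new value for `override`, gains `added`, and carries no `format-version` key. Removing a property is outside what this merge can express, which matches the docstring's note that removal needs an explicit follow-up.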



##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,100 @@ def create_table_if_not_exists(
         except TableAlreadyExistsError:
             return self.load_table(identifier)
 
+    @abstractmethod
+    def replace_table_transaction(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> ReplaceTableTransaction:
+        """Create a ReplaceTableTransaction.
+
+        The transaction can be used to stage additional changes (schema evolution,
+        partition evolution, etc.) before committing.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): New table schema.
+            location (str | None): New table location. Defaults to the existing location.
+            partition_spec (PartitionSpec): New partition spec.
+            sort_order (SortOrder): New sort order.
+            properties (Properties): Properties to apply. Merged on top of the existing
+                table properties: keys present here override existing values; existing keys
+                not present here are preserved. To remove a property, follow up with a
+                transaction that removes it explicitly.
+
+        Returns:
+            ReplaceTableTransaction: A transaction for the replace operation.
+
+        Raises:
+            NoSuchTableError: If the table does not exist.
+        """
+
+    def _replace_staged_table(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None,
+        partition_spec: PartitionSpec,
+        sort_order: SortOrder,
+        properties: Properties,
+    ) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
+        """Load the existing table and build fresh schema/spec/sort-order for replacement.
+
+        - reuses existing field IDs by name (from the current schema)
+        - reuses partition field IDs by `(source, transform)` across all specs (v2+),
+          or carries forward the current spec with `VoidTransform`s (v1)
+        - reassigns sort field IDs against the fresh schema
+        - resolves `location` to the existing table's location when omitted
+
+        Returns:
+            A tuple `(staged_table, fresh_schema, fresh_partition_spec, fresh_sort_order, resolved_location)`.
+        """
+        existing_table = self.load_table(identifier)
+        existing_metadata = existing_table.metadata
+
+        requested_format_version = properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version is not None and int(requested_format_version) < existing_metadata.format_version:
+            raise ValueError(
+                f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}"
+            )
+        resolved_format_version = (
+            int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version
+        )
+        iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version))
+        iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version))

Review Comment:
   [AI Reviewer Aid] Same call `new_table_metadata` makes ([metadata.py:597](https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/metadata.py#L597)), and the same check Java's Builder runs inside [`addSchemaInternal`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1605). Catches v1-incompatible types up front rather than failing later inside `AddSchemaUpdate`'s apply path.
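
The format-version resolution that precedes this check can be read in isolation as the following sketch. `resolve_format_version` is a hypothetical standalone function written for illustration; it mirrors the logic in the hunk but uses a plain dict and the literal `"format-version"` key instead of pyiceberg's `Properties` and `TableProperties.FORMAT_VERSION`.

```python
# Hypothetical standalone version of the guard in the hunk above; not a pyiceberg API.
def resolve_format_version(existing_version: int, properties: dict[str, str]) -> int:
    requested = properties.get("format-version")
    # A replace may keep or raise the format version, but never lower it.
    if requested is not None and int(requested) < existing_version:
        raise ValueError(f"Cannot downgrade format-version from {existing_version} to {requested}")
    # Fall back to the existing table's version when none is requested.
    return int(requested) if requested is not None else existing_version
```

With this shape, an omitted property keeps the table's current version, an upgrade request is honored, and a downgrade request fails before any schema conversion is attempted.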



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

