smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3261057242


##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort 
order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and 
history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, 
new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the 
replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / 
`SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body 
unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = 
new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += 
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else 
add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, 
new_schema)
+        if existing_schema_id is not None:
+            self._updates += 
(SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+        else:
+            next_schema_id = max((s.schema_id for s in 
table_metadata.schemas), default=-1) + 1
+            schema_with_fresh_id = new_schema.model_copy(update={"schema_id": 
next_schema_id})
+            self._updates += (
+                AddSchemaUpdate(schema_=schema_with_fresh_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+            )
+
+        # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on 
add to avoid
+        # collisions with existing specs (AddPartitionSpecUpdate refuses 
duplicate IDs).
+        effective_spec = UNPARTITIONED_PARTITION_SPEC if 
new_spec.is_unpartitioned() else new_spec
+        existing_spec_id = self._find_matching_spec_id(table_metadata, 
effective_spec)
+        if existing_spec_id is not None:
+            self._updates += (SetDefaultSpecUpdate(spec_id=existing_spec_id),)
+        else:
+            next_spec_id = max((s.spec_id for s in 
table_metadata.partition_specs), default=-1) + 1
+            spec_with_fresh_id = PartitionSpec(*effective_spec.fields, 
spec_id=next_spec_id)
+            self._updates += (
+                AddPartitionSpecUpdate(spec=spec_with_fresh_id),
+                SetDefaultSpecUpdate(spec_id=-1),
+            )
+
+        # Sort order: same reuse-or-add pattern with fresh order_id on add.
+        effective_sort_order = UNSORTED_SORT_ORDER if 
new_sort_order.is_unsorted else new_sort_order
+        existing_order_id = self._find_matching_sort_order_id(table_metadata, 
effective_sort_order)
+        if existing_order_id is not None:
+            self._updates += 
(SetDefaultSortOrderUpdate(sort_order_id=existing_order_id),)
+        else:
+            next_order_id = max((o.order_id for o in 
table_metadata.sort_orders), default=-1) + 1
+            sort_order_with_fresh_id = SortOrder(*effective_sort_order.fields, 
order_id=next_order_id)
+            self._updates += (
+                AddSortOrderUpdate(sort_order=sort_order_with_fresh_id),
+                SetDefaultSortOrderUpdate(sort_order_id=-1),
+            )
+
+        # Set location if changed.
+        if new_location != table_metadata.location:
+            self._updates += (SetLocationUpdate(location=new_location),)
+
+        # Merge properties (SetPropertiesUpdate merges onto existing 
properties).
+        if new_properties:
+            self._updates += (SetPropertiesUpdate(updates=new_properties),)

Review Comment:
   [AI Reviewer Aid] Properties are merged onto existing, matching Java's 
[`TableMetadata.Builder.setProperties`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1458-L1467)
 which does `properties.putAll(updated)`. Documented in the public docstring — 
callers wanting to remove keys must use `replace_table_transaction` and drop 
them explicitly inside the txn.



##########
tests/catalog/test_catalog_behaviors.py:
##########
@@ -387,6 +387,298 @@ def test_load_table_from_self_identifier(
     assert table.metadata == loaded_table.metadata
 
 
+_SIMPLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+)
+
+
+def _create_simple_table(
+    catalog: Catalog,
+    identifier: Identifier,
+    *,
+    schema: Schema = _SIMPLE_SCHEMA,
+    format_version: int = 2,
+    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+    properties: dict[str, str] | None = None,
+) -> tuple[Identifier, Schema]:
+    namespace = Catalog.namespace_from(identifier)
+    catalog.create_namespace_if_not_exists(namespace)
+    merged_properties = {"format-version": str(format_version), **(properties 
or {})}
+    catalog.create_table(identifier, schema=schema, 
partition_spec=partition_spec, properties=merged_properties)
+    return identifier, schema
+
+
+def _simple_data(num_rows: int = 2) -> pa.Table:
+    return pa.Table.from_pydict(
+        {"id": list(range(num_rows)), "data": [chr(ord("a") + i) for i in 
range(num_rows)]},
+        schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", 
pa.large_string())]),
+    )
+
+
+_REPLACE_SCHEMA = Schema(
+    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+    NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+    NestedField(field_id=3, name="extra", field_type=BooleanType(), 
required=False),
+)
+
+
+def test_replace_transaction(catalog: Catalog, test_table_identifier: 
Identifier) -> None:
+    _, original_schema = _create_simple_table(catalog, test_table_identifier)
+    original = catalog.load_table(test_table_identifier)
+    original.append(_simple_data())
+    original = catalog.load_table(test_table_identifier)
+    old_snapshot_id = original.current_snapshot().snapshot_id  # type: 
ignore[union-attr]
+    snapshot_log_before = list(original.metadata.snapshot_log)
+    assert len(snapshot_log_before) == 1
+
+    catalog.replace_table(test_table_identifier, schema=_REPLACE_SCHEMA)
+    replaced = catalog.load_table(test_table_identifier)
+
+    # UUID + history preserved, current snapshot cleared, current schema 
swapped.
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.metadata.current_snapshot_id is None
+    assert {f.name for f in replaced.schema().fields} == {"id", "data", 
"extra"}
+    # Old snapshot kept by identity (not just count), and snapshot_log entries 
from before survive.
+    assert any(s.snapshot_id == old_snapshot_id for s in 
replaced.metadata.snapshots)
+    assert all(entry in replaced.metadata.snapshot_log for entry in 
snapshot_log_before)
+    # Old schema is still in the schemas list alongside the new one.
+    schema_ids = sorted(s.schema_id for s in replaced.metadata.schemas)
+    assert schema_ids == [0, 1]
+    assert replaced.metadata.current_schema_id == 1
+    # Time-travel back to the pre-replace snapshot returns the rows that were 
there before.
+    assert 
replaced.scan(snapshot_id=old_snapshot_id).to_arrow().equals(_simple_data())
+
+
+def test_complete_replace_transaction(catalog: Catalog, test_table_identifier: 
Identifier, tmp_path: Path) -> None:

Review Comment:
   [AI Reviewer Aid] Mirrors Java's 
[`testCompleteReplaceTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2618-L2692)
 — exercises all six `replace_table` args (schema + spec + sort + location + 
properties) in one transaction with an RTAS append, asserts history accumulates 
and the new snapshot has no parent. Also asserts property-merge semantics (keep 
/ override / add) which Java's variant tests under 
`testReplaceTransactionProperties*` in the same file.



##########
tests/integration/test_rest_catalog.py:
##########
@@ -69,6 +72,81 @@ def test_create_namespace_if_already_existing(catalog: 
RestCatalog) -> None:
     assert catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER)
 
 
[email protected]
[email protected]("catalog", [lf("session_catalog")])
+def test_replace_table_end_to_end_against_rest_server(catalog: Catalog) -> 
None:
+    """End-to-end smoke test: replace_table against a real REST server.
+
+    Detailed replace_table semantics are covered against InMemoryCatalog and 
SqlCatalog in
+    `tests/catalog/test_catalog_behaviors.py`. This test verifies the REST 
wire path: server
+    accepts the commit, preserves the UUID, and clears the current snapshot."""

Review Comment:
   ```suggestion
   ```



##########
tests/integration/test_rest_catalog.py:
##########
@@ -69,6 +72,81 @@ def test_create_namespace_if_already_existing(catalog: 
RestCatalog) -> None:
     assert catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER)
 
 
[email protected]
[email protected]("catalog", [lf("session_catalog")])
+def test_replace_table_end_to_end_against_rest_server(catalog: Catalog) -> 
None:

Review Comment:
   ```suggestion
   def test_replace_table(catalog: Catalog) -> None:
   ```



##########
tests/integration/test_rest_catalog.py:
##########
@@ -69,6 +72,81 @@ def test_create_namespace_if_already_existing(catalog: 
RestCatalog) -> None:
     assert catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER)
 
 
[email protected]
[email protected]("catalog", [lf("session_catalog")])
+def test_replace_table_end_to_end_against_rest_server(catalog: Catalog) -> 
None:
+    """End-to-end smoke test: replace_table against a real REST server.
+
+    Detailed replace_table semantics are covered against InMemoryCatalog and 
SqlCatalog in
+    `tests/catalog/test_catalog_behaviors.py`. This test verifies the REST 
wire path: server
+    accepts the commit, preserves the UUID, and clears the current snapshot."""
+    identifier = f"default.test_replace_table_e2e_{catalog.name}"
+    if not catalog.namespace_exists("default"):
+        catalog.create_namespace("default")
+    if catalog.table_exists(identifier):
+        catalog.drop_table(identifier)
+
+    original_schema = Schema(
+        NestedField(field_id=1, name="id", field_type=LongType(), 
required=False),
+        NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
+    )
+    original = catalog.create_table(identifier, schema=original_schema)
+    original.append(
+        pa.Table.from_pydict(
+            {"id": [1, 2, 3], "data": ["a", "b", "c"]},
+            schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", 
pa.large_string())]),
+        )
+    )
+    original.refresh()
+    original_snapshot_id = original.current_snapshot().snapshot_id  # type: 
ignore[union-attr]
+
+    new_schema = Schema(
+        NestedField(field_id=1, name="id", field_type=LongType(), 
required=False),
+        NestedField(field_id=2, name="name", field_type=StringType(), 
required=False),
+        NestedField(field_id=3, name="active", field_type=BooleanType(), 
required=False),
+    )
+    replaced = catalog.replace_table(identifier, schema=new_schema)
+
+    assert replaced.metadata.table_uuid == original.metadata.table_uuid
+    assert replaced.current_snapshot() is None
+    assert any(s.snapshot_id == original_snapshot_id for s in 
replaced.metadata.snapshots)
+    catalog.drop_table(identifier)
+
+
[email protected]
[email protected]("catalog", [lf("session_catalog")])
+def test_replace_table_transaction_rtas_against_rest_server(catalog: Catalog) 
-> None:
+    """RTAS (Replace Table As Select) against a real REST server: the schema 
swap and the
+    new-data write must land atomically — the new snapshot is current on 
commit."""

Review Comment:
   ```suggestion
   def test_replace_table_transaction(catalog: Catalog) -> None:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to