smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3261424781


##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +463,136 @@ def create_table_if_not_exists(
         except TableAlreadyExistsError:
             return self.load_table(identifier)
 
+    def replace_table(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> Table:
+        """Atomically replace a table's schema, spec, sort order, location, 
and properties.
+
+        The table UUID and history (snapshots, schemas, specs, sort orders) 
are preserved.
+        The current snapshot is cleared (main branch ref is removed).
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): New table schema.
+            location (str | None): New table location. Defaults to the 
existing location.
+            partition_spec (PartitionSpec): New partition spec.
+            sort_order (SortOrder): New sort order.
+            properties (Properties): Properties to apply. Merged on top of the 
existing
+                table properties: keys present here override existing values; 
existing keys
+                not present here are preserved. To remove a property, follow 
up with a
+                transaction that removes it explicitly.
+
+        Returns:
+            Table: the replaced table instance.
+
+        Raises:
+            NoSuchTableError: If the table does not exist.
+            TableAlreadyExistsError: If a view exists at the same identifier.
+        """
+        return self.replace_table_transaction(
+            identifier, schema, location, partition_spec, sort_order, 
properties
+        ).commit_transaction()
+
+    def replace_table_transaction(

Review Comment:
   Fair point — checked PR #498 (`create_table_transaction`) and that was added 
as `@abstractmethod` on `Catalog` directly, no back-compat stub. Matching that 
precedent now: `replace_table_transaction` is also `@abstractmethod`. 
`NoopCatalog` gets a stub since it extends `Catalog` directly.



##########
pyiceberg/partitioning.py:
##########
@@ -335,6 +335,175 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, 
old_schema: Schema, fre
     return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
 
 
+def assign_fresh_partition_spec_ids_for_replace(
+    spec: PartitionSpec,
+    old_schema: Schema,
+    fresh_schema: Schema,
+    existing_specs: list[PartitionSpec],
+    last_partition_id: int | None,
+    format_version: int = 2,
+    current_spec: PartitionSpec | None = None,
+) -> tuple[PartitionSpec, int]:
+    """Assign partition field IDs for a replace operation, reusing IDs from 
existing specs.
+
+    - For v2+, reuse partition field IDs by `(source_id, transform)` across 
all existing specs.
+      New fields get IDs starting from `last_partition_id + 1`.
+    - For v1, the current spec's fields must be preserved (v1 specs are 
append-only). Fields
+      absent from the new spec are carried forward with a `VoidTransform`. 
Matching new fields
+      reuse the existing partition field ID; remaining new fields are appended 
with fresh IDs.
+
+    Args:
+        spec: The new partition spec to assign IDs to. Its `source_id`s 
reference `old_schema`.
+        old_schema: The schema that the new spec's `source_id`s reference.
+        fresh_schema: The schema with freshly assigned field IDs.
+        existing_specs: All partition specs from the existing table metadata.
+        last_partition_id: The current table's `last_partition_id`.
+        format_version: Table format version. Required to be set to 1 for v1 
carry-forward.
+        current_spec: The current default partition spec. Required when 
`format_version <= 1`.
+
+    Returns:
+        A tuple of `(fresh_spec, new_last_partition_id)`.
+    """
+    effective_last_partition_id = last_partition_id if last_partition_id is 
not None else PARTITION_FIELD_ID_START - 1
+
+    if format_version <= 1:
+        if current_spec is None:
+            raise ValueError("current_spec is required for v1 replace_table")
+        return _assign_fresh_partition_spec_ids_for_replace_v1(
+            spec, old_schema, fresh_schema, current_spec, 
effective_last_partition_id
+        )
+
+    # v2+: reuse field IDs by (source_id, transform) across all specs. When 
the same
+    # (source_id, transform) appears in multiple specs, prefer the highest 
field_id.
+    transform_to_field_id: dict[tuple[int, str], int] = {}
+    for existing_spec in existing_specs:
+        for field in existing_spec.fields:
+            key = (field.source_id, str(field.transform))
+            if key not in transform_to_field_id or field.field_id > 
transform_to_field_id[key]:
+                transform_to_field_id[key] = field.field_id
+
+    next_id = effective_last_partition_id
+    partition_fields = []
+    for field in spec.fields:
+        original_column_name = old_schema.find_column_name(field.source_id)
+        if original_column_name is None:
+            raise ValueError(f"Could not find in old schema: {field}")
+        fresh_field = fresh_schema.find_field(original_column_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not find field in fresh schema: 
{original_column_name}")
+
+        validate_partition_name(field.name, field.transform, 
fresh_field.field_id, fresh_schema, set())
+
+        key = (fresh_field.field_id, str(field.transform))
+        if key in transform_to_field_id:
+            partition_field_id = transform_to_field_id[key]
+        else:
+            next_id += 1
+            partition_field_id = next_id
+            transform_to_field_id[key] = partition_field_id
+
+        partition_fields.append(
+            PartitionField(
+                name=field.name,
+                source_id=fresh_field.field_id,
+                field_id=partition_field_id,
+                transform=field.transform,
+            )
+        )
+
+    # `next_id` starts at `effective_last_partition_id` and only increments, 
so it is the
+    # new last partition id.
+    return PartitionSpec(*partition_fields, 
spec_id=INITIAL_PARTITION_SPEC_ID), next_id
+
+
+def _assign_fresh_partition_spec_ids_for_replace_v1(

Review Comment:
   Yes, PyIceberg supports v1 — you can create v1 tables today via 
`create_table(..., properties={"format-version": "1"})` and there are tests for 
it (`test_create_v1_table` etc.). For replace specifically: v1 partition specs 
are append-only by spec rule, so a replace that drops a partition field would 
produce an invalid v1 spec without this carry-forward. Mirrors Java's [v1 
branch in 
`reassignPartitionIds`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L668-L700)
 — covered by `test_replace_table_v1_carries_forward_partition_fields_as_void` 
and the helper-level v1 tests. Happy to defer if you'd rather, but I think it's 
worth keeping since it's a correctness gap without it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to