smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3269508110


##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,100 @@ def create_table_if_not_exists(
         except TableAlreadyExistsError:
             return self.load_table(identifier)
 
+    @abstractmethod

Review Comment:
   Pointing out that this is a new abstract method. I'm matching 
https://github.com/apache/iceberg-python/pull/498, which also introduced a new 
abstract method, `create_table_transaction`. This technically breaks out-of-tree 
`Catalog` implementations that do not implement it, but I thought this was 
fine given the precedent in the above PR. (Happy to change this to raise 
`NotImplementedError` instead and remove `abstractmethod`, but I do prefer 
`abstractmethod` from a design perspective and think it's fine to make this 
change.)
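
   For illustration, a minimal sketch of the trade-off using plain `abc` rather 
than the real `Catalog` class. All class names and the `instantiation_error` 
helper are hypothetical; the point is only where the failure surfaces for a 
subclass written before the new method existed.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class AbstractCatalog(ABC):
    """Hypothetical stand-in for a base class that gains a new abstract method."""

    @abstractmethod
    def replace_table_transaction(self, identifier: str) -> str: ...


class DefaultingCatalog:
    """Hypothetical alternative: a concrete default that raises only when called."""

    def replace_table_transaction(self, identifier: str) -> str:
        raise NotImplementedError("replace_table_transaction is not supported")


class OutOfTreeAbstract(AbstractCatalog):
    """Written before the new method existed, so it does not implement it."""


class OutOfTreeDefaulting(DefaultingCatalog):
    """Same situation, but against the non-abstract base."""


def instantiation_error(cls: type) -> str | None:
    """Return the name of the exception raised on instantiation, or None."""
    try:
        cls()
    except TypeError as exc:
        return type(exc).__name__
    return None
```

With `abstractmethod` the out-of-tree subclass fails as soon as it is 
instantiated; with the raising default it keeps working until 
`replace_table_transaction` is actually called.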



##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,100 @@ def create_table_if_not_exists(
         except TableAlreadyExistsError:
             return self.load_table(identifier)
 
+    @abstractmethod
+    def replace_table_transaction(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> ReplaceTableTransaction:
+        """Create a ReplaceTableTransaction.
+
+        The transaction can be used to stage additional changes (schema evolution,
+        partition evolution, etc.) before committing.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): New table schema.
+            location (str | None): New table location. Defaults to the existing location.
+            partition_spec (PartitionSpec): New partition spec.
+            sort_order (SortOrder): New sort order.
+            properties (Properties): Properties to apply. Merged on top of the existing
+                table properties: keys present here override existing values; existing keys
+                not present here are preserved. To remove a property, follow up with a
+                transaction that removes it explicitly.
+
+        Returns:
+            ReplaceTableTransaction: A transaction for the replace operation.
+
+        Raises:
+            NoSuchTableError: If the table does not exist.
+        """
+
+    def _replace_staged_table(

Review Comment:
   Maps to Java's [`TableMetadata.buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L706-L746).
All the bookkeeping (fresh schema, partition spec, sort order, location 
resolution, `StagedTable` construction) lives here so `MetastoreCatalog` and 
`RestCatalog` share it, analogous to how `_create_staged_table` is factored.
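
   As a rough sketch of the "reuse field IDs by name" part of that bookkeeping, 
assuming a flat schema represented as plain dicts (no nested fields, no 
pyiceberg types). The function name and the rule that new columns are numbered 
past the last assigned column ID are assumptions made for illustration, not a 
copy of the implementation.

```python
from __future__ import annotations


def assign_ids_reusing_names(
    new_field_names: list[str],
    existing_ids_by_name: dict[str, int],
    last_column_id: int,
) -> tuple[dict[str, int], int]:
    """Keep the existing ID for a known name; allocate past last_column_id otherwise.

    Returns the name -> ID mapping for the replacement schema and the new
    last column ID.
    """
    assigned: dict[str, int] = {}
    next_id = last_column_id
    for name in new_field_names:
        if name in existing_ids_by_name:
            # Same name as in the current schema: carry the ID forward.
            assigned[name] = existing_ids_by_name[name]
        else:
            # New column: take the next unused ID so old IDs are never recycled.
            next_id += 1
            assigned[name] = next_id
    return assigned, next_id
```

Columns dropped by the replacement (here, any name in `existing_ids_by_name` 
that is absent from `new_field_names`) simply do not appear in the result, and 
their IDs are not handed out again.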



##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,100 @@ def create_table_if_not_exists(
         except TableAlreadyExistsError:
             return self.load_table(identifier)
 
+    @abstractmethod
+    def replace_table_transaction(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> ReplaceTableTransaction:
+        """Create a ReplaceTableTransaction.
+
+        The transaction can be used to stage additional changes (schema evolution,
+        partition evolution, etc.) before committing.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): New table schema.
+            location (str | None): New table location. Defaults to the existing location.
+            partition_spec (PartitionSpec): New partition spec.
+            sort_order (SortOrder): New sort order.
+            properties (Properties): Properties to apply. Merged on top of the existing
+                table properties: keys present here override existing values; existing keys
+                not present here are preserved. To remove a property, follow up with a
+                transaction that removes it explicitly.
+
+        Returns:
+            ReplaceTableTransaction: A transaction for the replace operation.
+
+        Raises:
+            NoSuchTableError: If the table does not exist.
+        """
+
+    def _replace_staged_table(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None,
+        partition_spec: PartitionSpec,
+        sort_order: SortOrder,
+        properties: Properties,
+    ) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
+        """Load the existing table and build fresh schema/spec/sort-order for replacement.
+
+        - reuses existing field IDs by name (from the current schema)
+        - reuses partition field IDs by `(source, transform)` across all specs (v2+),
+          or carries forward the current spec with `VoidTransform`s (v1)
+        - reassigns sort field IDs against the fresh schema
+        - resolves `location` to the existing table's location when omitted
+
+        Returns:
+            A tuple `(staged_table, fresh_schema, fresh_partition_spec, fresh_sort_order, resolved_location)`.
+        """
+        existing_table = self.load_table(identifier)
+        existing_metadata = existing_table.metadata
+
+        requested_format_version = properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version is not None and int(requested_format_version) < existing_metadata.format_version:
+            raise ValueError(
+                f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}"

Review Comment:
   Java's [`buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L733-L738)
reads `format-version` from properties and only upgrades. I'm rejecting a 
downgrade explicitly here; otherwise `_convert_schema_if_needed` would run with 
the requested lower-version (e.g. v1) semantics while the requested version 
change itself is silently dropped, producing a confusing mismatch.
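
   The resolution rule in the hunk above, restated as a standalone sketch. 
`resolve_format_version` is a hypothetical helper, properties are a plain 
`dict[str, str]`, and the `"format-version"` key is assumed to be the value of 
`TableProperties.FORMAT_VERSION`.

```python
from __future__ import annotations

# Assumed to match TableProperties.FORMAT_VERSION.
FORMAT_VERSION_KEY = "format-version"


def resolve_format_version(properties: dict[str, str], existing_version: int) -> int:
    """Pick the format version a replacement should be built against.

    - no request: keep the existing version
    - request >= existing: use the requested version (same version or upgrade)
    - request < existing: reject, rather than silently ignoring the request
    """
    requested = properties.get(FORMAT_VERSION_KEY)
    if requested is None:
        return existing_version
    if int(requested) < existing_version:
        raise ValueError(
            f"Cannot downgrade format-version from {existing_version} to {requested}"
        )
    return int(requested)
```

Returning a single resolved value means schema conversion and the eventual 
metadata update are driven by the same version, which is the mismatch the 
explicit rejection is meant to avoid.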



##########
pyiceberg/catalog/__init__.py:
##########
@@ -924,6 +1023,28 @@ def create_table_transaction(
             self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
         )
 
+    @override
+    def replace_table_transaction(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> ReplaceTableTransaction:
+        staged_table, fresh_schema, fresh_spec, fresh_sort_order, resolved_location = self._replace_staged_table(

Review Comment:
   @/var/folders/bq/gpty1dt579d_lh4mfg191y_508jd0k/T/tmp.b5YudSpgfl



##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,100 @@ def create_table_if_not_exists(
         except TableAlreadyExistsError:
             return self.load_table(identifier)
 
+    @abstractmethod
+    def replace_table_transaction(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> ReplaceTableTransaction:
+        """Create a ReplaceTableTransaction.
+
+        The transaction can be used to stage additional changes (schema evolution,
+        partition evolution, etc.) before committing.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): New table schema.
+            location (str | None): New table location. Defaults to the existing location.
+            partition_spec (PartitionSpec): New partition spec.
+            sort_order (SortOrder): New sort order.
+            properties (Properties): Properties to apply. Merged on top of the existing
+                table properties: keys present here override existing values; existing keys
+                not present here are preserved. To remove a property, follow up with a
+                transaction that removes it explicitly.
+
+        Returns:
+            ReplaceTableTransaction: A transaction for the replace operation.
+
+        Raises:
+            NoSuchTableError: If the table does not exist.
+        """
+
+    def _replace_staged_table(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        location: str | None,
+        partition_spec: PartitionSpec,
+        sort_order: SortOrder,
+        properties: Properties,
+    ) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
+        """Load the existing table and build fresh schema/spec/sort-order for replacement.
+
+        - reuses existing field IDs by name (from the current schema)
+        - reuses partition field IDs by `(source, transform)` across all specs (v2+),
+          or carries forward the current spec with `VoidTransform`s (v1)
+        - reassigns sort field IDs against the fresh schema
+        - resolves `location` to the existing table's location when omitted
+
+        Returns:
+            A tuple `(staged_table, fresh_schema, fresh_partition_spec, fresh_sort_order, resolved_location)`.
+        """
+        existing_table = self.load_table(identifier)
+        existing_metadata = existing_table.metadata
+
+        requested_format_version = properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version is not None and int(requested_format_version) < existing_metadata.format_version:
+            raise ValueError(
+                f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}"
+            )
+        resolved_format_version = (
+            int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version
+        )
+        iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version))
+        iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version))

Review Comment:
   This is the same call `new_table_metadata` makes 
([metadata.py:597](https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/metadata.py#L597)),
and the same check Java's Builder runs inside 
[`addSchemaInternal`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1605).
It catches v1-incompatible types up front rather than failing later inside 
`AddSchemaUpdate`'s apply path.
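
   To show the fail-fast idea in isolation, a small sketch that validates type 
names against a format version before any update is staged. The 
`MIN_FORMAT_VERSION` table and `check_types_for_version` are illustrative 
stand-ins rather than the library's actual rules or API; the only assumption 
about Iceberg itself is that nanosecond timestamps need format version 3.

```python
from __future__ import annotations

# Illustrative only: minimum format version per type name.
# Types not listed here are treated as valid from version 1.
MIN_FORMAT_VERSION = {"timestamp_ns": 3, "timestamptz_ns": 3}


def check_types_for_version(field_types: dict[str, str], format_version: int) -> None:
    """Raise ValueError naming every field whose type needs a newer format version."""
    problems = [
        f"{name}: {type_name} requires format-version {MIN_FORMAT_VERSION[type_name]}"
        for name, type_name in field_types.items()
        if MIN_FORMAT_VERSION.get(type_name, 1) > format_version
    ]
    if problems:
        raise ValueError(
            f"Schema is incompatible with format-version {format_version}: "
            + "; ".join(problems)
        )
```

Running a check like this right after the version is resolved reports all 
offending fields at once, instead of surfacing the first one deep inside the 
update path.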



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

