smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3270015278
##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,107 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)
+ @abstractmethod
Review Comment:
Making this `@abstractmethod` matches PR
[#498](https://github.com/apache/iceberg-python/pull/498)'s precedent for
`create_table_transaction`. It technically breaks out-of-tree `Catalog`
subclasses, but PyIceberg accepts that — we don't accommodate out-of-tree
implementations on this path.
##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,100 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)
+ @abstractmethod
+ def replace_table_transaction(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> ReplaceTableTransaction:
+ """Create a ReplaceTableTransaction.
+
+ The transaction can be used to stage additional changes (schema
evolution,
+ partition evolution, etc.) before committing.
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): New table schema.
+ location (str | None): New table location. Defaults to the
existing location.
+ partition_spec (PartitionSpec): New partition spec.
+ sort_order (SortOrder): New sort order.
+ properties (Properties): Properties to apply. Merged on top of the
existing
+ table properties: keys present here override existing values;
existing keys
+ not present here are preserved. To remove a property, follow
up with a
+ transaction that removes it explicitly.
+
+ Returns:
+ ReplaceTableTransaction: A transaction for the replace operation.
+
+ Raises:
+ NoSuchTableError: If the table does not exist.
+ """
+
+ def _replace_staged_table(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None,
+ partition_spec: PartitionSpec,
+ sort_order: SortOrder,
+ properties: Properties,
+ ) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
+ """Load the existing table and build fresh schema/spec/sort-order for
replacement.
+
+ - reuses existing field IDs by name (from the current schema)
+ - reuses partition field IDs by `(source, transform)` across all specs
(v2+),
+ or carries forward the current spec with `VoidTransform`s (v1)
+ - reassigns sort field IDs against the fresh schema
+ - resolves `location` to the existing table's location when omitted
+
+ Returns:
+ A tuple `(staged_table, fresh_schema, fresh_partition_spec,
fresh_sort_order, resolved_location)`.
+ """
+ existing_table = self.load_table(identifier)
+ existing_metadata = existing_table.metadata
+
+ requested_format_version =
properties.get(TableProperties.FORMAT_VERSION)
+ if requested_format_version is not None and
int(requested_format_version) < existing_metadata.format_version:
+ raise ValueError(
+ f"Cannot downgrade format-version from
{existing_metadata.format_version} to {requested_format_version}"
Review Comment:
Java's
[`buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L733-L738)
reads `format-version` from properties and only upgrades. We reject a downgrade
explicitly here — otherwise `_convert_schema_if_needed` would run with v1
semantics while the requested version change is silently skipped (only upgrades
are applied), producing a confusing mismatch.
##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +449,117 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)
+ def replace_table_transaction(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> ReplaceTableTransaction:
+ """Create a ReplaceTableTransaction.
+
+ The transaction can be used to stage additional changes (schema
evolution,
+ partition evolution, etc.) before committing.
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): New table schema.
+ location (str | None): New table location. Defaults to the
existing location.
+ partition_spec (PartitionSpec): New partition spec.
+ sort_order (SortOrder): New sort order.
+ properties (Properties): Properties to apply. Merged on top of the
existing
+ table properties: keys present here override existing values;
existing keys
+ not present here are preserved. To remove a property, follow
up with a
+ transaction that removes it explicitly.
+
+ Returns:
+ ReplaceTableTransaction: A transaction for the replace operation.
+
+ Raises:
+ NoSuchTableError: If the table does not exist.
+ """
+ staged_table, fresh_schema, fresh_spec, fresh_sort_order,
resolved_location = self._replace_staged_table(
+ identifier, schema, location, partition_spec, sort_order,
properties
+ )
+ return ReplaceTableTransaction(
+ table=staged_table,
+ new_schema=fresh_schema,
+ new_spec=fresh_spec,
+ new_sort_order=fresh_sort_order,
+ new_location=resolved_location,
+ new_properties=properties,
+ )
+
+ def _replace_staged_table(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None,
+ partition_spec: PartitionSpec,
+ sort_order: SortOrder,
+ properties: Properties,
+ ) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
+ """Load the existing table and build fresh schema/spec/sort-order for
replacement.
+
+ - reuses existing field IDs by name (from the current schema)
+ - reuses partition field IDs by `(source, transform)` across all specs
(v2+),
+ or carries forward the current spec with `VoidTransform`s (v1)
+ - reassigns sort field IDs against the fresh schema
+ - resolves `location` to the existing table's location when omitted
+
+ Returns:
+ A tuple `(staged_table, fresh_schema, fresh_partition_spec,
fresh_sort_order, resolved_location)`.
+ """
+ existing_table = self.load_table(identifier)
+ existing_metadata = existing_table.metadata
+
+ raw_format_version = properties.get(TableProperties.FORMAT_VERSION)
+ if raw_format_version is not None:
+ try:
+ requested_format_version = int(raw_format_version)
+ except (TypeError, ValueError) as exc:
+ raise ValueError(f"Invalid format-version property:
{raw_format_version!r}") from exc
+ if requested_format_version < existing_metadata.format_version:
+ raise ValueError(
+ f"Cannot downgrade format-version from
{existing_metadata.format_version} to {requested_format_version}"
Review Comment:
[AI Reviewer Aid] Java's
[`buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L733-L738)
reads `format-version` from properties and only upgrades. We reject a downgrade
explicitly here — otherwise `_convert_schema_if_needed` would run with v1
semantics while the requested version change is silently skipped (only upgrades
are applied), producing a confusing mismatch.
##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
return self._table
+class ReplaceTableTransaction(Transaction):
Review Comment:
[AI Reviewer Aid] Same role as Java's
[`Transactions.replaceTableTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/Transactions.java#L39-L46)
— collects the metadata updates that transform the existing table into the
replacement and commits them with the replace-specific requirements set.
##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
return self._table
+class ReplaceTableTransaction(Transaction):
+ """A transaction that replaces an existing table's schema, spec, sort
order, location, and properties.
+
+ The existing table UUID, snapshots, snapshot log, metadata log, and
history are preserved.
+ The "main" branch ref is removed (current-snapshot-id set to -1), and new
+ schema/spec/sort-order/location/properties are applied.
+ """
+
+ def __init__(
+ self,
+ table: StagedTable,
+ new_schema: Schema,
+ new_spec: PartitionSpec,
+ new_sort_order: SortOrder,
+ new_location: str,
+ new_properties: Properties,
+ ) -> None:
+ super().__init__(table, autocommit=False)
+ self._initial_changes(table.metadata, new_schema, new_spec,
new_sort_order, new_location, new_properties)
+
+ def _initial_changes(
+ self,
+ table_metadata: TableMetadata,
+ new_schema: Schema,
+ new_spec: PartitionSpec,
+ new_sort_order: SortOrder,
+ new_location: str,
+ new_properties: Properties,
+ ) -> None:
+ """Set the initial changes that transform the existing table into the
replacement.
+
+ Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` /
`SetDefaultSortOrder`
+ (even when the resulting id is reused) so the request body
unambiguously signals a
+ replace. Bumps `format-version` when the new properties request it.
+ """
+ # Upgrade format-version if requested via properties.
+ requested_format_version_str =
new_properties.get(TableProperties.FORMAT_VERSION)
+ if requested_format_version_str is not None:
+ requested_format_version = int(requested_format_version_str)
+ if requested_format_version > table_metadata.format_version:
+ self._updates +=
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+ # Remove the main branch ref to clear the current snapshot.
+ self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
Review Comment:
[AI Reviewer Aid] Only `main` is cleared, matching Java's
[`buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L739)
which calls `removeRef(SnapshotRef.MAIN_BRANCH)`. Other branches / tags
survive replace.
##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
return self._table
+class ReplaceTableTransaction(Transaction):
+ """A transaction that replaces an existing table's schema, spec, sort
order, location, and properties.
+
+ The existing table UUID, snapshots, snapshot log, metadata log, and
history are preserved.
+ The "main" branch ref is removed (current-snapshot-id set to -1), and new
+ schema/spec/sort-order/location/properties are applied.
+ """
+
+ def __init__(
+ self,
+ table: StagedTable,
+ new_schema: Schema,
+ new_spec: PartitionSpec,
+ new_sort_order: SortOrder,
+ new_location: str,
+ new_properties: Properties,
+ ) -> None:
+ super().__init__(table, autocommit=False)
+ self._initial_changes(table.metadata, new_schema, new_spec,
new_sort_order, new_location, new_properties)
+
+ def _initial_changes(
+ self,
+ table_metadata: TableMetadata,
+ new_schema: Schema,
+ new_spec: PartitionSpec,
+ new_sort_order: SortOrder,
+ new_location: str,
+ new_properties: Properties,
+ ) -> None:
+ """Set the initial changes that transform the existing table into the
replacement.
+
+ Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` /
`SetDefaultSortOrder`
+ (even when the resulting id is reused) so the request body
unambiguously signals a
+ replace. Bumps `format-version` when the new properties request it.
+ """
+ # Upgrade format-version if requested via properties.
+ requested_format_version_str =
new_properties.get(TableProperties.FORMAT_VERSION)
+ if requested_format_version_str is not None:
+ requested_format_version = int(requested_format_version_str)
+ if requested_format_version > table_metadata.format_version:
+ self._updates +=
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+ # Remove the main branch ref to clear the current snapshot.
+ self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+ # Schema: reuse an existing schema_id if structurally identical, else
add a new one
+ # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+ existing_schema_id = self._find_matching_schema_id(table_metadata,
new_schema)
+ if existing_schema_id is not None:
+ self._updates +=
(SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+ else:
+ next_schema_id = max((s.schema_id for s in
table_metadata.schemas), default=-1) + 1
+ schema_with_fresh_id = new_schema.model_copy(update={"schema_id":
next_schema_id})
+ self._updates += (
+ AddSchemaUpdate(schema_=schema_with_fresh_id),
+ SetCurrentSchemaUpdate(schema_id=-1),
+ )
+
+ # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on
add to avoid
+ # collisions with existing specs (AddPartitionSpecUpdate refuses
duplicate IDs).
+ effective_spec = UNPARTITIONED_PARTITION_SPEC if
new_spec.is_unpartitioned() else new_spec
+ existing_spec_id = self._find_matching_spec_id(table_metadata,
effective_spec)
+ if existing_spec_id is not None:
+ self._updates += (SetDefaultSpecUpdate(spec_id=existing_spec_id),)
+ else:
+ next_spec_id = max((s.spec_id for s in
table_metadata.partition_specs), default=-1) + 1
+ spec_with_fresh_id = PartitionSpec(*effective_spec.fields,
spec_id=next_spec_id)
+ self._updates += (
+ AddPartitionSpecUpdate(spec=spec_with_fresh_id),
+ SetDefaultSpecUpdate(spec_id=-1),
+ )
+
+ # Sort order: same reuse-or-add pattern with fresh order_id on add.
+ effective_sort_order = UNSORTED_SORT_ORDER if
new_sort_order.is_unsorted else new_sort_order
+ existing_order_id = self._find_matching_sort_order_id(table_metadata,
effective_sort_order)
+ if existing_order_id is not None:
+ self._updates +=
(SetDefaultSortOrderUpdate(sort_order_id=existing_order_id),)
+ else:
+ next_order_id = max((o.order_id for o in
table_metadata.sort_orders), default=-1) + 1
+ sort_order_with_fresh_id = SortOrder(*effective_sort_order.fields,
order_id=next_order_id)
+ self._updates += (
+ AddSortOrderUpdate(sort_order=sort_order_with_fresh_id),
+ SetDefaultSortOrderUpdate(sort_order_id=-1),
+ )
+
+ # Set location if changed.
+ if new_location != table_metadata.location:
+ self._updates += (SetLocationUpdate(location=new_location),)
+
+ # Merge properties (SetPropertiesUpdate merges onto existing
properties).
+ if new_properties:
+ self._updates += (SetPropertiesUpdate(updates=new_properties),)
Review Comment:
[AI Reviewer Aid] Properties are merged onto the existing ones, matching Java's
[`TableMetadata.Builder.setProperties`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1458-L1467)
which does `properties.putAll(updated)`. This is documented in the public
docstring — callers who want to remove keys must drop them explicitly inside
the transaction returned by `replace_table_transaction`.
##########
pyiceberg/partitioning.py:
##########
@@ -335,6 +335,175 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec,
old_schema: Schema, fre
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
+def assign_fresh_partition_spec_ids_for_replace(
Review Comment:
[AI Reviewer Aid] Mirrors the v2 path in Java's
[`TableMetadata.reassignPartitionIds`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L649-L667)
— collect `(source_id, transform) -> field_id` across all existing specs,
reuse on match, fresh ids for the rest.
##########
pyiceberg/partitioning.py:
##########
@@ -335,6 +335,175 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec,
old_schema: Schema, fre
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
+def assign_fresh_partition_spec_ids_for_replace(
+ spec: PartitionSpec,
+ old_schema: Schema,
+ fresh_schema: Schema,
+ existing_specs: list[PartitionSpec],
+ last_partition_id: int | None,
+ format_version: int = 2,
+ current_spec: PartitionSpec | None = None,
+) -> tuple[PartitionSpec, int]:
+ """Assign partition field IDs for a replace operation, reusing IDs from
existing specs.
+
+ - For v2+, reuse partition field IDs by `(source_id, transform)` across
all existing specs.
+ New fields get IDs starting from `last_partition_id + 1`.
+ - For v1, the current spec's fields must be preserved (v1 specs are
append-only). Fields
+ absent from the new spec are carried forward with a `VoidTransform`.
Matching new fields
+ reuse the existing partition field ID; remaining new fields are appended
with fresh IDs.
+
+ Args:
+ spec: The new partition spec to assign IDs to. Its `source_id`s
reference `old_schema`.
+ old_schema: The schema that the new spec's `source_id`s reference.
+ fresh_schema: The schema with freshly assigned field IDs.
+ existing_specs: All partition specs from the existing table metadata.
+ last_partition_id: The current table's `last_partition_id`.
+ format_version: Table format version. Required to be set to 1 for v1
carry-forward.
+ current_spec: The current default partition spec. Required when
`format_version <= 1`.
+
+ Returns:
+ A tuple of `(fresh_spec, new_last_partition_id)`.
+ """
+ effective_last_partition_id = last_partition_id if last_partition_id is
not None else PARTITION_FIELD_ID_START - 1
+
+ if format_version <= 1:
+ if current_spec is None:
+ raise ValueError("current_spec is required for v1 replace_table")
+ return _assign_fresh_partition_spec_ids_for_replace_v1(
+ spec, old_schema, fresh_schema, current_spec,
effective_last_partition_id
+ )
+
+ # v2+: reuse field IDs by (source_id, transform) across all specs. When
the same
+ # (source_id, transform) appears in multiple specs, prefer the highest
field_id.
+ transform_to_field_id: dict[tuple[int, str], int] = {}
+ for existing_spec in existing_specs:
+ for field in existing_spec.fields:
+ key = (field.source_id, str(field.transform))
+ if key not in transform_to_field_id or field.field_id >
transform_to_field_id[key]:
+ transform_to_field_id[key] = field.field_id
+
+ next_id = effective_last_partition_id
+ partition_fields = []
+ for field in spec.fields:
+ original_column_name = old_schema.find_column_name(field.source_id)
+ if original_column_name is None:
+ raise ValueError(f"Could not find in old schema: {field}")
+ fresh_field = fresh_schema.find_field(original_column_name)
+ if fresh_field is None:
+ raise ValueError(f"Could not find field in fresh schema:
{original_column_name}")
+
+ validate_partition_name(field.name, field.transform,
fresh_field.field_id, fresh_schema, set())
+
+ key = (fresh_field.field_id, str(field.transform))
+ if key in transform_to_field_id:
+ partition_field_id = transform_to_field_id[key]
+ else:
+ next_id += 1
+ partition_field_id = next_id
+ transform_to_field_id[key] = partition_field_id
+
+ partition_fields.append(
+ PartitionField(
+ name=field.name,
+ source_id=fresh_field.field_id,
+ field_id=partition_field_id,
+ transform=field.transform,
+ )
+ )
+
+ # `next_id` starts at `effective_last_partition_id` and only increments,
so it is the
+ # new last partition id.
+ return PartitionSpec(*partition_fields,
spec_id=INITIAL_PARTITION_SPEC_ID), next_id
+
+
+def _assign_fresh_partition_spec_ids_for_replace_v1(
Review Comment:
[AI Reviewer Aid] Mirrors the v1 path in Java's
[`reassignPartitionIds`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L668-L700).
v1 partition specs are append-only with sequential ids — fields absent from
the new spec must be carried forward as `VoidTransform`, otherwise replace
would be illegal for v1. The unique-name suffix loop in `_unique_void_name`
matches Java's [collision-renaming
pattern](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L687-L692),
generalized to loop further if both `name` and `name_<field_id>` are taken.
##########
pyiceberg/schema.py:
##########
@@ -1380,6 +1380,62 @@ def primitive(self, primitive: PrimitiveType) ->
PrimitiveType:
return primitive
+class _SetFreshIDsForReplace(_SetFreshIDs):
+ """Assign fresh IDs for a replace operation, reusing IDs from the base
schema by field name.
+
+ For each field in the new schema, if a field with the same full name
exists in the
+ base schema, its ID is reused; otherwise a fresh ID is allocated starting
from
+ last_column_id + 1.
+
+ Note: ID reuse is purely name-based — a field whose name matches but whose
type differs
+ (e.g. `int` → `string`) will reuse the base ID. This is intentional:
replace allows
+ arbitrary schema changes; type compatibility is the caller's
responsibility.
+ """
+
+ def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) ->
None:
+ self.old_id_to_new_id: dict[int, int] = {}
+ self._old_id_to_base_id = old_id_to_base_id
+ counter = itertools.count(starting_id + 1)
+ self.next_id_func = lambda: next(counter)
+
+ def _get_and_increment(self, current_id: int) -> int:
+ if current_id in self._old_id_to_base_id:
+ new_id = self._old_id_to_base_id[current_id]
+ else:
+ new_id = self.next_id_func()
+ self.old_id_to_new_id[current_id] = new_id
+ return new_id
+
+
+def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema,
last_column_id: int) -> tuple[Schema, int]:
Review Comment:
[AI Reviewer Aid] Maps to Java's [`TypeUtil.assignFreshIds(Schema schema,
Schema baseSchema, NextID
nextId)`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/api/src/main/java/org/apache/iceberg/types/TypeUtil.java#L310-L315)
and its [`AssignFreshIds`
visitor](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/api/src/main/java/org/apache/iceberg/types/AssignFreshIds.java#L27-L84).
Name-based reuse from the **current** schema only (not the full history) —
matches Java's behaviour. Type compatibility is the caller's responsibility.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]