smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3261232343
##########
mkdocs/docs/api.md:
##########
@@ -185,6 +185,45 @@ with
catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
```
+## Replace a table
+
+Atomically replace an existing table's schema, partition spec, sort order,
location, and properties. The table UUID and history (snapshots, schemas,
specs, sort orders, metadata log) are preserved; the current snapshot is
cleared (the `main` branch ref is removed). Use this when you want to redefine
the table's metadata; pair it with `replace_table_transaction` to atomically
write new data alongside the metadata change (RTAS-style).
+
+```python
+from pyiceberg.schema import Schema
+from pyiceberg.types import NestedField, LongType, StringType, BooleanType
+
+new_schema = Schema(
+ NestedField(field_id=1, name="datetime", field_type=LongType(),
required=False),
+ NestedField(field_id=2, name="symbol", field_type=StringType(),
required=False),
+ NestedField(field_id=3, name="active", field_type=BooleanType(),
required=False),
+)
+catalog.replace_table(
+ identifier="docs_example.bids",
+ schema=new_schema,
+)
+```
+
+Field IDs from columns whose names appear in the previous schema are reused,
so existing data files remain readable when the new schema is a compatible
superset. New columns get fresh IDs above `last-column-id`.
+
+Properties passed to `replace_table` are **merged** with the existing table
properties (your values override; existing keys you don't pass are preserved).
To remove a property as part of the replace, use `replace_table_transaction`
and remove it explicitly within the transaction.
+
+Use `replace_table_transaction` to stage additional changes (writes, property
updates, schema evolution) before committing — for example, swap the schema and
write new data atomically:
+
+```python
+with catalog.replace_table_transaction(identifier="docs_example.bids",
schema=new_schema) as txn:
+ with txn.update_snapshot().fast_append() as snap:
+ for data_file in
_dataframe_to_data_files(table_metadata=txn.table_metadata, df=df,
io=txn._table.io):
+ snap.append_data_file(data_file)
+ txn.set_properties(write_replaced_at="2026-04-19T00:00:00Z")
Review Comment:
Surely we should be showing / testing an example where you have a PyArrow
table that you want to replace your table with, so you do
`replace_table_transaction` with that Arrow table's schema and do append on the
transaction to achieve it? This feels like the most common use case by far, no?
##########
tests/test_schema.py:
##########
@@ -1815,3 +1816,77 @@ def
test_check_schema_compatible_optional_map_field_present() -> None:
)
# Should not raise - schemas match
_check_schema_compatible(requested_schema, provided_schema)
+
+
+@pytest.mark.parametrize(
+ "new_fields, expected_ids, expected_last_col_id",
+ [
+ # All columns reused by name: IDs come from base, last_column_id
unchanged.
+ ([("id", IntegerType()), ("data", StringType())], [1, 2], 2),
+ # Mix of reused and new: new column gets ID above last_column_id.
+ ([("id", IntegerType()), ("data", StringType()), ("new_col",
BooleanType())], [1, 2, 3], 3),
+ # No column names match: all fresh IDs starting from last_column_id +
1.
+ ([("x", IntegerType()), ("y", IntegerType())], [3, 4], 4),
+ ],
+ ids=[
+ "all-reused-keeps-last-col-id",
+ "new-field-bumps-last-col-id",
+ "no-name-overlap-bumps-from-base",
+ ],
+)
+def test_assign_fresh_schema_ids_for_replace_primitive_fields(
+ new_fields: list[tuple[str, IcebergType]], expected_ids: list[int],
expected_last_col_id: int
+) -> None:
+ """Replace schema field IDs are reused from the base schema by name; new
fields get IDs above last_column_id."""
+ base_schema = Schema(
+ NestedField(field_id=1, name="id", field_type=IntegerType(),
required=False),
+ NestedField(field_id=2, name="data", field_type=StringType(),
required=False),
+ )
+ new_schema = Schema(
+ *(
+ NestedField(field_id=10 * (i + 1), name=name,
field_type=field_type, required=False)
+ for i, (name, field_type) in enumerate(new_fields)
+ )
+ )
+ fresh, last_col_id = assign_fresh_schema_ids_for_replace(new_schema,
base_schema, 2)
+ assert [f.field_id for f in fresh.fields] == expected_ids
+ assert last_col_id == expected_last_col_id
+
+
+def test_assign_fresh_schema_ids_for_replace_with_nested_struct() -> None:
+ """Test that nested struct field IDs are reused by full path name."""
+ base_schema = Schema(
+ NestedField(field_id=1, name="id", field_type=IntegerType(),
required=False),
+ NestedField(
+ field_id=2,
+ name="location",
+ field_type=StructType(
+ NestedField(field_id=3, name="lat", field_type=FloatType(),
required=False),
+ NestedField(field_id=4, name="lon", field_type=FloatType(),
required=False),
+ ),
+ required=False,
+ ),
+ )
+ new_schema = Schema(
+ NestedField(field_id=10, name="id", field_type=IntegerType(),
required=False),
+ NestedField(
+ field_id=20,
+ name="location",
+ field_type=StructType(
+ NestedField(field_id=30, name="lat", field_type=FloatType(),
required=False),
+ NestedField(field_id=40, name="lon", field_type=FloatType(),
required=False),
+ NestedField(field_id=50, name="alt", field_type=FloatType(),
required=False),
+ ),
+ required=False,
+ ),
+ )
+ fresh, last_col_id = assign_fresh_schema_ids_for_replace(new_schema,
base_schema, 4)
+ assert fresh.fields[0].field_id == 1 # id reused
+ assert fresh.fields[1].field_id == 2 # location reused
+ loc_fields = fresh.fields[1].field_type.fields
+ assert loc_fields[0].field_id == 3 # location.lat reused
+ assert loc_fields[1].field_id == 4 # location.lon reused
+ assert loc_fields[2].field_id == 5 # location.alt is new
+ assert last_col_id == 5
+
+
Review Comment:
Ran `make lint` — picked up trailing whitespace + ruff format issues. Now
passing cleanly.
##########
mkdocs/docs/api.md:
##########
@@ -185,6 +185,45 @@ with
catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
```
+## Replace a table
+
+Atomically replace an existing table's schema, partition spec, sort order,
location, and properties. The table UUID and history (snapshots, schemas,
specs, sort orders, metadata log) are preserved; the current snapshot is
cleared (the `main` branch ref is removed). Use this when you want to redefine
the table's metadata; pair it with `replace_table_transaction` to atomically
write new data alongside the metadata change (RTAS-style).
Review Comment:
Adopted your wording with light tweaks ("lets you write new data alongside
this change" instead of the slightly clunky "allows for new data to be
written").
##########
mkdocs/docs/api.md:
##########
@@ -185,6 +185,45 @@ with
catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
```
+## Replace a table
+
+Atomically replace an existing table's schema, partition spec, sort order,
location, and properties. The table UUID and history (snapshots, schemas,
specs, sort orders, metadata log) are preserved; the current snapshot is
cleared (the `main` branch ref is removed). Use this when you want to redefine
the table's metadata; pair it with `replace_table_transaction` to atomically
write new data alongside the metadata change (RTAS-style).
+
+```python
+from pyiceberg.schema import Schema
+from pyiceberg.types import NestedField, LongType, StringType, BooleanType
+
+new_schema = Schema(
+ NestedField(field_id=1, name="datetime", field_type=LongType(),
required=False),
+ NestedField(field_id=2, name="symbol", field_type=StringType(),
required=False),
+ NestedField(field_id=3, name="active", field_type=BooleanType(),
required=False),
+)
+catalog.replace_table(
+ identifier="docs_example.bids",
+ schema=new_schema,
+)
+```
+
+Field IDs from columns whose names appear in the previous schema are reused,
so existing data files remain readable when the new schema is a compatible
superset. New columns get fresh IDs above `last-column-id`.
+
+Properties passed to `replace_table` are **merged** with the existing table
properties (your values override; existing keys you don't pass are preserved).
To remove a property as part of the replace, use `replace_table_transaction`
and remove it explicitly within the transaction.
+
+Use `replace_table_transaction` to stage additional changes (writes, property
updates, schema evolution) before committing — for example, swap the schema and
write new data atomically:
+
+```python
+with catalog.replace_table_transaction(identifier="docs_example.bids",
schema=new_schema) as txn:
+ with txn.update_snapshot().fast_append() as snap:
+ for data_file in
_dataframe_to_data_files(table_metadata=txn.table_metadata, df=df,
io=txn._table.io):
+ snap.append_data_file(data_file)
+ txn.set_properties(write_replaced_at="2026-04-19T00:00:00Z")
Review Comment:
You're right — switched to `txn.append(df)` everywhere (docs + RTAS tests,
unit + integration). Also dropped the parallel `new_schema = Schema(...)`
construction; the tests now just pass `df.schema` directly since
`replace_table*` accepts `Schema | pa.Schema`.
##########
tests/catalog/test_catalog_behaviors.py:
##########
@@ -387,6 +387,346 @@ def test_load_table_from_self_identifier(
assert table.metadata == loaded_table.metadata
+_SIMPLE_SCHEMA = Schema(
+ NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+ NestedField(field_id=2, name="data", field_type=StringType(),
required=False),
+)
+
+
+def _create_simple_table(
+ catalog: Catalog,
+ identifier: Identifier,
+ *,
+ schema: Schema = _SIMPLE_SCHEMA,
+ format_version: int = 2,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ properties: dict[str, str] | None = None,
+) -> tuple[Identifier, Schema]:
+ namespace = Catalog.namespace_from(identifier)
+ catalog.create_namespace_if_not_exists(namespace)
+ merged_properties = {"format-version": str(format_version), **(properties
or {})}
+ catalog.create_table(identifier, schema=schema,
partition_spec=partition_spec, properties=merged_properties)
+ return identifier, schema
+
+
+def _simple_data(num_rows: int = 2) -> pa.Table:
+ return pa.Table.from_pydict(
+ {"id": list(range(num_rows)), "data": [chr(ord("a") + i) for i in
range(num_rows)]},
+ schema=pa.schema([pa.field("id", pa.int64()), pa.field("data",
pa.large_string())]),
+ )
+
+
+_REPLACE_SCHEMA = Schema(
+ NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+ NestedField(field_id=2, name="data", field_type=StringType(),
required=False),
+ NestedField(field_id=3, name="extra", field_type=BooleanType(),
required=False),
+)
+
+
+def test_replace_transaction(catalog: Catalog, test_table_identifier:
Identifier) -> None:
+ _, original_schema = _create_simple_table(catalog, test_table_identifier)
+ original = catalog.load_table(test_table_identifier)
+ original.append(_simple_data())
+ original = catalog.load_table(test_table_identifier)
+ old_snapshot_id = original.current_snapshot().snapshot_id # type:
ignore[union-attr]
+ snapshot_log_before = list(original.metadata.snapshot_log)
+ assert len(snapshot_log_before) == 1
+
+ catalog.replace_table_transaction(test_table_identifier,
schema=_REPLACE_SCHEMA).commit_transaction()
+ replaced = catalog.load_table(test_table_identifier)
+
+ # UUID + history preserved, current snapshot cleared, current schema
swapped.
+ assert replaced.metadata.table_uuid == original.metadata.table_uuid
+ assert replaced.metadata.current_snapshot_id is None
+ assert {f.name for f in replaced.schema().fields} == {"id", "data",
"extra"}
+ # Old snapshot kept by identity (not just count), and snapshot_log entries
from before survive.
+ assert any(s.snapshot_id == old_snapshot_id for s in
replaced.metadata.snapshots)
+ assert all(entry in replaced.metadata.snapshot_log for entry in
snapshot_log_before)
+ # Old schema is still in the schemas list alongside the new one.
+ schema_ids = sorted(s.schema_id for s in replaced.metadata.schemas)
+ assert schema_ids == [0, 1]
+ assert replaced.metadata.current_schema_id == 1
+ # Time-travel back to the pre-replace snapshot returns the rows that were
there before.
+ assert
replaced.scan(snapshot_id=old_snapshot_id).to_arrow().equals(_simple_data())
+
+
+def _run_complete_replace(
Review Comment:
[AI Reviewer Aid] The three `test_complete_replace_transaction_*` tests
below share this setup, which together mirror Java's
[`testCompleteReplaceTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java#L2657-L2731)
— exercises all six `replace_table_transaction` args (identifier + schema + spec +
sort + location + properties) with an RTAS append, and asserts history accumulates,
the new snapshot has no parent, and property-merge semantics (keep / override /
add — which Java covers under `testReplaceTransactionProperties*` in the same
file).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]