Fokko commented on code in PR #1661: URL: https://github.com/apache/iceberg-python/pull/1661#discussion_r1987938857
########## pyiceberg/table/update/snapshot.py: ########## @@ -477,6 +478,20 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] +@dataclass(init=False) Review Comment: What do you think of making this `Frozen`? This gives some nice benefits like being hashable: https://docs.python.org/3/library/dataclasses.html#frozen-instances Using the `default_factory`s, we can also drop the init. ```suggestion @dataclass(frozen=True) ``` ########## pyiceberg/manifest.py: ########## @@ -861,6 +865,7 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter: entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file ) ) + self._existing_files += 1 Review Comment: I believe we already do this here: https://github.com/apache/iceberg-python/blob/764880364c94fbc4d29a0677350463de1d94e75c/pyiceberg/manifest.py#L823 ########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id or table.spec().spec_id + + def
_summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector() + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) + ssc.set_partition_summary_limit(partition_summary_limit) + + props = { + "manifests-kept": str(len([])), + "manifests-created": str(len(self.added_manifests)), + "manifests-replaced": str(len(self.rewritten_manifests)), + "entries-processed": str(len([])), Review Comment: ```suggestion "manifests-kept": "0", "manifests-created": str(len(self.added_manifests)), "manifests-replaced": str(len(self.rewritten_manifests)), "entries-processed": "0" ``` ########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec Review Comment: I don't think these are used. 
########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id or table.spec().spec_id + + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector() + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) + ssc.set_partition_summary_limit(partition_summary_limit) + + props = { + "manifests-kept": str(len([])), + "manifests-created": str(len(self.added_manifests)), + "manifests-replaced": str(len(self.rewritten_manifests)), + "entries-processed": str(len([])), + } + previous_snapshot = ( + self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if self._parent_snapshot_id is not None + else None + ) + + return update_snapshot_summaries( + summary=Summary(operation=self._operation, 
**ssc.build(), **props), + previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, + truncate_full_table=False, + ) + + def rewrite_manifests(self) -> RewriteManifestsResult: + data_result = self._find_matching_manifests(ManifestContent.DATA) + + self.rewritten_manifests.extend(data_result.rewritten_manifests) + self.added_manifests.extend(data_result.added_manifests) + + deletes_result = self._find_matching_manifests(ManifestContent.DELETES) + self.rewritten_manifests.extend(deletes_result.rewritten_manifests) + self.added_manifests.extend(deletes_result.added_manifests) + + if not self.rewritten_manifests: + return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) + + return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests) + + def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifestsResult: + snapshot = self._table.current_snapshot() + if self._spec_id and self._spec_id not in self._table.specs(): + raise ValueError(f"Cannot find spec with id: {self._spec_id}") + + if not snapshot: + raise ValueError("Cannot rewrite manifests without a current snapshot") Review Comment: How about pushing this to the `__init__` so we don't have to do this every time? ########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import
TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id or table.spec().spec_id + + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector() + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) + ssc.set_partition_summary_limit(partition_summary_limit) + + props = { + "manifests-kept": str(len([])), + "manifests-created": str(len(self.added_manifests)), + "manifests-replaced": str(len(self.rewritten_manifests)), + "entries-processed": str(len([])), + } + previous_snapshot = ( + self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if self._parent_snapshot_id is not None + else None + ) + + return update_snapshot_summaries( + summary=Summary(operation=self._operation, **ssc.build(), **props), + previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, + truncate_full_table=False, + ) + + def rewrite_manifests(self) -> RewriteManifestsResult: + data_result = self._find_matching_manifests(ManifestContent.DATA) + + self.rewritten_manifests.extend(data_result.rewritten_manifests) + self.added_manifests.extend(data_result.added_manifests) + + deletes_result = self._find_matching_manifests(ManifestContent.DELETES) + self.rewritten_manifests.extend(deletes_result.rewritten_manifests) + self.added_manifests.extend(deletes_result.added_manifests) + + if not self.rewritten_manifests: Review Comment: nit, just for clarity that it 
is a list: ```suggestion if len(self.rewritten_manifests) == 0: ``` ########## tests/integration/test_writes/test_rewrite_manifests.py: ########## @@ -0,0 +1,249 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name +from typing import List + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.manifest import ManifestFile +from pyiceberg.table import TableProperties +from utils import _create_table + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: + identifier = "default.arrow_table_v1_without_data" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + 
+@pytest.fixture(scope="session", autouse=True) +def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_only_nulls" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v2_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: + identifier = "default.arrow_table_v2_without_data" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: + identifier = "default.arrow_table_v2_with_only_nulls" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def 
table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v2_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_v2_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + with tbl.transaction() as tx: + tx.upgrade_table_version(format_version=2) + + tbl.append(arrow_table_with_null) + + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +# @pytest.mark.integration +# def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: +# identifier = "default.arrow_table_summaries" +# tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) +# tbl.append(arrow_table_with_null) +# tbl.append(arrow_table_with_null) +# +# # tbl.rewrite_manifests() +# +# # records1 = [ThreeColumnRecord(1, None, "AAAA")] +# # write_records(spark, table_location, records1) +# before_pandas = tbl.scan().to_pandas() +# before_count = before_pandas.shape[0] +# tbl.refresh() +# manifests = tbl.inspect.manifests().to_pylist() +# assert len(manifests) == 2, "Should have 2 manifests before rewrite" +# +# tbl.rewrite_manifests() +# tbl.refresh() +# +# after_pandas = tbl.scan().to_pandas() +# after_count = before_pandas.shape[0] +# manifests = tbl.inspect.manifests().to_pylist() +# assert len(manifests) == 1, "Should have 1 manifests before rewrite" +# +# snaps = tbl.inspect.snapshots().to_pandas() +# print(snaps) + + +@pytest.mark.integration +def 
test_rewrite_manifests_empty_table(session_catalog: Catalog) -> None: + # Create an unpartitioned table + identifier = "default.test_rewrite_manifests_empty_table" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) + + assert tbl.current_snapshot() is None, "Table must be empty" + + # Execute rewrite manifests action + tbl.rewrite_manifests() + + tbl.refresh() + assert tbl.current_snapshot() is None, "Table must stay empty" + + +@pytest.mark.integration +def test_rewrite_small_manifests_non_partitioned_table(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_rewrite_small_manifests_non_partitioned_table" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + tbl.refresh() + + manifests = tbl.inspect.manifests() + assert len(manifests) == 2, "Should have 2 manifests before rewrite" + + result = tbl.rewrite_manifests() + + assert len(result.rewritten_manifests) == 2, "Action should rewrite 2 manifests" + assert len(result.added_manifests) == 1, "Action should add 1 manifest" + + tbl.refresh() + + current_snapshot = tbl.current_snapshot() + if not current_snapshot: + raise AssertionError + new_manifests = current_snapshot.manifests(tbl.io) + assert len(new_manifests) == 1, "Should have 1 manifest after rewrite" + assert new_manifests[0].existing_files_count == 2, "Should have 4 files in the new manifest" + assert new_manifests[0].added_files_count == 0, "Should have no added files in the new manifest" + assert new_manifests[0].deleted_files_count == 0, "Should have no deleted files in the new manifest" + + # Validate the records + expected_records_count = arrow_table_with_null.shape[0] * 2 + result_df = tbl.scan().to_pandas() + actual_records_count = result_df.shape[0] + assert expected_records_count == actual_records_count, "Rows must match" + + +def compute_manifest_entry_size_bytes(manifests: 
List[ManifestFile]) -> float: + total_size = 0 + num_entries = 0 + + for manifest in manifests: + total_size += manifest.manifest_length + num_entries += manifest.added_files_count + manifest.existing_files_count + manifest.deleted_files_count + + return total_size / num_entries if num_entries > 0 else 0 + + +def test_rewrite_small_manifests_partitioned_table(session_catalog: Catalog) -> None: + records1 = pa.Table.from_pydict({"c1": [1, 1], "c2": [None, "BBBBBBBBBB"], "c3": ["AAAA", "BBBB"]}) + + records2 = records2 = pa.Table.from_pydict({"c1": [2, 2], "c2": ["CCCCCCCCCC", "DDDDDDDDDD"], "c3": ["CCCC", "DDDD"]}) + + records3 = records3 = pa.Table.from_pydict({"c1": [3, 3], "c2": ["EEEEEEEEEE", "FFFFFFFFFF"], "c3": ["EEEE", "FFFF"]}) + + records4 = records4 = pa.Table.from_pydict({"c1": [4, 4], "c2": ["GGGGGGGGGG", "HHHHHHHHHG"], "c3": ["GGGG", "HHHH"]}) + + schema = pa.schema( + [ + ("c1", pa.int64()), + ("c2", pa.string()), + ("c3", pa.string()), + ] + ) + + identifier = "default.test_rewrite_small_manifests_non_partitioned_table" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema) + + tbl.append(records1) + tbl.append(records2) + tbl.append(records3) + tbl.append(records4) + tbl.refresh() + + tbl.refresh() + manifests = tbl.current_snapshot().manifests(tbl.io) + assert len(manifests) == 4, "Should have 4 manifests before rewrite" + + # manifest_entry_size_bytes = compute_manifest_entry_size_bytes(manifests) + target_manifest_size_bytes = 5200 * 2 + 100 + tbl = ( + tbl.transaction() + .set_properties({TableProperties.MANIFEST_TARGET_SIZE_BYTES: str(target_manifest_size_bytes)}) + .commit_transaction() + ) + + result = tbl.rewrite_manifests() + + tbl.refresh() + assert len(result.rewritten_manifests) == 4, "Action should rewrite 4 manifests" + assert len(result.added_manifests) == 2, "Action should add 2 manifests" + + new_manifests = tbl.current_snapshot().manifests(tbl.io) Review Comment: Nothing wrong with this, but 
`tbl.inspect` might also be helpful to assert the manifests. ########## pyiceberg/table/update/snapshot.py: ########## @@ -477,6 +478,20 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] +@dataclass(init=False) +class RewriteManifestsResult: + rewritten_manifests: List[ManifestFile] + added_manifests: List[ManifestFile] Review Comment: How about slapping on some default factories: https://docs.python.org/3/library/dataclasses.html#dataclasses.field ```suggestion rewritten_manifests: List[ManifestFile] = field(default_factory=list) added_manifests: List[ManifestFile] = field(default_factory=list) ``` ########## pyiceberg/table/__init__.py: ########## @@ -223,6 +219,9 @@ class TableProperties: MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep" MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1 + SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled" + SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = False Review Comment: These seem unused, and I don't think we want to add this to PyIceberg. This was introduced in Java to test V2 features on a V1 table. ########## tests/integration/test_writes/test_rewrite_manifests.py: ########## @@ -0,0 +1,249 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# pylint:disable=redefined-outer-name +from typing import List + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.manifest import ManifestFile +from pyiceberg.table import TableProperties +from utils import _create_table + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: + identifier = "default.arrow_table_v1_without_data" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_only_nulls" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = 
"default.arrow_table_v2_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: + identifier = "default.arrow_table_v2_without_data" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: + identifier = "default.arrow_table_v2_with_only_nulls" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v2_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_v2_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + with tbl.transaction() as tx: + tx.upgrade_table_version(format_version=2) + + tbl.append(arrow_table_with_null) + + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" Review Comment: What 
are we testing here? I would love to see if we could rewrite the V1 and V2 manifest into a V2 one. ########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id or table.spec().spec_id + + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector() + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) + ssc.set_partition_summary_limit(partition_summary_limit) + + props = { + "manifests-kept": str(len([])), + "manifests-created": str(len(self.added_manifests)), + "manifests-replaced": str(len(self.rewritten_manifests)), + "entries-processed": str(len([])), + } + previous_snapshot = ( + self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if self._parent_snapshot_id is not None 
+ else None + ) + + return update_snapshot_summaries( + summary=Summary(operation=self._operation, **ssc.build(), **props), + previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, + truncate_full_table=False, + ) + + def rewrite_manifests(self) -> RewriteManifestsResult: + data_result = self._find_matching_manifests(ManifestContent.DATA) + + self.rewritten_manifests.extend(data_result.rewritten_manifests) + self.added_manifests.extend(data_result.added_manifests) + + deletes_result = self._find_matching_manifests(ManifestContent.DELETES) + self.rewritten_manifests.extend(deletes_result.rewritten_manifests) + self.added_manifests.extend(deletes_result.added_manifests) + + if not self.rewritten_manifests: + return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) + + return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests) + + def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifestsResult: + snapshot = self._table.current_snapshot() + if self._spec_id and self._spec_id not in self._table.specs(): + raise ValueError(f"Cannot find spec with id: {self._spec_id}") + + if not snapshot: + raise ValueError("Cannot rewrite manifests without a current snapshot") + + manifests = [ + manifest + for manifest in snapshot.manifests(io=self._io) + if manifest.partition_spec_id == self._spec_id and manifest.content == content + ] + + data_manifest_merge_manager = _ManifestMergeManager( + target_size_bytes=self._target_size_bytes, + min_count_to_merge=2, + merge_enabled=True, + snapshot_producer=self, + ) + new_manifests = data_manifest_merge_manager.merge_manifests(manifests=manifests) + + return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests) + + def _copy_manifest_file(self, manifest_file: ManifestFile, snapshot_id: int) -> ManifestFile: + return ManifestFile( + manifest_path=manifest_file.manifest_path, + 
manifest_length=manifest_file.manifest_length, + partition_spec_id=manifest_file.partition_spec_id, + content=manifest_file.content, + sequence_number=manifest_file.sequence_number, + min_sequence_number=manifest_file.min_sequence_number, + added_snapshot_id=snapshot_id, + added_files_count=manifest_file.added_files_count, + existing_files_count=manifest_file.existing_files_count, + deleted_files_count=manifest_file.deleted_files_count, + added_rows_count=manifest_file.added_rows_count, + existing_rows_count=manifest_file.existing_rows_count, + deleted_rows_count=manifest_file.deleted_rows_count, + partitions=manifest_file.partitions, + key_metadata=manifest_file.key_metadata, + ) + + def __exit__(self, _: Any, value: Any, traceback: Any) -> None: + """Commit only if we have rewritten any manifests.""" + if self.rewritten_manifests: + self.commit() + + def _existing_manifests(self) -> List[ManifestFile]: + """Determine if there are any existing manifest files.""" + return [self._copy_manifest_file(manifest, self.snapshot_id) for manifest in self.added_manifests] + + def _deleted_entries(self) -> List[ManifestEntry]: + """To determine if we need to record any deleted manifest entries. + + In case of an append, nothing is deleted. 
Review Comment: Copy paste :) ########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id or table.spec().spec_id + + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector() + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) + ssc.set_partition_summary_limit(partition_summary_limit) + + props = { + "manifests-kept": str(len([])), + "manifests-created": str(len(self.added_manifests)), + "manifests-replaced": str(len(self.rewritten_manifests)), + "entries-processed": str(len([])), + } + previous_snapshot = ( + self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if self._parent_snapshot_id is not None + else None + ) + + return update_snapshot_summaries( + 
summary=Summary(operation=self._operation, **ssc.build(), **props), + previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, + truncate_full_table=False, + ) + + def rewrite_manifests(self) -> RewriteManifestsResult: + data_result = self._find_matching_manifests(ManifestContent.DATA) + + self.rewritten_manifests.extend(data_result.rewritten_manifests) + self.added_manifests.extend(data_result.added_manifests) + + deletes_result = self._find_matching_manifests(ManifestContent.DELETES) + self.rewritten_manifests.extend(deletes_result.rewritten_manifests) + self.added_manifests.extend(deletes_result.added_manifests) + + if not self.rewritten_manifests: + return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) + + return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests) + + def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifestsResult: + snapshot = self._table.current_snapshot() + if self._spec_id and self._spec_id not in self._table.specs(): + raise ValueError(f"Cannot find spec with id: {self._spec_id}") + + if not snapshot: + raise ValueError("Cannot rewrite manifests without a current snapshot") + + manifests = [ + manifest + for manifest in snapshot.manifests(io=self._io) + if manifest.partition_spec_id == self._spec_id and manifest.content == content + ] Review Comment: For a follow-up it might be worthwhile to see if we can cache this result, since we're going over the manifests twice (once for data, once for delete). 
########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id or table.spec().spec_id + + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector() + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) Review Comment: It looks like we're cloning this logic, I've created an issue to resolve this in another PR: https://github.com/apache/iceberg-python/issues/1779 ########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + 
_target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id or table.spec().spec_id Review Comment: Should we also add these at the class level? Just below 546. ########## pyiceberg/table/update/snapshot.py: ########## @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id or 
table.spec().spec_id + + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector() + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) + ssc.set_partition_summary_limit(partition_summary_limit) + + props = { + "manifests-kept": str(len([])), + "manifests-created": str(len(self.added_manifests)), + "manifests-replaced": str(len(self.rewritten_manifests)), + "entries-processed": str(len([])), + } + previous_snapshot = ( + self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if self._parent_snapshot_id is not None + else None + ) + + return update_snapshot_summaries( + summary=Summary(operation=self._operation, **ssc.build(), **props), + previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, + truncate_full_table=False, + ) + + def rewrite_manifests(self) -> RewriteManifestsResult: + data_result = self._find_matching_manifests(ManifestContent.DATA) + + self.rewritten_manifests.extend(data_result.rewritten_manifests) + self.added_manifests.extend(data_result.added_manifests) + + deletes_result = self._find_matching_manifests(ManifestContent.DELETES) + self.rewritten_manifests.extend(deletes_result.rewritten_manifests) + self.added_manifests.extend(deletes_result.added_manifests) + + if not self.rewritten_manifests: + return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) + + return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests) + + def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifestsResult: + snapshot = self._table.current_snapshot() + if self._spec_id and self._spec_id not in self._table.specs(): + raise ValueError(f"Cannot find spec with id: {self._spec_id}") + + 
if not snapshot: + raise ValueError("Cannot rewrite manifests without a current snapshot") + + manifests = [ + manifest + for manifest in snapshot.manifests(io=self._io) + if manifest.partition_spec_id == self._spec_id and manifest.content == content + ] + + data_manifest_merge_manager = _ManifestMergeManager( + target_size_bytes=self._target_size_bytes, + min_count_to_merge=2, + merge_enabled=True, Review Comment: I think in Java we set the properties from the table properties: https://github.com/apache/iceberg/blob/6e8718113c08aebf76d8e79a9e2534c89c73407a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L113-L119 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org