Fokko commented on code in PR #61: URL: https://github.com/apache/iceberg-python/pull/61#discussion_r1383329498
########## pyiceberg/table/snapshots.py: ########## @@ -116,3 +144,199 @@ class MetadataLogEntry(IcebergBaseModel): class SnapshotLogEntry(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +class SnapshotSummaryCollector: + added_size: int + removed_size: int + added_files: int + removed_files: int + added_eq_delete_files: int + removed_eq_delete_files: int + added_pos_delete_files: int + removed_pos_delete_files: int + added_delete_files: int + removed_delete_files: int + added_records: int + deleted_records: int + added_pos_deletes: int + removed_pos_deletes: int + added_eq_deletes: int + removed_eq_deletes: int + + def __init__(self) -> None: + self.added_size = 0 + self.removed_size = 0 + self.added_files = 0 + self.removed_files = 0 + self.added_eq_delete_files = 0 + self.removed_eq_delete_files = 0 + self.added_pos_delete_files = 0 + self.removed_pos_delete_files = 0 + self.added_delete_files = 0 + self.removed_delete_files = 0 + self.added_records = 0 + self.deleted_records = 0 + self.added_pos_deletes = 0 + self.removed_pos_deletes = 0 + self.added_eq_deletes = 0 + self.removed_eq_deletes = 0 + + def add_file(self, data_file: DataFile) -> None: + if data_file.content == DataFileContent.DATA: + self.added_files += 1 + self.added_records += data_file.record_count + self.added_size += data_file.file_size_in_bytes + elif data_file.content == DataFileContent.POSITION_DELETES: + self.added_delete_files += 1 + self.added_pos_delete_files += 1 + self.added_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.added_delete_files += 1 + self.added_eq_delete_files += 1 + self.added_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def removed_file(self, data_file: DataFile) -> None: + if data_file.content == DataFileContent.DATA: + self.removed_files += 1 + self.deleted_records += 
data_file.record_count + elif data_file.content == DataFileContent.POSITION_DELETES: + self.removed_delete_files += 1 + self.removed_pos_delete_files += 1 + self.removed_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.removed_delete_files += 1 + self.removed_eq_delete_files += 1 + self.removed_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def added_manifest(self, manifest: ManifestFile) -> None: + if manifest.content == ManifestContent.DATA: + self.added_files += manifest.added_files_count or 0 + self.added_records += manifest.added_rows_count or 0 + self.removed_files += manifest.deleted_files_count or 0 + self.deleted_records += manifest.deleted_rows_count or 0 + elif manifest.content == ManifestContent.DELETES: + self.added_delete_files += manifest.added_files_count or 0 + self.removed_delete_files += manifest.deleted_files_count or 0 + else: + raise ValueError(f"Unknown manifest file content: {manifest.content}") + + def build(self) -> Dict[str, str]: + def set_non_zero(properties: Dict[str, str], num: int, property_name: str) -> None: + if num > 0: + properties[property_name] = str(num) + + properties: Dict[str, str] = {} + set_non_zero(properties, self.added_size, 'added-files-size') + set_non_zero(properties, self.removed_size, 'removed-files-size') + set_non_zero(properties, self.added_files, 'added-data-files') + set_non_zero(properties, self.removed_files, 'removed-data-files') + set_non_zero(properties, self.added_eq_delete_files, 'added-equality-delete-files') + set_non_zero(properties, self.removed_eq_delete_files, 'removed-equality-delete-files') + set_non_zero(properties, self.added_pos_delete_files, 'added-position-delete-files') + set_non_zero(properties, self.removed_pos_delete_files, 'removed-position-delete-files') + set_non_zero(properties, self.added_delete_files, 'added-delete-files') + set_non_zero(properties, 
self.removed_delete_files, 'removed-delete-files') + set_non_zero(properties, self.added_records, 'added-records') + set_non_zero(properties, self.deleted_records, 'deleted-records') + set_non_zero(properties, self.added_pos_deletes, 'added-position-deletes') + set_non_zero(properties, self.removed_pos_deletes, 'removed-position-deletes') + set_non_zero(properties, self.added_eq_deletes, 'added-equality-deletes') + set_non_zero(properties, self.removed_eq_deletes, 'removed-equality-deletes') + + return properties + + +properties = ['records', 'files-size', 'data-files', 'delete-files', 'position-deletes', 'equality-deletes'] + + +def truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary: + for prop in { + 'total-data-files', + 'total-delete-files', + 'total-records', + 'total-files-size', + 'total-position-deletes', + 'total-equality-deletes', + }: + summary[prop] = '0' + + if value := previous_summary.get('total-data-files'): + summary['deleted-data-files'] = value + if value := previous_summary.get('total-delete-files'): + summary['removed-delete-files'] = value + if value := previous_summary.get('total-records'): + summary['deleted-records'] = value + if value := previous_summary.get('total-files-size'): + summary['removed-files-size'] = value + if value := previous_summary.get('total-position-deletes'): + summary['removed-position-deletes'] = value + if value := previous_summary.get('total-equality-deletes'): + summary['removed-equality-deletes'] = value + + return summary + + +def merge_snapshot_summaries( + summary: Summary, + previous_summary: Optional[Mapping[str, str]] = None, +) -> Summary: + if summary.operation == Operation.OVERWRITE and previous_summary is not None: + summary = truncate_table_summary(summary, previous_summary) Review Comment: With Spark it is not common to overwrite a full table, but I would expect this with PyIceberg to be more common. 
Typically you load the whole dataset into memory, wrangle it, and write it back (or at least store a copy of it, to know on which data a model was trained). I've added the flag 👍 ########## pyiceberg/table/snapshots.py: ########## @@ -116,3 +144,199 @@ class MetadataLogEntry(IcebergBaseModel): class SnapshotLogEntry(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +class SnapshotSummaryCollector: + added_size: int + removed_size: int + added_files: int + removed_files: int + added_eq_delete_files: int + removed_eq_delete_files: int + added_pos_delete_files: int + removed_pos_delete_files: int + added_delete_files: int + removed_delete_files: int + added_records: int + deleted_records: int + added_pos_deletes: int + removed_pos_deletes: int + added_eq_deletes: int + removed_eq_deletes: int + + def __init__(self) -> None: + self.added_size = 0 + self.removed_size = 0 + self.added_files = 0 + self.removed_files = 0 + self.added_eq_delete_files = 0 + self.removed_eq_delete_files = 0 + self.added_pos_delete_files = 0 + self.removed_pos_delete_files = 0 + self.added_delete_files = 0 + self.removed_delete_files = 0 + self.added_records = 0 + self.deleted_records = 0 + self.added_pos_deletes = 0 + self.removed_pos_deletes = 0 + self.added_eq_deletes = 0 + self.removed_eq_deletes = 0 + + def add_file(self, data_file: DataFile) -> None: + if data_file.content == DataFileContent.DATA: + self.added_files += 1 + self.added_records += data_file.record_count + self.added_size += data_file.file_size_in_bytes + elif data_file.content == DataFileContent.POSITION_DELETES: + self.added_delete_files += 1 + self.added_pos_delete_files += 1 + self.added_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.added_delete_files += 1 + self.added_eq_delete_files += 1 + self.added_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: 
{data_file.content}") + + def removed_file(self, data_file: DataFile) -> None: + if data_file.content == DataFileContent.DATA: + self.removed_files += 1 + self.deleted_records += data_file.record_count + elif data_file.content == DataFileContent.POSITION_DELETES: + self.removed_delete_files += 1 + self.removed_pos_delete_files += 1 + self.removed_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.removed_delete_files += 1 + self.removed_eq_delete_files += 1 + self.removed_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def added_manifest(self, manifest: ManifestFile) -> None: + if manifest.content == ManifestContent.DATA: + self.added_files += manifest.added_files_count or 0 + self.added_records += manifest.added_rows_count or 0 + self.removed_files += manifest.deleted_files_count or 0 + self.deleted_records += manifest.deleted_rows_count or 0 + elif manifest.content == ManifestContent.DELETES: + self.added_delete_files += manifest.added_files_count or 0 + self.removed_delete_files += manifest.deleted_files_count or 0 + else: + raise ValueError(f"Unknown manifest file content: {manifest.content}") + + def build(self) -> Dict[str, str]: + def set_non_zero(properties: Dict[str, str], num: int, property_name: str) -> None: + if num > 0: + properties[property_name] = str(num) + + properties: Dict[str, str] = {} + set_non_zero(properties, self.added_size, 'added-files-size') + set_non_zero(properties, self.removed_size, 'removed-files-size') + set_non_zero(properties, self.added_files, 'added-data-files') + set_non_zero(properties, self.removed_files, 'removed-data-files') + set_non_zero(properties, self.added_eq_delete_files, 'added-equality-delete-files') + set_non_zero(properties, self.removed_eq_delete_files, 'removed-equality-delete-files') + set_non_zero(properties, self.added_pos_delete_files, 'added-position-delete-files') + 
set_non_zero(properties, self.removed_pos_delete_files, 'removed-position-delete-files') + set_non_zero(properties, self.added_delete_files, 'added-delete-files') + set_non_zero(properties, self.removed_delete_files, 'removed-delete-files') + set_non_zero(properties, self.added_records, 'added-records') + set_non_zero(properties, self.deleted_records, 'deleted-records') + set_non_zero(properties, self.added_pos_deletes, 'added-position-deletes') + set_non_zero(properties, self.removed_pos_deletes, 'removed-position-deletes') + set_non_zero(properties, self.added_eq_deletes, 'added-equality-deletes') + set_non_zero(properties, self.removed_eq_deletes, 'removed-equality-deletes') + + return properties + + +properties = ['records', 'files-size', 'data-files', 'delete-files', 'position-deletes', 'equality-deletes'] + + +def truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary: + for prop in { + 'total-data-files', + 'total-delete-files', + 'total-records', + 'total-files-size', + 'total-position-deletes', + 'total-equality-deletes', + }: + summary[prop] = '0' + + if value := previous_summary.get('total-data-files'): + summary['deleted-data-files'] = value + if value := previous_summary.get('total-delete-files'): + summary['removed-delete-files'] = value + if value := previous_summary.get('total-records'): + summary['deleted-records'] = value + if value := previous_summary.get('total-files-size'): + summary['removed-files-size'] = value + if value := previous_summary.get('total-position-deletes'): + summary['removed-position-deletes'] = value + if value := previous_summary.get('total-equality-deletes'): + summary['removed-equality-deletes'] = value + + return summary + + +def merge_snapshot_summaries( + summary: Summary, + previous_summary: Optional[Mapping[str, str]] = None, +) -> Summary: + if summary.operation == Operation.OVERWRITE and previous_summary is not None: + summary = truncate_table_summary(summary, previous_summary) 
Review Comment: With Spark it is not common to overwrite a full table, but I would expect this to be more common with PyIceberg. Typically you load the whole dataset into memory, wrangle it, and write it back (or at least store a copy of it, so that you know which data a model was trained on). I've added the flag 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org