rdblue commented on code in PR #61: URL: https://github.com/apache/iceberg-python/pull/61#discussion_r1416461866
########## pyiceberg/table/snapshots.py: ########## @@ -116,3 +168,193 @@ class MetadataLogEntry(IcebergBaseModel): class SnapshotLogEntry(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +class SnapshotSummaryCollector: + added_size: int + removed_size: int + added_files: int + removed_files: int + added_eq_delete_files: int + removed_eq_delete_files: int + added_pos_delete_files: int + removed_pos_delete_files: int + added_delete_files: int + removed_delete_files: int + added_records: int + deleted_records: int + added_pos_deletes: int + removed_pos_deletes: int + added_eq_deletes: int + removed_eq_deletes: int + + def __init__(self) -> None: + self.added_size = 0 + self.removed_size = 0 + self.added_files = 0 + self.removed_files = 0 + self.added_eq_delete_files = 0 + self.removed_eq_delete_files = 0 + self.added_pos_delete_files = 0 + self.removed_pos_delete_files = 0 + self.added_delete_files = 0 + self.removed_delete_files = 0 + self.added_records = 0 + self.deleted_records = 0 + self.added_pos_deletes = 0 + self.removed_pos_deletes = 0 + self.added_eq_deletes = 0 + self.removed_eq_deletes = 0 + + def add_file(self, data_file: DataFile) -> None: + self.added_size += data_file.file_size_in_bytes + + if data_file.content == DataFileContent.DATA: + self.added_files += 1 + self.added_records += data_file.record_count + elif data_file.content == DataFileContent.POSITION_DELETES: + self.added_delete_files += 1 + self.added_pos_delete_files += 1 + self.added_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.added_delete_files += 1 + self.added_eq_delete_files += 1 + self.added_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def remove_file(self, data_file: DataFile) -> None: + self.removed_size += data_file.file_size_in_bytes + + if data_file.content == DataFileContent.DATA: + self.removed_files += 1 + self.deleted_records += data_file.record_count + elif data_file.content == DataFileContent.POSITION_DELETES: + self.removed_delete_files += 1 + self.removed_pos_delete_files += 1 + self.removed_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.removed_delete_files += 1 + self.removed_eq_delete_files += 1 + self.removed_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def build(self) -> Dict[str, str]: + def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None: + if num > 0: + properties[property_name] = str(num) + + properties: Dict[str, str] = {} + set_when_positive(properties, self.added_size, ADDED_FILE_SIZE) + set_when_positive(properties, self.removed_size, REMOVED_FILE_SIZE) + set_when_positive(properties, self.added_files, ADDED_DATA_FILES) + set_when_positive(properties, self.removed_files, DELETED_DATA_FILES) + set_when_positive(properties, self.added_eq_delete_files, EQUALITY_DELETE_FILES) + set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES) + set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES) + set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES) + set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES) + set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES) + set_when_positive(properties, self.added_records, ADDED_RECORDS) + set_when_positive(properties, self.deleted_records, DELETED_RECORDS) + set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES) + set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES) + set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES) + set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES) + + return properties + + +def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary: + for prop in { + TOTAL_DATA_FILES, + TOTAL_DELETE_FILES, + TOTAL_RECORDS, + TOTAL_FILE_SIZE, + TOTAL_POSITION_DELETES, + TOTAL_EQUALITY_DELETES, + }: + summary[prop] = '0' + + if value := previous_summary.get(TOTAL_DATA_FILES): + summary[DELETED_DATA_FILES] = value + if value := previous_summary.get(TOTAL_DELETE_FILES): + summary[REMOVED_DELETE_FILES] = value + if value := previous_summary.get(TOTAL_RECORDS): + summary[DELETED_RECORDS] = value + if value := previous_summary.get(TOTAL_FILE_SIZE): + summary[REMOVED_FILE_SIZE] = value + if value := previous_summary.get(TOTAL_POSITION_DELETES): + summary[REMOVED_POSITION_DELETES] = value + if value := previous_summary.get(TOTAL_EQUALITY_DELETES): + summary[REMOVED_EQUALITY_DELETES] = value + + return summary + + +def _merge_snapshot_summaries( Review Comment: Is this really a merge? To me, a merge is like adding two summaries together. This is actually updating from a previous snapshot summary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org