Fokko commented on code in PR #521: URL: https://github.com/apache/iceberg-python/pull/521#discussion_r1524719927
########## pyiceberg/table/snapshots.py: ########## @@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel): class SnapshotSummaryCollector: - added_file_size: int - removed_file_size: int - added_data_files: int - removed_data_files: int - added_eq_delete_files: int - removed_eq_delete_files: int - added_pos_delete_files: int - removed_pos_delete_files: int - added_delete_files: int - removed_delete_files: int - added_records: int - deleted_records: int - added_pos_deletes: int - removed_pos_deletes: int - added_eq_deletes: int - removed_eq_deletes: int + metrics: UpdateMetrics + partition_metrics: DefaultDict[str, UpdateMetrics] + max_changed_partitions_for_summaries: int def __init__(self) -> None: - self.added_file_size = 0 - self.removed_file_size = 0 - self.added_data_files = 0 - self.removed_data_files = 0 - self.added_eq_delete_files = 0 - self.removed_eq_delete_files = 0 - self.added_pos_delete_files = 0 - self.removed_pos_delete_files = 0 - self.added_delete_files = 0 - self.removed_delete_files = 0 - self.added_records = 0 - self.deleted_records = 0 - self.added_pos_deletes = 0 - self.removed_pos_deletes = 0 - self.added_eq_deletes = 0 - self.removed_eq_deletes = 0 - - def add_file(self, data_file: DataFile) -> None: - self.added_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.added_data_files += 1 - self.added_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.added_delete_files += 1 - self.added_pos_delete_files += 1 - self.added_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.added_delete_files += 1 - self.added_eq_delete_files += 1 - self.added_eq_deletes += data_file.record_count - else: - raise ValueError(f"Unknown data file content: {data_file.content}") - - def remove_file(self, data_file: DataFile) -> None: - self.removed_file_size += data_file.file_size_in_bytes - - if 
data_file.content == DataFileContent.DATA: - self.removed_data_files += 1 - self.deleted_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.removed_delete_files += 1 - self.removed_pos_delete_files += 1 - self.removed_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.removed_delete_files += 1 - self.removed_eq_delete_files += 1 - self.removed_eq_deletes += data_file.record_count + self.metrics = UpdateMetrics() + self.partition_metrics = defaultdict(UpdateMetrics) + self.max_changed_partitions_for_summaries = 0 + + def set_partition_summary_limit(self, limit: int) -> None: + self.max_changed_partitions_for_summaries = limit + + def add_file( + self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None Review Comment: Since this is more-or-less internal, I think it makes sense to make the spec and schema required. ########## pyiceberg/table/__init__.py: ########## @@ -2569,9 +2572,21 @@ def _write_delete_manifest() -> List[ManifestFile]: def _summary(self) -> Summary: ssc = SnapshotSummaryCollector() + partition_summary_limit = self._transaction.table_metadata.properties.get( Review Comment: I think table properties are stored as strings, so maybe we should try to parse the value as an integer, and raise a ValueError if that fails. ########## tests/integration/test_writes.py: ########## @@ -525,12 +525,15 @@ def test_summaries_with_only_nulls( 'total-records': '2', } - assert summaries[0] == { - 'total-data-files': '0', - 'total-delete-files': '0', + assert summaries[2] == { Review Comment: Nice catch! 
########## pyiceberg/table/snapshots.py: ########## @@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel): class SnapshotSummaryCollector: - added_file_size: int - removed_file_size: int - added_data_files: int - removed_data_files: int - added_eq_delete_files: int - removed_eq_delete_files: int - added_pos_delete_files: int - removed_pos_delete_files: int - added_delete_files: int - removed_delete_files: int - added_records: int - deleted_records: int - added_pos_deletes: int - removed_pos_deletes: int - added_eq_deletes: int - removed_eq_deletes: int + metrics: UpdateMetrics + partition_metrics: DefaultDict[str, UpdateMetrics] + max_changed_partitions_for_summaries: int def __init__(self) -> None: - self.added_file_size = 0 - self.removed_file_size = 0 - self.added_data_files = 0 - self.removed_data_files = 0 - self.added_eq_delete_files = 0 - self.removed_eq_delete_files = 0 - self.added_pos_delete_files = 0 - self.removed_pos_delete_files = 0 - self.added_delete_files = 0 - self.removed_delete_files = 0 - self.added_records = 0 - self.deleted_records = 0 - self.added_pos_deletes = 0 - self.removed_pos_deletes = 0 - self.added_eq_deletes = 0 - self.removed_eq_deletes = 0 - - def add_file(self, data_file: DataFile) -> None: - self.added_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.added_data_files += 1 - self.added_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.added_delete_files += 1 - self.added_pos_delete_files += 1 - self.added_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.added_delete_files += 1 - self.added_eq_delete_files += 1 - self.added_eq_deletes += data_file.record_count - else: - raise ValueError(f"Unknown data file content: {data_file.content}") - - def remove_file(self, data_file: DataFile) -> None: - self.removed_file_size += data_file.file_size_in_bytes - - if 
data_file.content == DataFileContent.DATA: - self.removed_data_files += 1 - self.deleted_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.removed_delete_files += 1 - self.removed_pos_delete_files += 1 - self.removed_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.removed_delete_files += 1 - self.removed_eq_delete_files += 1 - self.removed_eq_deletes += data_file.record_count + self.metrics = UpdateMetrics() + self.partition_metrics = defaultdict(UpdateMetrics) + self.max_changed_partitions_for_summaries = 0 + + def set_partition_summary_limit(self, limit: int) -> None: + self.max_changed_partitions_for_summaries = limit + + def add_file( + self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None + ) -> None: + self.metrics.add_file(data_file) + if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: Review Comment: The partition is required, and should always be there:  ########## pyiceberg/table/snapshots.py: ########## @@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel): class SnapshotSummaryCollector: - added_file_size: int - removed_file_size: int - added_data_files: int - removed_data_files: int - added_eq_delete_files: int - removed_eq_delete_files: int - added_pos_delete_files: int - removed_pos_delete_files: int - added_delete_files: int - removed_delete_files: int - added_records: int - deleted_records: int - added_pos_deletes: int - removed_pos_deletes: int - added_eq_deletes: int - removed_eq_deletes: int + metrics: UpdateMetrics + partition_metrics: DefaultDict[str, UpdateMetrics] + max_changed_partitions_for_summaries: int def __init__(self) -> None: - self.added_file_size = 0 - self.removed_file_size = 0 - self.added_data_files = 0 - self.removed_data_files = 0 - self.added_eq_delete_files = 0 - self.removed_eq_delete_files = 0 - 
self.added_pos_delete_files = 0 - self.removed_pos_delete_files = 0 - self.added_delete_files = 0 - self.removed_delete_files = 0 - self.added_records = 0 - self.deleted_records = 0 - self.added_pos_deletes = 0 - self.removed_pos_deletes = 0 - self.added_eq_deletes = 0 - self.removed_eq_deletes = 0 - - def add_file(self, data_file: DataFile) -> None: - self.added_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.added_data_files += 1 - self.added_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.added_delete_files += 1 - self.added_pos_delete_files += 1 - self.added_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.added_delete_files += 1 - self.added_eq_delete_files += 1 - self.added_eq_deletes += data_file.record_count - else: - raise ValueError(f"Unknown data file content: {data_file.content}") - - def remove_file(self, data_file: DataFile) -> None: - self.removed_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.removed_data_files += 1 - self.deleted_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.removed_delete_files += 1 - self.removed_pos_delete_files += 1 - self.removed_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.removed_delete_files += 1 - self.removed_eq_delete_files += 1 - self.removed_eq_deletes += data_file.record_count + self.metrics = UpdateMetrics() + self.partition_metrics = defaultdict(UpdateMetrics) + self.max_changed_partitions_for_summaries = 0 + + def set_partition_summary_limit(self, limit: int) -> None: + self.max_changed_partitions_for_summaries = limit + + def add_file( + self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None Review Comment: Or something like: 
```suggestion self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC ``` ########## pyiceberg/table/snapshots.py: ########## @@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel): class SnapshotSummaryCollector: - added_file_size: int - removed_file_size: int - added_data_files: int - removed_data_files: int - added_eq_delete_files: int - removed_eq_delete_files: int - added_pos_delete_files: int - removed_pos_delete_files: int - added_delete_files: int - removed_delete_files: int - added_records: int - deleted_records: int - added_pos_deletes: int - removed_pos_deletes: int - added_eq_deletes: int - removed_eq_deletes: int + metrics: UpdateMetrics + partition_metrics: DefaultDict[str, UpdateMetrics] + max_changed_partitions_for_summaries: int def __init__(self) -> None: - self.added_file_size = 0 - self.removed_file_size = 0 - self.added_data_files = 0 - self.removed_data_files = 0 - self.added_eq_delete_files = 0 - self.removed_eq_delete_files = 0 - self.added_pos_delete_files = 0 - self.removed_pos_delete_files = 0 - self.added_delete_files = 0 - self.removed_delete_files = 0 - self.added_records = 0 - self.deleted_records = 0 - self.added_pos_deletes = 0 - self.removed_pos_deletes = 0 - self.added_eq_deletes = 0 - self.removed_eq_deletes = 0 - - def add_file(self, data_file: DataFile) -> None: - self.added_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.added_data_files += 1 - self.added_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.added_delete_files += 1 - self.added_pos_delete_files += 1 - self.added_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.added_delete_files += 1 - self.added_eq_delete_files += 1 - self.added_eq_deletes += data_file.record_count - else: - raise ValueError(f"Unknown data file content: {data_file.content}") - - 
def remove_file(self, data_file: DataFile) -> None: - self.removed_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.removed_data_files += 1 - self.deleted_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.removed_delete_files += 1 - self.removed_pos_delete_files += 1 - self.removed_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.removed_delete_files += 1 - self.removed_eq_delete_files += 1 - self.removed_eq_deletes += data_file.record_count + self.metrics = UpdateMetrics() + self.partition_metrics = defaultdict(UpdateMetrics) + self.max_changed_partitions_for_summaries = 0 + + def set_partition_summary_limit(self, limit: int) -> None: + self.max_changed_partitions_for_summaries = limit + + def add_file( + self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None + ) -> None: + self.metrics.add_file(data_file) + if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: + if partition_spec is None or schema is None: + raise ValueError("add data file with partition but without specifying the partiton_spec and schema") + self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema) + + def remove_file(self, data_file: DataFile, partition_spec: Optional[PartitionSpec], schema: Optional[Schema]) -> None: + self.metrics.remove_file(data_file) + if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: + if partition_spec is None or schema is None: + raise ValueError("remove data file with partition but without specifying the partiton_spec and schema") + self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema) + + def update_partition_metrics(self, partition_spec: 
PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None: + partition_path = partition_spec.partition_to_path(file.partition, schema) + partition_metrics: UpdateMetrics = self.partition_metrics[partition_path] + + if is_add_file: + partition_metrics.add_file(file) else: - raise ValueError(f"Unknown data file content: {data_file.content}") + partition_metrics.remove_file(file) def build(self) -> Dict[str, str]: - def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None: - if num > 0: - properties[property_name] = str(num) - - properties: Dict[str, str] = {} - set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE) - set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE) - set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES) - set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES) - set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES) - set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES) - set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES) - set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES) - set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES) - set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES) - set_when_positive(properties, self.added_records, ADDED_RECORDS) - set_when_positive(properties, self.deleted_records, DELETED_RECORDS) - set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES) - set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES) - set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES) - set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES) + properties = self.metrics.to_dict() + changed_partitions_size = len(self.partition_metrics) + 
set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP) + if changed_partitions_size <= self.max_changed_partitions_for_summaries: + for partition_path, update_metrics_partition in self.partition_metrics.items(): + if (summary := self.partition_summary(update_metrics_partition)) and len(summary) != 0: + properties[CHANGED_PARTITION_PREFIX + partition_path] = summary return properties + def partition_summary(self, update_metrics: UpdateMetrics) -> str: Review Comment: ```suggestion def _partition_summary(self, update_metrics: UpdateMetrics) -> str: ``` ########## pyiceberg/table/snapshots.py: ########## @@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel): class SnapshotSummaryCollector: - added_file_size: int - removed_file_size: int - added_data_files: int - removed_data_files: int - added_eq_delete_files: int - removed_eq_delete_files: int - added_pos_delete_files: int - removed_pos_delete_files: int - added_delete_files: int - removed_delete_files: int - added_records: int - deleted_records: int - added_pos_deletes: int - removed_pos_deletes: int - added_eq_deletes: int - removed_eq_deletes: int + metrics: UpdateMetrics + partition_metrics: DefaultDict[str, UpdateMetrics] + max_changed_partitions_for_summaries: int def __init__(self) -> None: - self.added_file_size = 0 - self.removed_file_size = 0 - self.added_data_files = 0 - self.removed_data_files = 0 - self.added_eq_delete_files = 0 - self.removed_eq_delete_files = 0 - self.added_pos_delete_files = 0 - self.removed_pos_delete_files = 0 - self.added_delete_files = 0 - self.removed_delete_files = 0 - self.added_records = 0 - self.deleted_records = 0 - self.added_pos_deletes = 0 - self.removed_pos_deletes = 0 - self.added_eq_deletes = 0 - self.removed_eq_deletes = 0 - - def add_file(self, data_file: DataFile) -> None: - self.added_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.added_data_files += 1 - self.added_records 
+= data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.added_delete_files += 1 - self.added_pos_delete_files += 1 - self.added_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.added_delete_files += 1 - self.added_eq_delete_files += 1 - self.added_eq_deletes += data_file.record_count - else: - raise ValueError(f"Unknown data file content: {data_file.content}") - - def remove_file(self, data_file: DataFile) -> None: - self.removed_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.removed_data_files += 1 - self.deleted_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.removed_delete_files += 1 - self.removed_pos_delete_files += 1 - self.removed_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.removed_delete_files += 1 - self.removed_eq_delete_files += 1 - self.removed_eq_deletes += data_file.record_count + self.metrics = UpdateMetrics() + self.partition_metrics = defaultdict(UpdateMetrics) + self.max_changed_partitions_for_summaries = 0 + + def set_partition_summary_limit(self, limit: int) -> None: + self.max_changed_partitions_for_summaries = limit + + def add_file( + self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None Review Comment: The same goes for the other methods; it would be nice to reduce the noise from optional arguments that we want to pass in anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org