Fokko commented on code in PR #521:
URL: https://github.com/apache/iceberg-python/pull/521#discussion_r1524719927


##########
pyiceberg/table/snapshots.py:
##########
@@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel):
 
 
 class SnapshotSummaryCollector:
-    added_file_size: int
-    removed_file_size: int
-    added_data_files: int
-    removed_data_files: int
-    added_eq_delete_files: int
-    removed_eq_delete_files: int
-    added_pos_delete_files: int
-    removed_pos_delete_files: int
-    added_delete_files: int
-    removed_delete_files: int
-    added_records: int
-    deleted_records: int
-    added_pos_deletes: int
-    removed_pos_deletes: int
-    added_eq_deletes: int
-    removed_eq_deletes: int
+    metrics: UpdateMetrics
+    partition_metrics: DefaultDict[str, UpdateMetrics]
+    max_changed_partitions_for_summaries: int
 
     def __init__(self) -> None:
-        self.added_file_size = 0
-        self.removed_file_size = 0
-        self.added_data_files = 0
-        self.removed_data_files = 0
-        self.added_eq_delete_files = 0
-        self.removed_eq_delete_files = 0
-        self.added_pos_delete_files = 0
-        self.removed_pos_delete_files = 0
-        self.added_delete_files = 0
-        self.removed_delete_files = 0
-        self.added_records = 0
-        self.deleted_records = 0
-        self.added_pos_deletes = 0
-        self.removed_pos_deletes = 0
-        self.added_eq_deletes = 0
-        self.removed_eq_deletes = 0
-
-    def add_file(self, data_file: DataFile) -> None:
-        self.added_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.added_data_files += 1
-            self.added_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.added_delete_files += 1
-            self.added_pos_delete_files += 1
-            self.added_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.added_delete_files += 1
-            self.added_eq_delete_files += 1
-            self.added_eq_deletes += data_file.record_count
-        else:
-            raise ValueError(f"Unknown data file content: {data_file.content}")
-
-    def remove_file(self, data_file: DataFile) -> None:
-        self.removed_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.removed_data_files += 1
-            self.deleted_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.removed_delete_files += 1
-            self.removed_pos_delete_files += 1
-            self.removed_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.removed_delete_files += 1
-            self.removed_eq_delete_files += 1
-            self.removed_eq_deletes += data_file.record_count
+        self.metrics = UpdateMetrics()
+        self.partition_metrics = defaultdict(UpdateMetrics)
+        self.max_changed_partitions_for_summaries = 0
+
+    def set_partition_summary_limit(self, limit: int) -> None:
+        self.max_changed_partitions_for_summaries = limit
+
+    def add_file(
+        self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = 
None, schema: Optional[Schema] = None

Review Comment:
   Since this is more-or-less internal, I think it makes sense to make the spec 
and schema required.



##########
pyiceberg/table/__init__.py:
##########
@@ -2569,9 +2572,21 @@ def _write_delete_manifest() -> List[ManifestFile]:
 
     def _summary(self) -> Summary:
         ssc = SnapshotSummaryCollector()
+        partition_summary_limit = 
self._transaction.table_metadata.properties.get(

Review Comment:
   I think the property values are stored as strings, so maybe we should try to 
parse the value as an integer, and raise a ValueError otherwise.



##########
tests/integration/test_writes.py:
##########
@@ -525,12 +525,15 @@ def test_summaries_with_only_nulls(
         'total-records': '2',
     }
 
-    assert summaries[0] == {
-        'total-data-files': '0',
-        'total-delete-files': '0',
+    assert summaries[2] == {

Review Comment:
   Nice catch!



##########
pyiceberg/table/snapshots.py:
##########
@@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel):
 
 
 class SnapshotSummaryCollector:
-    added_file_size: int
-    removed_file_size: int
-    added_data_files: int
-    removed_data_files: int
-    added_eq_delete_files: int
-    removed_eq_delete_files: int
-    added_pos_delete_files: int
-    removed_pos_delete_files: int
-    added_delete_files: int
-    removed_delete_files: int
-    added_records: int
-    deleted_records: int
-    added_pos_deletes: int
-    removed_pos_deletes: int
-    added_eq_deletes: int
-    removed_eq_deletes: int
+    metrics: UpdateMetrics
+    partition_metrics: DefaultDict[str, UpdateMetrics]
+    max_changed_partitions_for_summaries: int
 
     def __init__(self) -> None:
-        self.added_file_size = 0
-        self.removed_file_size = 0
-        self.added_data_files = 0
-        self.removed_data_files = 0
-        self.added_eq_delete_files = 0
-        self.removed_eq_delete_files = 0
-        self.added_pos_delete_files = 0
-        self.removed_pos_delete_files = 0
-        self.added_delete_files = 0
-        self.removed_delete_files = 0
-        self.added_records = 0
-        self.deleted_records = 0
-        self.added_pos_deletes = 0
-        self.removed_pos_deletes = 0
-        self.added_eq_deletes = 0
-        self.removed_eq_deletes = 0
-
-    def add_file(self, data_file: DataFile) -> None:
-        self.added_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.added_data_files += 1
-            self.added_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.added_delete_files += 1
-            self.added_pos_delete_files += 1
-            self.added_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.added_delete_files += 1
-            self.added_eq_delete_files += 1
-            self.added_eq_deletes += data_file.record_count
-        else:
-            raise ValueError(f"Unknown data file content: {data_file.content}")
-
-    def remove_file(self, data_file: DataFile) -> None:
-        self.removed_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.removed_data_files += 1
-            self.deleted_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.removed_delete_files += 1
-            self.removed_pos_delete_files += 1
-            self.removed_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.removed_delete_files += 1
-            self.removed_eq_delete_files += 1
-            self.removed_eq_deletes += data_file.record_count
+        self.metrics = UpdateMetrics()
+        self.partition_metrics = defaultdict(UpdateMetrics)
+        self.max_changed_partitions_for_summaries = 0
+
+    def set_partition_summary_limit(self, limit: int) -> None:
+        self.max_changed_partitions_for_summaries = limit
+
+    def add_file(
+        self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = 
None, schema: Optional[Schema] = None
+    ) -> None:
+        self.metrics.add_file(data_file)
+        if getattr(data_file, "partition", None) is not None and 
len(data_file.partition.record_fields()) != 0:

Review Comment:
   The partition is required, and should always be there: 
   
![image](https://github.com/apache/iceberg-python/assets/1134248/83fb20c2-036d-4fb5-8739-58ba642f2d51)
   



##########
pyiceberg/table/snapshots.py:
##########
@@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel):
 
 
 class SnapshotSummaryCollector:
-    added_file_size: int
-    removed_file_size: int
-    added_data_files: int
-    removed_data_files: int
-    added_eq_delete_files: int
-    removed_eq_delete_files: int
-    added_pos_delete_files: int
-    removed_pos_delete_files: int
-    added_delete_files: int
-    removed_delete_files: int
-    added_records: int
-    deleted_records: int
-    added_pos_deletes: int
-    removed_pos_deletes: int
-    added_eq_deletes: int
-    removed_eq_deletes: int
+    metrics: UpdateMetrics
+    partition_metrics: DefaultDict[str, UpdateMetrics]
+    max_changed_partitions_for_summaries: int
 
     def __init__(self) -> None:
-        self.added_file_size = 0
-        self.removed_file_size = 0
-        self.added_data_files = 0
-        self.removed_data_files = 0
-        self.added_eq_delete_files = 0
-        self.removed_eq_delete_files = 0
-        self.added_pos_delete_files = 0
-        self.removed_pos_delete_files = 0
-        self.added_delete_files = 0
-        self.removed_delete_files = 0
-        self.added_records = 0
-        self.deleted_records = 0
-        self.added_pos_deletes = 0
-        self.removed_pos_deletes = 0
-        self.added_eq_deletes = 0
-        self.removed_eq_deletes = 0
-
-    def add_file(self, data_file: DataFile) -> None:
-        self.added_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.added_data_files += 1
-            self.added_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.added_delete_files += 1
-            self.added_pos_delete_files += 1
-            self.added_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.added_delete_files += 1
-            self.added_eq_delete_files += 1
-            self.added_eq_deletes += data_file.record_count
-        else:
-            raise ValueError(f"Unknown data file content: {data_file.content}")
-
-    def remove_file(self, data_file: DataFile) -> None:
-        self.removed_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.removed_data_files += 1
-            self.deleted_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.removed_delete_files += 1
-            self.removed_pos_delete_files += 1
-            self.removed_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.removed_delete_files += 1
-            self.removed_eq_delete_files += 1
-            self.removed_eq_deletes += data_file.record_count
+        self.metrics = UpdateMetrics()
+        self.partition_metrics = defaultdict(UpdateMetrics)
+        self.max_changed_partitions_for_summaries = 0
+
+    def set_partition_summary_limit(self, limit: int) -> None:
+        self.max_changed_partitions_for_summaries = limit
+
+    def add_file(
+        self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = 
None, schema: Optional[Schema] = None

Review Comment:
   Or something like:
   ```suggestion
           self, data_file: DataFile, schema: Schema, partition_spec: 
PartitionSpec = UNPARTITIONED_PARTITION_SPEC 
   ```



##########
pyiceberg/table/snapshots.py:
##########
@@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel):
 
 
 class SnapshotSummaryCollector:
-    added_file_size: int
-    removed_file_size: int
-    added_data_files: int
-    removed_data_files: int
-    added_eq_delete_files: int
-    removed_eq_delete_files: int
-    added_pos_delete_files: int
-    removed_pos_delete_files: int
-    added_delete_files: int
-    removed_delete_files: int
-    added_records: int
-    deleted_records: int
-    added_pos_deletes: int
-    removed_pos_deletes: int
-    added_eq_deletes: int
-    removed_eq_deletes: int
+    metrics: UpdateMetrics
+    partition_metrics: DefaultDict[str, UpdateMetrics]
+    max_changed_partitions_for_summaries: int
 
     def __init__(self) -> None:
-        self.added_file_size = 0
-        self.removed_file_size = 0
-        self.added_data_files = 0
-        self.removed_data_files = 0
-        self.added_eq_delete_files = 0
-        self.removed_eq_delete_files = 0
-        self.added_pos_delete_files = 0
-        self.removed_pos_delete_files = 0
-        self.added_delete_files = 0
-        self.removed_delete_files = 0
-        self.added_records = 0
-        self.deleted_records = 0
-        self.added_pos_deletes = 0
-        self.removed_pos_deletes = 0
-        self.added_eq_deletes = 0
-        self.removed_eq_deletes = 0
-
-    def add_file(self, data_file: DataFile) -> None:
-        self.added_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.added_data_files += 1
-            self.added_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.added_delete_files += 1
-            self.added_pos_delete_files += 1
-            self.added_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.added_delete_files += 1
-            self.added_eq_delete_files += 1
-            self.added_eq_deletes += data_file.record_count
-        else:
-            raise ValueError(f"Unknown data file content: {data_file.content}")
-
-    def remove_file(self, data_file: DataFile) -> None:
-        self.removed_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.removed_data_files += 1
-            self.deleted_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.removed_delete_files += 1
-            self.removed_pos_delete_files += 1
-            self.removed_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.removed_delete_files += 1
-            self.removed_eq_delete_files += 1
-            self.removed_eq_deletes += data_file.record_count
+        self.metrics = UpdateMetrics()
+        self.partition_metrics = defaultdict(UpdateMetrics)
+        self.max_changed_partitions_for_summaries = 0
+
+    def set_partition_summary_limit(self, limit: int) -> None:
+        self.max_changed_partitions_for_summaries = limit
+
+    def add_file(
+        self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = 
None, schema: Optional[Schema] = None
+    ) -> None:
+        self.metrics.add_file(data_file)
+        if getattr(data_file, "partition", None) is not None and 
len(data_file.partition.record_fields()) != 0:
+            if partition_spec is None or schema is None:
+                raise ValueError("add data file with partition but without 
specifying the partiton_spec and schema")
+            self.update_partition_metrics(partition_spec=partition_spec, 
file=data_file, is_add_file=True, schema=schema)
+
+    def remove_file(self, data_file: DataFile, partition_spec: 
Optional[PartitionSpec], schema: Optional[Schema]) -> None:
+        self.metrics.remove_file(data_file)
+        if getattr(data_file, "partition", None) is not None and 
len(data_file.partition.record_fields()) != 0:
+            if partition_spec is None or schema is None:
+                raise ValueError("remove data file with partition but without 
specifying the partiton_spec and schema")
+            self.update_partition_metrics(partition_spec=partition_spec, 
file=data_file, is_add_file=False, schema=schema)
+
+    def update_partition_metrics(self, partition_spec: PartitionSpec, file: 
DataFile, is_add_file: bool, schema: Schema) -> None:
+        partition_path = partition_spec.partition_to_path(file.partition, 
schema)
+        partition_metrics: UpdateMetrics = 
self.partition_metrics[partition_path]
+
+        if is_add_file:
+            partition_metrics.add_file(file)
         else:
-            raise ValueError(f"Unknown data file content: {data_file.content}")
+            partition_metrics.remove_file(file)
 
     def build(self) -> Dict[str, str]:
-        def set_when_positive(properties: Dict[str, str], num: int, 
property_name: str) -> None:
-            if num > 0:
-                properties[property_name] = str(num)
-
-        properties: Dict[str, str] = {}
-        set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
-        set_when_positive(properties, self.removed_file_size, 
REMOVED_FILE_SIZE)
-        set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
-        set_when_positive(properties, self.removed_data_files, 
DELETED_DATA_FILES)
-        set_when_positive(properties, self.added_eq_delete_files, 
ADDED_EQUALITY_DELETE_FILES)
-        set_when_positive(properties, self.removed_eq_delete_files, 
REMOVED_EQUALITY_DELETE_FILES)
-        set_when_positive(properties, self.added_pos_delete_files, 
ADDED_POSITION_DELETE_FILES)
-        set_when_positive(properties, self.removed_pos_delete_files, 
REMOVED_POSITION_DELETE_FILES)
-        set_when_positive(properties, self.added_delete_files, 
ADDED_DELETE_FILES)
-        set_when_positive(properties, self.removed_delete_files, 
REMOVED_DELETE_FILES)
-        set_when_positive(properties, self.added_records, ADDED_RECORDS)
-        set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
-        set_when_positive(properties, self.added_pos_deletes, 
ADDED_POSITION_DELETES)
-        set_when_positive(properties, self.removed_pos_deletes, 
REMOVED_POSITION_DELETES)
-        set_when_positive(properties, self.added_eq_deletes, 
ADDED_EQUALITY_DELETES)
-        set_when_positive(properties, self.removed_eq_deletes, 
REMOVED_EQUALITY_DELETES)
+        properties = self.metrics.to_dict()
+        changed_partitions_size = len(self.partition_metrics)
+        set_when_positive(properties, changed_partitions_size, 
CHANGED_PARTITION_COUNT_PROP)
+        if changed_partitions_size <= 
self.max_changed_partitions_for_summaries:
+            for partition_path, update_metrics_partition in 
self.partition_metrics.items():
+                if (summary := 
self.partition_summary(update_metrics_partition)) and len(summary) != 0:
+                    properties[CHANGED_PARTITION_PREFIX + partition_path] = 
summary
 
         return properties
 
+    def partition_summary(self, update_metrics: UpdateMetrics) -> str:

Review Comment:
   ```suggestion
       def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
   ```



##########
pyiceberg/table/snapshots.py:
##########
@@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel):
 
 
 class SnapshotSummaryCollector:
-    added_file_size: int
-    removed_file_size: int
-    added_data_files: int
-    removed_data_files: int
-    added_eq_delete_files: int
-    removed_eq_delete_files: int
-    added_pos_delete_files: int
-    removed_pos_delete_files: int
-    added_delete_files: int
-    removed_delete_files: int
-    added_records: int
-    deleted_records: int
-    added_pos_deletes: int
-    removed_pos_deletes: int
-    added_eq_deletes: int
-    removed_eq_deletes: int
+    metrics: UpdateMetrics
+    partition_metrics: DefaultDict[str, UpdateMetrics]
+    max_changed_partitions_for_summaries: int
 
     def __init__(self) -> None:
-        self.added_file_size = 0
-        self.removed_file_size = 0
-        self.added_data_files = 0
-        self.removed_data_files = 0
-        self.added_eq_delete_files = 0
-        self.removed_eq_delete_files = 0
-        self.added_pos_delete_files = 0
-        self.removed_pos_delete_files = 0
-        self.added_delete_files = 0
-        self.removed_delete_files = 0
-        self.added_records = 0
-        self.deleted_records = 0
-        self.added_pos_deletes = 0
-        self.removed_pos_deletes = 0
-        self.added_eq_deletes = 0
-        self.removed_eq_deletes = 0
-
-    def add_file(self, data_file: DataFile) -> None:
-        self.added_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.added_data_files += 1
-            self.added_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.added_delete_files += 1
-            self.added_pos_delete_files += 1
-            self.added_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.added_delete_files += 1
-            self.added_eq_delete_files += 1
-            self.added_eq_deletes += data_file.record_count
-        else:
-            raise ValueError(f"Unknown data file content: {data_file.content}")
-
-    def remove_file(self, data_file: DataFile) -> None:
-        self.removed_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.removed_data_files += 1
-            self.deleted_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.removed_delete_files += 1
-            self.removed_pos_delete_files += 1
-            self.removed_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.removed_delete_files += 1
-            self.removed_eq_delete_files += 1
-            self.removed_eq_deletes += data_file.record_count
+        self.metrics = UpdateMetrics()
+        self.partition_metrics = defaultdict(UpdateMetrics)
+        self.max_changed_partitions_for_summaries = 0
+
+    def set_partition_summary_limit(self, limit: int) -> None:
+        self.max_changed_partitions_for_summaries = limit
+
+    def add_file(
+        self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = 
None, schema: Optional[Schema] = None

Review Comment:
   The same goes for the other methods; it would be nice to have less noise there 
from arguments that we want to pass in anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to