HonahX commented on code in PR #8622:
URL: https://github.com/apache/iceberg/pull/8622#discussion_r1334945453


##########
python/pyiceberg/manifest.py:
##########
@@ -265,6 +306,64 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, 
**named_data})
 
 
+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: IcebergType) -> None:
+        if not isinstance(iceberg_type, PrimitiveType):
+            raise ValueError(f"Expected a primitive type for the partition 
field, got {iceberg_type}")
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not 
None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not 
None else None,
+        )
+
+    def update(self, value: Any) -> None:
+        if value is None:
+            self._contains_null = True
+        elif math.isnan(value):
+            self._contains_nan = True
+        else:
+            if self._min is None:
+                self._min = value
+                self._max = value
+            else:
+                self._max = max(self._max, value)
+                self._min = min(self._min, value)
+
+
+class PartitionSummary:
+    _field_stats: List[PartitionFieldStats]
+    _types: List[IcebergType]
+
+    def __init__(self, spec: PartitionSpec, schema: Schema):
+        self._types = [field.field_type for field in 
spec.partition_type(schema).fields]
+        self._field_stats = [PartitionFieldStats(field_type) for field_type in 
self._types]
+
+    def summaries(self) -> List[PartitionFieldSummary]:
+        return [field.to_summary() for field in self._field_stats]
+
+    def update(self, partition_keys: Record) -> PartitionSummary:

Review Comment:
   Thanks for the suggestion. I will try it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to