Fokko commented on code in PR #7831:
URL: https://github.com/apache/iceberg/pull/7831#discussion_r1288329800


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -1025,3 +1039,344 @@ def map_key_partner(self, partner_map: 
Optional[pa.Array]) -> Optional[pa.Array]
 
     def map_value_partner(self, partner_map: Optional[pa.Array]) -> 
Optional[pa.Array]:
         return partner_map.items if isinstance(partner_map, pa.MapArray) else 
None
+
+
+_PRIMITIVE_TO_PHYSICAL = {
+    BooleanType(): "BOOLEAN",
+    IntegerType(): "INT32",
+    LongType(): "INT64",
+    FloatType(): "FLOAT",
+    DoubleType(): "DOUBLE",
+    DateType(): "INT32",
+    TimeType(): "INT64",
+    TimestampType(): "INT64",
+    TimestamptzType(): "INT64",
+    StringType(): "BYTE_ARRAY",
+    UUIDType(): "FIXED_LEN_BYTE_ARRAY",
+    BinaryType(): "BYTE_ARRAY",
+}
+_PHYSICAL_TYPES = set(_PRIMITIVE_TO_PHYSICAL.values()).union({"INT96"})
+
+
+class StatsAggregator:
+    current_min: Any
+    current_max: Any
+    trunc_length: Optional[int]
+
+    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, 
trunc_length: Optional[int] = None) -> None:
+        self.current_min = None
+        self.current_max = None
+        self.trunc_length = trunc_length
+
+        if physical_type_string not in _PHYSICAL_TYPES:
+            raise ValueError(f"Unknown physical type {physical_type_string}")
+
+        if physical_type_string == "INT96":
+            raise NotImplementedError("Statistics not implemented for INT96 
physical type")
+
+        expected_physical_type = _PRIMITIVE_TO_PHYSICAL[iceberg_type]
+        if expected_physical_type != physical_type_string:
+            raise ValueError(
+                f"Unexpected physical type {physical_type_string} for 
{iceberg_type}, expected {expected_physical_type}"
+            )
+
+        self.primitive_type = iceberg_type
+
+    def serialize(self, value: Any) -> bytes:
+        if self.primitive_type == UUIDType():
+            value = uuid.UUID(bytes=value)
+
+        return to_bytes(self.primitive_type, value)
+
+    def update_min(self, val: Any) -> None:
+        self.current_min = val if self.current_min is None else min(val, 
self.current_min)
+
+    def update_max(self, val: Any) -> None:
+        self.current_max = val if self.current_max is None else max(val, 
self.current_max)
+
+    def min_as_bytes(self) -> bytes:
+        return self.serialize(
+            self.current_min
+            if self.trunc_length is None
+            else TruncateTransform(width=self.trunc_length).transform(self.primitive_type)(self.current_min)
+        )
+
+    def max_as_bytes(self) -> Optional[bytes]:
+        if self.current_max is None:
+            return None
+
+        if self.primitive_type == StringType():
+            if type(self.current_max) != str:
+                raise ValueError("Expected the current_max to be a string")
+
+            s_result = self.current_max[: self.trunc_length]
+            if s_result != self.current_max:
+                chars = [*s_result]
+
+                for i in range(-1, -len(s_result) - 1, -1):
+                    try:
+                        to_inc = ord(chars[i])
+                        # will raise exception if the highest unicode code is 
reached
+                        _next = chr(to_inc + 1)
+                        chars[i] = _next
+                        return self.serialize("".join(chars))
+                    except ValueError:
+                        pass
+                return None  # didn't find a valid upper bound
+            return self.serialize(s_result)
+        elif self.primitive_type == BinaryType():
+            if type(self.current_max) != bytes:
+                raise ValueError("Expected the current_max to be bytes")
+            b_result = self.current_max[: self.trunc_length]
+            if b_result != self.current_max:
+                _bytes = [*b_result]
+                for i in range(-1, -len(b_result) - 1, -1):
+                    if _bytes[i] < 255:
+                        _bytes[i] += 1
+                        return b"".join([i.to_bytes(1, byteorder="little") for i in _bytes])
+                return None
+
+            return self.serialize(b_result)
+        else:
+            if self.trunc_length is not None:
+                raise ValueError(f"{self.primitive_type} cannot be truncated")
+            return self.serialize(self.current_max)
+
+
+DEFAULT_TRUNCATION_LENGTH = 16
+TRUNCATION_EXPR = r"^truncate\((\d+)\)$"
+
+
+class MetricModeTypes(Enum):
+    TRUNCATE = "truncate"
+    NONE = "none"
+    COUNTS = "counts"
+    FULL = "full"
+
+
+DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
+COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
+
+
+@dataclass(frozen=True)
+class MetricsMode(Singleton):
+    type: MetricModeTypes
+    length: Optional[int] = None
+
+
+_DEFAULT_METRICS_MODE = MetricsMode(MetricModeTypes.TRUNCATE, 
DEFAULT_TRUNCATION_LENGTH)
+
+
+def match_metrics_mode(mode: str) -> MetricsMode:
+    sanitized_mode = mode.lower()
+    if sanitized_mode.startswith("truncate"):
+        m = re.match(TRUNCATION_EXPR, sanitized_mode)
+        if m:
+            length = int(m[1])
+            if length < 1:
+                raise ValueError("Truncation length must be larger than 0")
+            return MetricsMode(MetricModeTypes.TRUNCATE, int(m[1]))
+        else:
+            raise ValueError(f"Malformed truncate: {mode}")
+    elif sanitized_mode.startswith("none"):
+        return MetricsMode(MetricModeTypes.NONE)
+    elif sanitized_mode.startswith("counts"):
+        return MetricsMode(MetricModeTypes.COUNTS)
+    elif sanitized_mode.startswith("full"):
+        return MetricsMode(MetricModeTypes.FULL)
+    else:
+        raise ValueError(f"Unsupported metrics mode: {mode}")
+

Review Comment:
   ```suggestion
   def match_metrics_mode(mode: str) -> MetricsMode:
       sanitized_mode = mode.strip().lower()
       if sanitized_mode.startswith("truncate"):
           m = re.match(TRUNCATION_EXPR, sanitized_mode)
           if m:
               length = int(m[1])
               if length < 1:
                   raise ValueError("Truncation length must be larger than 0")
               return MetricsMode(MetricModeTypes.TRUNCATE, int(m[1]))
           else:
               raise ValueError(f"Malformed truncate: {mode}")
       elif sanitized_mode == "none":
           return MetricsMode(MetricModeTypes.NONE)
       elif sanitized_mode == "counts":
           return MetricsMode(MetricModeTypes.COUNTS)
       elif sanitized_mode == "full":
           return MetricsMode(MetricModeTypes.FULL)
       else:
           raise ValueError(f"Unsupported metrics mode: {mode}")
   
   ```
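
   As a hypothetical illustration (not part of the suggestion itself) of why the exact match is safer than `startswith`:
   ```python
   match_metrics_mode("none")         # MetricsMode(type=NONE) with either check
   match_metrics_mode("nonexistent")  # startswith("none") would silently return NONE; the exact match raises ValueError
   ```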



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -1025,3 +1039,335 @@ def map_key_partner(self, partner_map: 
Optional[pa.Array]) -> Optional[pa.Array]
 
     def map_value_partner(self, partner_map: Optional[pa.Array]) -> 
Optional[pa.Array]:
         return partner_map.items if isinstance(partner_map, pa.MapArray) else 
None
+
+
+_PRIMITIVE_TO_PHYSICAL = {
+    BooleanType(): "BOOLEAN",
+    IntegerType(): "INT32",
+    LongType(): "INT64",
+    FloatType(): "FLOAT",
+    DoubleType(): "DOUBLE",
+    DateType(): "INT32",
+    TimeType(): "INT64",
+    TimestampType(): "INT64",
+    TimestamptzType(): "INT64",
+    StringType(): "BYTE_ARRAY",
+    UUIDType(): "FIXED_LEN_BYTE_ARRAY",
+    BinaryType(): "BYTE_ARRAY",
+}
+_PHYSICAL_TYPES = set(_PRIMITIVE_TO_PHYSICAL.values()).union({"INT96"})
+
+
+class StatsAggregator:
+    current_min: Any
+    current_max: Any
+    trunc_length: Optional[int]
+
+    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, 
trunc_length: Optional[int] = None) -> None:
+        self.current_min = None
+        self.current_max = None
+        self.trunc_length = trunc_length
+
+        if physical_type_string not in _PHYSICAL_TYPES:
+            raise ValueError(f"Unknown physical type {physical_type_string}")
+
+        if physical_type_string == "INT96":
+            raise NotImplementedError("Statistics not implemented for INT96 
physical type")
+
+        expected_physical_type = _PRIMITIVE_TO_PHYSICAL[iceberg_type]
+        if expected_physical_type != physical_type_string:
+            raise ValueError(
+                f"Unexpected physical type {physical_type_string} for 
{iceberg_type}, expected {expected_physical_type}"
+            )
+
+        self.primitive_type = iceberg_type
+
+    def serialize(self, value: Any) -> bytes:
+        if self.primitive_type == UUIDType():
+            value = uuid.UUID(bytes=value)
+
+        return to_bytes(self.primitive_type, value)
+
+    def add_min(self, val: Any) -> None:
+        self.current_min = val if self.current_min is None else min(val, 
self.current_min)
+
+    def add_max(self, val: Any) -> None:
+        self.current_max = val if self.current_max is None else max(val, 
self.current_max)
+
+    def get_min(self) -> bytes:
+        return self.serialize(
+            self.current_min
+            if self.trunc_length is None
+            else TruncateTransform(width=self.trunc_length).transform(self.primitive_type)(self.current_min)
+        )
+
+    def get_max(self) -> Optional[bytes]:
+        if self.current_max is None:
+            return None
+
+        if self.primitive_type == StringType():
+            if type(self.current_max) != str:
+                raise ValueError("Expected the current_max to be a string")
+
+            s_result = self.current_max[: self.trunc_length]
+            if s_result != self.current_max:
+                chars = [*s_result]
+
+                for i in range(-1, -len(s_result) - 1, -1):
+                    try:
+                        to_inc = ord(chars[i])
+                        # will raise exception if the highest unicode code is 
reached
+                        _next = chr(to_inc + 1)
+                        chars[i] = _next
+                        return self.serialize("".join(chars))
+                    except ValueError:
+                        pass
+                return None  # didn't find a valid upper bound
+            return self.serialize(s_result)
+        elif self.primitive_type == BinaryType():
+            if type(self.current_max) != bytes:
+                raise ValueError("Expected the current_max to be bytes")
+            b_result = self.current_max[: self.trunc_length]
+            if b_result != self.current_max:
+                _bytes = [*b_result]
+                for i in range(-1, -len(b_result) - 1, -1):
+                    if _bytes[i] < 255:
+                        _bytes[i] += 1
+                        return b"".join([i.to_bytes(1, byteorder="little") for i in _bytes])
+                return None
+
+            return self.serialize(b_result)
+        else:
+            return self.serialize(self.current_max)[: self.trunc_length]
+
+
+DEFAULT_TRUNCATION_LENGTH = 16
+TRUNCATION_EXPR = r"^truncate\((\d+)\)$"
+
+
+class MetricModeTypes(Enum):
+    TRUNCATE = "truncate"
+    NONE = "none"
+    COUNTS = "counts"
+    FULL = "full"
+
+
+DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
+COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
+
+
+@dataclass(frozen=True)
+class MetricsMode(Singleton):
+    type: MetricModeTypes
+    length: Optional[int] = None
+
+
+def match_metrics_mode(mode: str) -> MetricsMode:
+    sanitized_mode = mode.lower()
+    if sanitized_mode.startswith("truncate"):
+        m = re.match(TRUNCATION_EXPR, mode, re.IGNORECASE)
+        if m:
+            length = int(m[1])
+            if length < 1:
+                raise ValueError("Truncation length must be larger than 0")
+            return MetricsMode(MetricModeTypes.TRUNCATE, int(m[1]))
+        else:
+            raise ValueError(f"Malformed truncate: {mode}")
+    elif sanitized_mode.startswith("none"):
+        return MetricsMode(MetricModeTypes.NONE)
+    elif sanitized_mode.startswith("counts"):
+        return MetricsMode(MetricModeTypes.COUNTS)
+    elif sanitized_mode.startswith("full"):
+        return MetricsMode(MetricModeTypes.FULL)
+    else:
+        raise ValueError(f"Unsupported metrics mode: {mode}")
+
+
+@dataclass(frozen=True)
+class StatisticsCollector:
+    field_id: int
+    iceberg_type: PrimitiveType
+    mode: MetricsMode
+    column_name: str
+
+
+class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]]):
+    _field_id: int = 0
+    _schema: Schema
+    _properties: Dict[str, str]
+
+    def __init__(self, schema: Schema, properties: Dict[str, str]):
+        self._schema = schema
+        self._properties = properties
+
+    def schema(self, schema: Schema, struct_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        return struct_result()
+
+    def struct(
+        self, struct: StructType, field_results: List[Callable[[], 
List[StatisticsCollector]]]
+    ) -> List[StatisticsCollector]:
+        return list(chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = field.field_id

Review Comment:
   I see your point, and I'm happy to add those fields. They are not there currently, so I would suggest doing that in a separate PR to avoid going off on a tangent here. Created: https://github.com/apache/iceberg/issues/8273



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -1025,3 +1039,335 @@ def map_key_partner(self, partner_map: 
Optional[pa.Array]) -> Optional[pa.Array]
 
     def map_value_partner(self, partner_map: Optional[pa.Array]) -> 
Optional[pa.Array]:
         return partner_map.items if isinstance(partner_map, pa.MapArray) else 
None
+
+
+_PRIMITIVE_TO_PHYSICAL = {
+    BooleanType(): "BOOLEAN",
+    IntegerType(): "INT32",
+    LongType(): "INT64",
+    FloatType(): "FLOAT",
+    DoubleType(): "DOUBLE",
+    DateType(): "INT32",
+    TimeType(): "INT64",
+    TimestampType(): "INT64",
+    TimestamptzType(): "INT64",
+    StringType(): "BYTE_ARRAY",
+    UUIDType(): "FIXED_LEN_BYTE_ARRAY",
+    BinaryType(): "BYTE_ARRAY",
+}
+_PHYSICAL_TYPES = set(_PRIMITIVE_TO_PHYSICAL.values()).union({"INT96"})
+
+
+class StatsAggregator:
+    current_min: Any
+    current_max: Any
+    trunc_length: Optional[int]
+
+    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, 
trunc_length: Optional[int] = None) -> None:
+        self.current_min = None
+        self.current_max = None
+        self.trunc_length = trunc_length
+
+        if physical_type_string not in _PHYSICAL_TYPES:
+            raise ValueError(f"Unknown physical type {physical_type_string}")
+
+        if physical_type_string == "INT96":
+            raise NotImplementedError("Statistics not implemented for INT96 
physical type")
+
+        expected_physical_type = _PRIMITIVE_TO_PHYSICAL[iceberg_type]
+        if expected_physical_type != physical_type_string:
+            raise ValueError(
+                f"Unexpected physical type {physical_type_string} for 
{iceberg_type}, expected {expected_physical_type}"
+            )
+
+        self.primitive_type = iceberg_type
+
+    def serialize(self, value: Any) -> bytes:
+        if self.primitive_type == UUIDType():
+            value = uuid.UUID(bytes=value)
+
+        return to_bytes(self.primitive_type, value)
+
+    def add_min(self, val: Any) -> None:
+        self.current_min = val if self.current_min is None else min(val, 
self.current_min)
+
+    def add_max(self, val: Any) -> None:
+        self.current_max = val if self.current_max is None else max(val, 
self.current_max)
+
+    def get_min(self) -> bytes:
+        return self.serialize(
+            self.current_min
+            if self.trunc_length is None
+            else TruncateTransform(width=self.trunc_length).transform(self.primitive_type)(self.current_min)
+        )
+
+    def get_max(self) -> Optional[bytes]:
+        if self.current_max is None:
+            return None
+
+        if self.primitive_type == StringType():
+            if type(self.current_max) != str:
+                raise ValueError("Expected the current_max to be a string")
+
+            s_result = self.current_max[: self.trunc_length]
+            if s_result != self.current_max:
+                chars = [*s_result]
+
+                for i in range(-1, -len(s_result) - 1, -1):
+                    try:
+                        to_inc = ord(chars[i])
+                        # will raise exception if the highest unicode code is 
reached
+                        _next = chr(to_inc + 1)
+                        chars[i] = _next
+                        return self.serialize("".join(chars))
+                    except ValueError:
+                        pass
+                return None  # didn't find a valid upper bound
+            return self.serialize(s_result)
+        elif self.primitive_type == BinaryType():
+            if type(self.current_max) != bytes:
+                raise ValueError("Expected the current_max to be bytes")
+            b_result = self.current_max[: self.trunc_length]
+            if b_result != self.current_max:
+                _bytes = [*b_result]
+                for i in range(-1, -len(b_result) - 1, -1):
+                    if _bytes[i] < 255:
+                        _bytes[i] += 1
+                        return b"".join([i.to_bytes(1, byteorder="little") for i in _bytes])
+                return None
+
+            return self.serialize(b_result)
+        else:
+            return self.serialize(self.current_max)[: self.trunc_length]
+
+
+DEFAULT_TRUNCATION_LENGTH = 16
+TRUNCATION_EXPR = r"^truncate\((\d+)\)$"
+
+
+class MetricModeTypes(Enum):
+    TRUNCATE = "truncate"
+    NONE = "none"
+    COUNTS = "counts"
+    FULL = "full"
+
+
+DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
+COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
+
+
+@dataclass(frozen=True)
+class MetricsMode(Singleton):
+    type: MetricModeTypes
+    length: Optional[int] = None
+
+
+def match_metrics_mode(mode: str) -> MetricsMode:
+    sanitized_mode = mode.lower()
+    if sanitized_mode.startswith("truncate"):
+        m = re.match(TRUNCATION_EXPR, mode, re.IGNORECASE)
+        if m:
+            length = int(m[1])
+            if length < 1:
+                raise ValueError("Truncation length must be larger than 0")
+            return MetricsMode(MetricModeTypes.TRUNCATE, int(m[1]))
+        else:
+            raise ValueError(f"Malformed truncate: {mode}")
+    elif sanitized_mode.startswith("none"):
+        return MetricsMode(MetricModeTypes.NONE)
+    elif sanitized_mode.startswith("counts"):
+        return MetricsMode(MetricModeTypes.COUNTS)
+    elif sanitized_mode.startswith("full"):
+        return MetricsMode(MetricModeTypes.FULL)
+    else:
+        raise ValueError(f"Unsupported metrics mode: {mode}")
+
+
+@dataclass(frozen=True)
+class StatisticsCollector:
+    field_id: int
+    iceberg_type: PrimitiveType
+    mode: MetricsMode
+    column_name: str
+
+
+class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]]):
+    _field_id: int = 0
+    _schema: Schema
+    _properties: Dict[str, str]
+
+    def __init__(self, schema: Schema, properties: Dict[str, str]):
+        self._schema = schema
+        self._properties = properties
+
+    def schema(self, schema: Schema, struct_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        return struct_result()
+
+    def struct(
+        self, struct: StructType, field_results: List[Callable[[], 
List[StatisticsCollector]]]
+    ) -> List[StatisticsCollector]:
+        return list(chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = field.field_id
+        result = field_result()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = list_type.element_id
+        return element_result()
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[StatisticsCollector]],
+        value_result: Callable[[], List[StatisticsCollector]],
+    ) -> List[StatisticsCollector]:
+        self._field_id = map_type.key_id
+        k = key_result()
+        self._field_id = map_type.value_id
+        v = value_result()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
+        column_name = self._schema.find_column_name(self._field_id)
+        if column_name is None:
+            raise ValueError(f"Column for field {self._field_id} not found")
+
+        metrics_mode = MetricsMode(MetricModeTypes.TRUNCATE, 
DEFAULT_TRUNCATION_LENGTH)
+
+        default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+        if default_mode:
+            metrics_mode = match_metrics_mode(default_mode)
+
+        col_mode = 
self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        if col_mode:
+            metrics_mode = match_metrics_mode(col_mode)
+
+        if (
+            not (isinstance(primitive, StringType) or isinstance(primitive, 
BinaryType))
+            and metrics_mode.type == MetricModeTypes.TRUNCATE
+        ):
+            metrics_mode = MetricsMode(MetricModeTypes.FULL)
+
+        return [StatisticsCollector(field_id=self._field_id, 
iceberg_type=primitive, mode=metrics_mode, column_name=column_name)]
+
+
+def fill_parquet_file_metadata(
+    df: DataFile,
+    parquet_metadata: pq.FileMetaData,
+    file_size: int,
+    table_metadata: TableMetadata,

Review Comment:
   I agree that we shouldn't pass in the `TableMetadata` here. Since we know 
the write schema, we can just pass that one in.
   
   > Metrics modes can be pre-calculated to produce a map from field ID to 
mode. It makes more sense to do this in a common util method than to mix the 
code into the already complex StatisticsCollector visitor
   
   I'm not sure about this one: we know the order of the columns, so we can just rely on that. We could also use a lookup, but I'm not sure it would make things much simpler than keeping the mode next to the id and type.
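
   As a rough sketch of that lookup (the helper name here is an assumption, not code in this PR), the existing visitor output could be folded into a field-id to mode map once:
   ```python
   def compute_metrics_modes(schema: Schema, properties: Dict[str, str]) -> Dict[int, MetricsMode]:
       # Reuse the visitor above and keep only the field id and its resolved mode.
       return {
           col.field_id: col.mode
           for col in pre_order_visit(schema, PyArrowStatisticsCollector(schema, properties))
       }
   ```
   Either shape carries the same information; keeping the mode next to the id and type just avoids the extra lookup.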



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1345,3 +1374,655 @@ def test_pyarrow_wrap_fsspec(example_task: 
FileScanTask, table_schema_simple: Sc
 bar: [[1,2,3]]
 baz: [[true,false,null]]"""
     )
+
+
+def construct_test_table() -> Tuple[Any, Any, Union[TableMetadataV1, 
TableMetadataV2]]:

Review Comment:
   `test_pyarrow_stats.py`?



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -1025,3 +1039,335 @@ def map_key_partner(self, partner_map: 
Optional[pa.Array]) -> Optional[pa.Array]
 
     def map_value_partner(self, partner_map: Optional[pa.Array]) -> 
Optional[pa.Array]:
         return partner_map.items if isinstance(partner_map, pa.MapArray) else 
None
+
+
+_PRIMITIVE_TO_PHYSICAL = {
+    BooleanType(): "BOOLEAN",
+    IntegerType(): "INT32",
+    LongType(): "INT64",
+    FloatType(): "FLOAT",
+    DoubleType(): "DOUBLE",
+    DateType(): "INT32",
+    TimeType(): "INT64",
+    TimestampType(): "INT64",
+    TimestamptzType(): "INT64",
+    StringType(): "BYTE_ARRAY",
+    UUIDType(): "FIXED_LEN_BYTE_ARRAY",
+    BinaryType(): "BYTE_ARRAY",
+}
+_PHYSICAL_TYPES = set(_PRIMITIVE_TO_PHYSICAL.values()).union({"INT96"})
+
+
+class StatsAggregator:
+    current_min: Any
+    current_max: Any
+    trunc_length: Optional[int]
+
+    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, 
trunc_length: Optional[int] = None) -> None:
+        self.current_min = None
+        self.current_max = None
+        self.trunc_length = trunc_length
+
+        if physical_type_string not in _PHYSICAL_TYPES:
+            raise ValueError(f"Unknown physical type {physical_type_string}")
+
+        if physical_type_string == "INT96":
+            raise NotImplementedError("Statistics not implemented for INT96 
physical type")
+
+        expected_physical_type = _PRIMITIVE_TO_PHYSICAL[iceberg_type]
+        if expected_physical_type != physical_type_string:
+            raise ValueError(
+                f"Unexpected physical type {physical_type_string} for 
{iceberg_type}, expected {expected_physical_type}"
+            )
+
+        self.primitive_type = iceberg_type
+
+    def serialize(self, value: Any) -> bytes:
+        if self.primitive_type == UUIDType():
+            value = uuid.UUID(bytes=value)
+
+        return to_bytes(self.primitive_type, value)
+
+    def add_min(self, val: Any) -> None:
+        self.current_min = val if self.current_min is None else min(val, 
self.current_min)
+
+    def add_max(self, val: Any) -> None:
+        self.current_max = val if self.current_max is None else max(val, 
self.current_max)
+
+    def get_min(self) -> bytes:
+        return self.serialize(
+            self.current_min
+            if self.trunc_length is None
+            else TruncateTransform(width=self.trunc_length).transform(self.primitive_type)(self.current_min)
+        )
+
+    def get_max(self) -> Optional[bytes]:
+        if self.current_max is None:
+            return None
+
+        if self.primitive_type == StringType():
+            if type(self.current_max) != str:
+                raise ValueError("Expected the current_max to be a string")
+
+            s_result = self.current_max[: self.trunc_length]
+            if s_result != self.current_max:
+                chars = [*s_result]
+
+                for i in range(-1, -len(s_result) - 1, -1):
+                    try:
+                        to_inc = ord(chars[i])
+                        # will raise exception if the highest unicode code is 
reached
+                        _next = chr(to_inc + 1)
+                        chars[i] = _next
+                        return self.serialize("".join(chars))
+                    except ValueError:
+                        pass
+                return None  # didn't find a valid upper bound
+            return self.serialize(s_result)
+        elif self.primitive_type == BinaryType():
+            if type(self.current_max) != bytes:
+                raise ValueError("Expected the current_max to be bytes")
+            b_result = self.current_max[: self.trunc_length]
+            if b_result != self.current_max:
+                _bytes = [*b_result]
+                for i in range(-1, -len(b_result) - 1, -1):
+                    if _bytes[i] < 255:
+                        _bytes[i] += 1
+                        return b"".join([i.to_bytes(1, byteorder="little") for i in _bytes])
+                return None
+
+            return self.serialize(b_result)
+        else:
+            return self.serialize(self.current_max)[: self.trunc_length]
+
+
+DEFAULT_TRUNCATION_LENGTH = 16
+TRUNCATION_EXPR = r"^truncate\((\d+)\)$"
+
+
+class MetricModeTypes(Enum):
+    TRUNCATE = "truncate"
+    NONE = "none"
+    COUNTS = "counts"
+    FULL = "full"
+
+
+DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
+COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
+
+
+@dataclass(frozen=True)
+class MetricsMode(Singleton):
+    type: MetricModeTypes
+    length: Optional[int] = None
+
+
+def match_metrics_mode(mode: str) -> MetricsMode:
+    sanitized_mode = mode.lower()
+    if sanitized_mode.startswith("truncate"):
+        m = re.match(TRUNCATION_EXPR, mode, re.IGNORECASE)
+        if m:
+            length = int(m[1])
+            if length < 1:
+                raise ValueError("Truncation length must be larger than 0")
+            return MetricsMode(MetricModeTypes.TRUNCATE, int(m[1]))
+        else:
+            raise ValueError(f"Malformed truncate: {mode}")
+    elif sanitized_mode.startswith("none"):
+        return MetricsMode(MetricModeTypes.NONE)
+    elif sanitized_mode.startswith("counts"):
+        return MetricsMode(MetricModeTypes.COUNTS)
+    elif sanitized_mode.startswith("full"):
+        return MetricsMode(MetricModeTypes.FULL)
+    else:
+        raise ValueError(f"Unsupported metrics mode: {mode}")
+
+
+@dataclass(frozen=True)
+class StatisticsCollector:
+    field_id: int
+    iceberg_type: PrimitiveType
+    mode: MetricsMode
+    column_name: str
+
+
+class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]]):
+    _field_id: int = 0
+    _schema: Schema
+    _properties: Dict[str, str]
+
+    def __init__(self, schema: Schema, properties: Dict[str, str]):
+        self._schema = schema
+        self._properties = properties
+
+    def schema(self, schema: Schema, struct_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        return struct_result()
+
+    def struct(
+        self, struct: StructType, field_results: List[Callable[[], 
List[StatisticsCollector]]]
+    ) -> List[StatisticsCollector]:
+        return list(chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = field.field_id
+        result = field_result()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = list_type.element_id
+        return element_result()
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[StatisticsCollector]],
+        value_result: Callable[[], List[StatisticsCollector]],
+    ) -> List[StatisticsCollector]:
+        self._field_id = map_type.key_id
+        k = key_result()
+        self._field_id = map_type.value_id
+        v = value_result()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
+        column_name = self._schema.find_column_name(self._field_id)
+        if column_name is None:
+            raise ValueError(f"Column for field {self._field_id} not found")
+
+        metrics_mode = MetricsMode(MetricModeTypes.TRUNCATE, 
DEFAULT_TRUNCATION_LENGTH)
+
+        default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+        if default_mode:
+            metrics_mode = match_metrics_mode(default_mode)
+
+        col_mode = 
self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        if col_mode:
+            metrics_mode = match_metrics_mode(col_mode)
+
+        if (
+            not (isinstance(primitive, StringType) or isinstance(primitive, 
BinaryType))
+            and metrics_mode.type == MetricModeTypes.TRUNCATE
+        ):
+            metrics_mode = MetricsMode(MetricModeTypes.FULL)
+
+        return [StatisticsCollector(field_id=self._field_id, 
iceberg_type=primitive, mode=metrics_mode, column_name=column_name)]
+
+
+def fill_parquet_file_metadata(
+    df: DataFile,
+    parquet_metadata: pq.FileMetaData,
+    file_size: int,
+    table_metadata: TableMetadata,
+) -> None:
+    """
+    Computes and fills the following fields of the DataFile object.
+
+    - file_format
+    - record_count
+    - file_size_in_bytes
+    - column_sizes
+    - value_counts
+    - null_value_counts
+    - nan_value_counts
+    - lower_bounds
+    - upper_bounds
+    - split_offsets
+
+    Args:
+        df (DataFile): A DataFile object representing the Parquet file for 
which metadata is to be filled.
+        parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata 
object.
+        file_size (int): The total compressed file size cannot be retrieved 
from the metadata and hence has to
+            be passed here. Depending on the kind of file system and pyarrow 
library call used, different
+            ways to obtain this value might be appropriate.
+        table_metadata (pyiceberg.table.metadata.TableMetadata): The Iceberg table metadata. It is required to
+            compute the mapping of column position to Iceberg schema field id. It is also used to set the mode
+            for column metrics collection.
+    """
+    schema = next(filter(lambda s: s.schema_id == 
table_metadata.current_schema_id, table_metadata.schemas))
+
+    stats_columns = pre_order_visit(schema, PyArrowStatisticsCollector(schema, 
table_metadata.properties))
+
+    if parquet_metadata.num_columns != len(stats_columns):
+        raise ValueError(
+            f"Number of columns in metadata ({len(stats_columns)}) is 
different from the number of columns in pyarrow table 
({parquet_metadata.num_columns})"
+        )
+
+    column_sizes: Dict[int, int] = {}
+    value_counts: Dict[int, int] = {}
+    split_offsets: List[int] = []
+
+    null_value_counts: Dict[int, int] = {}
+    nan_value_counts: Dict[int, int] = {}
+
+    col_aggs = {}
+
+    for r in range(parquet_metadata.num_row_groups):
+        # References:
+        # 
https://github.com/apache/iceberg/blob/fc381a81a1fdb8f51a0637ca27cd30673bd7aad3/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L232
+        # 
https://github.com/apache/parquet-mr/blob/ac29db4611f86a07cc6877b416aa4b183e09b353/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java#L184
+
+        row_group = parquet_metadata.row_group(r)
+
+        data_offset = row_group.column(0).data_page_offset
+        dictionary_offset = row_group.column(0).dictionary_page_offset
+
+        if row_group.column(0).has_dictionary_page and dictionary_offset < 
data_offset:
+            split_offsets.append(dictionary_offset)
+        else:
+            split_offsets.append(data_offset)
+
+        for pos, stats_col in enumerate(stats_columns):
+            field_id = stats_col.field_id
+
+            column = row_group.column(pos)
+
+            column_sizes[field_id] = column_sizes.get(field_id, 0) + 
column.total_compressed_size
+
+            if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
+                continue
+
+            value_counts[field_id] = value_counts.get(field_id, 0) + 
column.num_values
+
+            if column.is_stats_set:
+                try:
+                    statistics = column.statistics
+
+                    null_value_counts[field_id] = 
null_value_counts.get(field_id, 0) + statistics.null_count
+
+                    if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
+                        continue
+
+                    if field_id not in col_aggs:
+                        col_aggs[field_id] = StatsAggregator(
+                            stats_col.iceberg_type, statistics.physical_type, 
stats_col.mode.length
+                        )
+
+                    col_aggs[field_id].add_min(statistics.min)
+                    col_aggs[field_id].add_max(statistics.max)
+
+                except pyarrow.lib.ArrowNotImplementedError as e:
+                    logger.warning(e)
+            else:
+                logger.warning("PyArrow statistics missing for column %d when 
writing file", pos)
+
+    split_offsets.sort()
+
+    lower_bounds = {}
+    upper_bounds = {}
+
+    for k, agg in col_aggs.items():
+        lower_bounds[k] = agg.get_min()
+        _max = agg.get_max()
+        if _max is not None:
+            upper_bounds[k] = _max
+
+    df.file_format = FileFormat.PARQUET

Review Comment:
   No, I prefer to keep them immutable, but I'm okay with it for now



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1345,3 +1374,655 @@ def test_pyarrow_wrap_fsspec(example_task: 
FileScanTask, table_schema_simple: Sc
 bar: [[1,2,3]]
 baz: [[true,false,null]]"""
     )
+
+
+def construct_test_table() -> Tuple[Any, Any, Union[TableMetadataV1, 
TableMetadataV2]]:
+    table_metadata = {
+        "format-version": 2,
+        "location": "s3://bucket/test/location",
+        "last-column-id": 7,
+        "current-schema-id": 0,
+        "schemas": [
+            {
+                "type": "struct",
+                "schema-id": 0,
+                "fields": [
+                    {"id": 1, "name": "strings", "required": False, "type": 
"string"},
+                    {"id": 2, "name": "floats", "required": False, "type": 
"float"},
+                    {
+                        "id": 3,
+                        "name": "list",
+                        "required": False,
+                        "type": {"type": "list", "element-id": 5, "element": 
"long", "element-required": False},
+                    },
+                    {
+                        "id": 4,
+                        "name": "maps",
+                        "required": False,
+                        "type": {
+                            "type": "map",
+                            "key-id": 6,
+                            "key": "long",
+                            "value-id": 7,
+                            "value": "long",
+                            "value-required": False,
+                        },
+                    },
+                ],
+            },
+        ],
+        "default-spec-id": 0,
+        "partition-specs": [{"spec-id": 0, "fields": []}],
+        "properties": {},
+    }
+
+    table_metadata = TableMetadataUtil.parse_obj(table_metadata)
+    arrow_schema = schema_to_pyarrow(table_metadata.schemas[0])
+
+    _strings = ["zzzzzzzzzzzzzzzzzzzz", "rrrrrrrrrrrrrrrrrrrr", None, 
"aaaaaaaaaaaaaaaaaaaa"]
+
+    _floats = [3.14, math.nan, 1.69, 100]
+
+    _list = [[1, 2, 3], [4, 5, 6], None, [7, 8, 9]]
+
+    _maps: List[Optional[Dict[int, int]]] = [
+        {1: 2, 3: 4},
+        None,
+        {5: 6},
+        {},
+    ]
+
+    table = pa.Table.from_pydict(
+        {
+            "strings": _strings,
+            "floats": _floats,
+            "list": _list,
+            "maps": _maps,
+        },
+        schema=arrow_schema,
+    )
+    metadata_collector: List[Any] = []
+
+    with pa.BufferOutputStream() as f:
+        with pq.ParquetWriter(f, table.schema, 
metadata_collector=metadata_collector) as writer:
+            writer.write_table(table)
+
+        return f.getvalue(), metadata_collector[0], table_metadata
+
+
+def test_record_count() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert datafile.record_count == 4
+
+
+def test_file_size() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert datafile.file_size_in_bytes == len(file_bytes)
+
+
+def test_value_counts() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert len(datafile.value_counts) == 5
+    assert datafile.value_counts[1] == 4
+    assert datafile.value_counts[2] == 4
+    assert datafile.value_counts[5] == 10  # 3 lists with 3 items and a None 
value
+    assert datafile.value_counts[6] == 5
+    assert datafile.value_counts[7] == 5
+
+
+def test_column_sizes() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert len(datafile.column_sizes) == 5
+    # these values are an artifact of how the write_table encodes the columns
+    assert datafile.column_sizes[1] == 116
+    assert datafile.column_sizes[2] == 89
+    assert datafile.column_sizes[5] == 151
+    assert datafile.column_sizes[6] == 117
+    assert datafile.column_sizes[7] == 117
+
+
+def test_null_and_nan_counts() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert len(datafile.null_value_counts) == 5
+    assert datafile.null_value_counts[1] == 1
+    assert datafile.null_value_counts[2] == 0
+    assert datafile.null_value_counts[5] == 1

Review Comment:
   Type:
   ```json
   {"type": "list", "element-id": 5, "element": "long", "element-required": 
False}
   ```
   And the data:
   ```python
   _list = [[1, 2, 3], [4, 5, 6], None, [7, 8, 9]]
   ```
   I count a single `null`
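
   A quick standalone check of that count (assuming `pyarrow` is installed; this snippet is not part of the PR's tests):
   ```python
   import pyarrow as pa

   arr = pa.array([[1, 2, 3], [4, 5, 6], None, [7, 8, 9]], type=pa.list_(pa.int64()))
   print(arr.null_count)  # 1 -- only the missing list itself is null
   ```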



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -1025,3 +1039,344 @@ def map_key_partner(self, partner_map: 
Optional[pa.Array]) -> Optional[pa.Array]
 
     def map_value_partner(self, partner_map: Optional[pa.Array]) -> 
Optional[pa.Array]:
         return partner_map.items if isinstance(partner_map, pa.MapArray) else 
None
+
+
+_PRIMITIVE_TO_PHYSICAL = {
+    BooleanType(): "BOOLEAN",
+    IntegerType(): "INT32",
+    LongType(): "INT64",
+    FloatType(): "FLOAT",
+    DoubleType(): "DOUBLE",
+    DateType(): "INT32",
+    TimeType(): "INT64",
+    TimestampType(): "INT64",
+    TimestamptzType(): "INT64",
+    StringType(): "BYTE_ARRAY",
+    UUIDType(): "FIXED_LEN_BYTE_ARRAY",
+    BinaryType(): "BYTE_ARRAY",
+}
+_PHYSICAL_TYPES = set(_PRIMITIVE_TO_PHYSICAL.values()).union({"INT96"})
+
+
+class StatsAggregator:
+    current_min: Any
+    current_max: Any
+    trunc_length: Optional[int]
+
+    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, 
trunc_length: Optional[int] = None) -> None:
+        self.current_min = None
+        self.current_max = None
+        self.trunc_length = trunc_length
+
+        if physical_type_string not in _PHYSICAL_TYPES:
+            raise ValueError(f"Unknown physical type {physical_type_string}")
+
+        if physical_type_string == "INT96":
+            raise NotImplementedError("Statistics not implemented for INT96 
physical type")
+
+        expected_physical_type = _PRIMITIVE_TO_PHYSICAL[iceberg_type]
+        if expected_physical_type != physical_type_string:
+            raise ValueError(
+                f"Unexpected physical type {physical_type_string} for 
{iceberg_type}, expected {expected_physical_type}"
+            )
+
+        self.primitive_type = iceberg_type
+
+    def serialize(self, value: Any) -> bytes:
+        if self.primitive_type == UUIDType():
+            value = uuid.UUID(bytes=value)
+
+        return to_bytes(self.primitive_type, value)
+
+    def update_min(self, val: Any) -> None:
+        self.current_min = val if self.current_min is None else min(val, 
self.current_min)
+
+    def update_max(self, val: Any) -> None:
+        self.current_max = val if self.current_max is None else max(val, 
self.current_max)
+
+    def min_as_bytes(self) -> bytes:
+        return self.serialize(
+            self.current_min
+            if self.trunc_length is None
+            else TruncateTransform(width=self.trunc_length).transform(self.primitive_type)(self.current_min)
+        )
+
+    def max_as_bytes(self) -> Optional[bytes]:
+        if self.current_max is None:
+            return None
+
+        if self.primitive_type == StringType():
+            if type(self.current_max) != str:
+                raise ValueError("Expected the current_max to be a string")
+
+            s_result = self.current_max[: self.trunc_length]
+            if s_result != self.current_max:
+                chars = [*s_result]
+
+                for i in range(-1, -len(s_result) - 1, -1):
+                    try:
+                        to_inc = ord(chars[i])
+                        # will raise exception if the highest unicode code is 
reached
+                        _next = chr(to_inc + 1)
+                        chars[i] = _next
+                        return self.serialize("".join(chars))
+                    except ValueError:
+                        pass
+                return None  # didn't find a valid upper bound
+            return self.serialize(s_result)
+        elif self.primitive_type == BinaryType():
+            if type(self.current_max) != bytes:
+                raise ValueError("Expected the current_max to be bytes")
+            b_result = self.current_max[: self.trunc_length]
+            if b_result != self.current_max:
+                _bytes = [*b_result]
+                for i in range(-1, -len(b_result) - 1, -1):
+                    if _bytes[i] < 255:
+                        _bytes[i] += 1
+                        return b"".join([i.to_bytes(1, byteorder="little") for i in _bytes])
+                return None
+
+            return self.serialize(b_result)
+        else:
+            if self.trunc_length is not None:
+                raise ValueError(f"{self.primitive_type} cannot be truncated")
+            return self.serialize(self.current_max)
+
+
+DEFAULT_TRUNCATION_LENGTH = 16
+TRUNCATION_EXPR = r"^truncate\((\d+)\)$"
+
+
+class MetricModeTypes(Enum):
+    TRUNCATE = "truncate"
+    NONE = "none"
+    COUNTS = "counts"
+    FULL = "full"
+
+
+DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
+COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
+
+
+@dataclass(frozen=True)
+class MetricsMode(Singleton):
+    type: MetricModeTypes
+    length: Optional[int] = None
+
+
+_DEFAULT_METRICS_MODE = MetricsMode(MetricModeTypes.TRUNCATE, 
DEFAULT_TRUNCATION_LENGTH)
+
+
+def match_metrics_mode(mode: str) -> MetricsMode:
+    sanitized_mode = mode.lower()
+    if sanitized_mode.startswith("truncate"):
+        m = re.match(TRUNCATION_EXPR, sanitized_mode)
+        if m:
+            length = int(m[1])
+            if length < 1:
+                raise ValueError("Truncation length must be larger than 0")
+            return MetricsMode(MetricModeTypes.TRUNCATE, int(m[1]))
+        else:
+            raise ValueError(f"Malformed truncate: {mode}")
+    elif sanitized_mode.startswith("none"):
+        return MetricsMode(MetricModeTypes.NONE)
+    elif sanitized_mode.startswith("counts"):
+        return MetricsMode(MetricModeTypes.COUNTS)
+    elif sanitized_mode.startswith("full"):
+        return MetricsMode(MetricModeTypes.FULL)
+    else:
+        raise ValueError(f"Unsupported metrics mode: {mode}")
+
+
+@dataclass(frozen=True)
+class StatisticsCollector:
+    field_id: int
+    iceberg_type: PrimitiveType
+    mode: MetricsMode
+    column_name: str
+
+
+class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]]):
+    _field_id: int = 0
+    _schema: Schema
+    _properties: Dict[str, str]
+    _default_mode: Optional[str]
+
+    def __init__(self, schema: Schema, properties: Dict[str, str]):
+        self._schema = schema
+        self._properties = properties
+        self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        return struct_result()
+
+    def struct(
+        self, struct: StructType, field_results: List[Callable[[], 
List[StatisticsCollector]]]
+    ) -> List[StatisticsCollector]:
+        return list(chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = field.field_id
+        result = field_result()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = list_type.element_id
+        return element_result()
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[StatisticsCollector]],
+        value_result: Callable[[], List[StatisticsCollector]],
+    ) -> List[StatisticsCollector]:
+        self._field_id = map_type.key_id
+        k = key_result()
+        self._field_id = map_type.value_id
+        v = value_result()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
+        column_name = self._schema.find_column_name(self._field_id)
+        if column_name is None:
+            return []
+
+        metrics_mode = _DEFAULT_METRICS_MODE
+
+        if self._default_mode:
+            metrics_mode = match_metrics_mode(self._default_mode)
+
+        col_mode = 
self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        if col_mode:
+            metrics_mode = match_metrics_mode(col_mode)
+
+        if (
+            not (isinstance(primitive, StringType) or isinstance(primitive, 
BinaryType))
+            and metrics_mode.type == MetricModeTypes.TRUNCATE
+        ):
+            metrics_mode = MetricsMode(MetricModeTypes.FULL)
+
+        return [StatisticsCollector(field_id=self._field_id, 
iceberg_type=primitive, mode=metrics_mode, column_name=column_name)]
+
+
+def fill_parquet_file_metadata(

Review Comment:
   Let's make sure that this isn't being used outside of the module.
   ```suggestion
   def _fill_parquet_file_metadata(
   ```



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1345,3 +1374,655 @@ def test_pyarrow_wrap_fsspec(example_task: 
FileScanTask, table_schema_simple: Sc
 bar: [[1,2,3]]
 baz: [[true,false,null]]"""
     )
+
+
+def construct_test_table() -> Tuple[Any, Any, Union[TableMetadataV1, 
TableMetadataV2]]:
+    table_metadata = {
+        "format-version": 2,
+        "location": "s3://bucket/test/location",
+        "last-column-id": 7,
+        "current-schema-id": 0,
+        "schemas": [
+            {
+                "type": "struct",
+                "schema-id": 0,
+                "fields": [
+                    {"id": 1, "name": "strings", "required": False, "type": 
"string"},
+                    {"id": 2, "name": "floats", "required": False, "type": 
"float"},
+                    {
+                        "id": 3,
+                        "name": "list",
+                        "required": False,
+                        "type": {"type": "list", "element-id": 5, "element": 
"long", "element-required": False},
+                    },
+                    {
+                        "id": 4,
+                        "name": "maps",
+                        "required": False,
+                        "type": {
+                            "type": "map",
+                            "key-id": 6,
+                            "key": "long",
+                            "value-id": 7,
+                            "value": "long",
+                            "value-required": False,
+                        },
+                    },
+                ],
+            },
+        ],
+        "default-spec-id": 0,
+        "partition-specs": [{"spec-id": 0, "fields": []}],
+        "properties": {},
+    }
+
+    table_metadata = TableMetadataUtil.parse_obj(table_metadata)
+    arrow_schema = schema_to_pyarrow(table_metadata.schemas[0])
+
+    _strings = ["zzzzzzzzzzzzzzzzzzzz", "rrrrrrrrrrrrrrrrrrrr", None, 
"aaaaaaaaaaaaaaaaaaaa"]
+
+    _floats = [3.14, math.nan, 1.69, 100]
+
+    _list = [[1, 2, 3], [4, 5, 6], None, [7, 8, 9]]
+
+    _maps: List[Optional[Dict[int, int]]] = [
+        {1: 2, 3: 4},
+        None,
+        {5: 6},
+        {},
+    ]
+
+    table = pa.Table.from_pydict(
+        {
+            "strings": _strings,
+            "floats": _floats,
+            "list": _list,
+            "maps": _maps,
+        },
+        schema=arrow_schema,
+    )
+    metadata_collector: List[Any] = []
+
+    with pa.BufferOutputStream() as f:
+        with pq.ParquetWriter(f, table.schema, 
metadata_collector=metadata_collector) as writer:
+            writer.write_table(table)
+
+        return f.getvalue(), metadata_collector[0], table_metadata
+
+
+def test_record_count() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert datafile.record_count == 4
+
+
+def test_file_size() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert datafile.file_size_in_bytes == len(file_bytes)
+
+
+def test_value_counts() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert len(datafile.value_counts) == 5
+    assert datafile.value_counts[1] == 4
+    assert datafile.value_counts[2] == 4
+    assert datafile.value_counts[5] == 10  # 3 lists with 3 items and a None 
value
+    assert datafile.value_counts[6] == 5
+    assert datafile.value_counts[7] == 5
+
+
+def test_column_sizes() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert len(datafile.column_sizes) == 5
+    # these values are an artifact of how the write_table encodes the columns
+    assert datafile.column_sizes[1] == 116
+    assert datafile.column_sizes[2] == 89
+    assert datafile.column_sizes[5] == 151
+    assert datafile.column_sizes[6] == 117
+    assert datafile.column_sizes[7] == 117
+
+
+def test_null_and_nan_counts() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert len(datafile.null_value_counts) == 5
+    assert datafile.null_value_counts[1] == 1
+    assert datafile.null_value_counts[2] == 0
+    assert datafile.null_value_counts[5] == 1
+    assert datafile.null_value_counts[6] == 2
+    assert datafile.null_value_counts[7] == 2
+
+    # #arrow does not include this in the statistics
+    # assert len(datafile.nan_value_counts)  == 3
+    # assert datafile.nan_value_counts[1]    == 0

Review Comment:
   I was just wondering: the evaluator does not take those into consideration for any type other than `float` or `double`, so is there any reason to still add them?



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -1025,3 +1039,335 @@ def map_key_partner(self, partner_map: 
Optional[pa.Array]) -> Optional[pa.Array]
 
     def map_value_partner(self, partner_map: Optional[pa.Array]) -> 
Optional[pa.Array]:
         return partner_map.items if isinstance(partner_map, pa.MapArray) else 
None
+
+
+_PRIMITIVE_TO_PHYSICAL = {
+    BooleanType(): "BOOLEAN",
+    IntegerType(): "INT32",
+    LongType(): "INT64",
+    FloatType(): "FLOAT",
+    DoubleType(): "DOUBLE",
+    DateType(): "INT32",
+    TimeType(): "INT64",
+    TimestampType(): "INT64",
+    TimestamptzType(): "INT64",
+    StringType(): "BYTE_ARRAY",
+    UUIDType(): "FIXED_LEN_BYTE_ARRAY",
+    BinaryType(): "BYTE_ARRAY",
+}
+_PHYSICAL_TYPES = set(_PRIMITIVE_TO_PHYSICAL.values()).union({"INT96"})
+
+
+class StatsAggregator:
+    current_min: Any
+    current_max: Any
+    trunc_length: Optional[int]
+
+    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, 
trunc_length: Optional[int] = None) -> None:
+        self.current_min = None
+        self.current_max = None
+        self.trunc_length = trunc_length
+
+        if physical_type_string not in _PHYSICAL_TYPES:
+            raise ValueError(f"Unknown physical type {physical_type_string}")
+
+        if physical_type_string == "INT96":
+            raise NotImplementedError("Statistics not implemented for INT96 
physical type")
+
+        expected_physical_type = _PRIMITIVE_TO_PHYSICAL[iceberg_type]
+        if expected_physical_type != physical_type_string:
+            raise ValueError(
+                f"Unexpected physical type {physical_type_string} for 
{iceberg_type}, expected {expected_physical_type}"
+            )
+
+        self.primitive_type = iceberg_type
+
+    def serialize(self, value: Any) -> bytes:
+        if self.primitive_type == UUIDType():
+            value = uuid.UUID(bytes=value)
+
+        return to_bytes(self.primitive_type, value)
+
+    def add_min(self, val: Any) -> None:
+        self.current_min = val if self.current_min is None else min(val, 
self.current_min)
+
+    def add_max(self, val: Any) -> None:
+        self.current_max = val if self.current_max is None else max(val, 
self.current_max)
+
+    def get_min(self) -> bytes:
+        return self.serialize(
+            self.current_min
+            if self.trunc_length is None
+            else TruncateTransform(width=self.trunc_length).transform(self.primitive_type)(self.current_min)
+        )
+
+    def get_max(self) -> Optional[bytes]:
+        if self.current_max is None:
+            return None
+
+        if self.primitive_type == StringType():
+            if type(self.current_max) != str:
+                raise ValueError("Expected the current_max to be a string")
+
+            s_result = self.current_max[: self.trunc_length]
+            if s_result != self.current_max:
+                chars = [*s_result]
+
+                for i in range(-1, -len(s_result) - 1, -1):
+                    try:
+                        to_inc = ord(chars[i])
+                        # will raise exception if the highest unicode code is 
reached
+                        _next = chr(to_inc + 1)
+                        chars[i] = _next
+                        return self.serialize("".join(chars))
+                    except ValueError:
+                        pass
+                return None  # didn't find a valid upper bound
+            return self.serialize(s_result)
+        elif self.primitive_type == BinaryType():
+            if type(self.current_max) != bytes:
+                raise ValueError("Expected the current_max to be bytes")
+            b_result = self.current_max[: self.trunc_length]
+            if b_result != self.current_max:
+                _bytes = [*b_result]
+                for i in range(-1, -len(b_result) - 1, -1):
+                    if _bytes[i] < 255:
+                        _bytes[i] += 1
+                        return b"".join([i.to_bytes(1, byteorder="little") for 
i in _bytes])
+                return None
+
+            return self.serialize(b_result)
+        else:
+            return self.serialize(self.current_max)[: self.trunc_length]
+
+
+DEFAULT_TRUNCATION_LENGTH = 16
+TRUNCATION_EXPR = r"^truncate\((\d+)\)$"
+
+
+class MetricModeTypes(Enum):
+    TRUNCATE = "truncate"
+    NONE = "none"
+    COUNTS = "counts"
+    FULL = "full"
+
+
+DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
+COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
+
+
+@dataclass(frozen=True)
+class MetricsMode(Singleton):
+    type: MetricModeTypes
+    length: Optional[int] = None
+
+
+def match_metrics_mode(mode: str) -> MetricsMode:
+    sanitized_mode = mode.lower()
+    if sanitized_mode.startswith("truncate"):
+        m = re.match(TRUNCATION_EXPR, mode, re.IGNORECASE)
+        if m:
+            length = int(m[1])
+            if length < 1:
+                raise ValueError("Truncation length must be larger than 0")
+            return MetricsMode(MetricModeTypes.TRUNCATE, int(m[1]))
+        else:
+            raise ValueError(f"Malformed truncate: {mode}")
+    elif sanitized_mode.startswith("none"):
+        return MetricsMode(MetricModeTypes.NONE)
+    elif sanitized_mode.startswith("counts"):
+        return MetricsMode(MetricModeTypes.COUNTS)
+    elif sanitized_mode.startswith("full"):
+        return MetricsMode(MetricModeTypes.FULL)
+    else:
+        raise ValueError(f"Unsupported metrics mode: {mode}")
+
+
+@dataclass(frozen=True)
+class StatisticsCollector:
+    field_id: int
+    iceberg_type: PrimitiveType
+    mode: MetricsMode
+    column_name: str
+
+
+class 
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]]):
+    _field_id: int = 0
+    _schema: Schema
+    _properties: Dict[str, str]
+
+    def __init__(self, schema: Schema, properties: Dict[str, str]):
+        self._schema = schema
+        self._properties = properties
+
+    def schema(self, schema: Schema, struct_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        return struct_result()
+
+    def struct(
+        self, struct: StructType, field_results: List[Callable[[], 
List[StatisticsCollector]]]
+    ) -> List[StatisticsCollector]:
+        return list(chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = field.field_id
+        result = field_result()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = list_type.element_id
+        return element_result()
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[StatisticsCollector]],
+        value_result: Callable[[], List[StatisticsCollector]],
+    ) -> List[StatisticsCollector]:
+        self._field_id = map_type.key_id
+        k = key_result()
+        self._field_id = map_type.value_id
+        v = value_result()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
+        column_name = self._schema.find_column_name(self._field_id)
+        if column_name is None:
+            raise ValueError(f"Column for field {self._field_id} not found")
+
+        metrics_mode = MetricsMode(MetricModeTypes.TRUNCATE, 
DEFAULT_TRUNCATION_LENGTH)
+
+        default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+        if default_mode:
+            metrics_mode = match_metrics_mode(default_mode)
+
+        col_mode = 
self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        if col_mode:
+            metrics_mode = match_metrics_mode(col_mode)
+
+        if (
+            not (isinstance(primitive, StringType) or isinstance(primitive, 
BinaryType))
+            and metrics_mode.type == MetricModeTypes.TRUNCATE
+        ):
+            metrics_mode = MetricsMode(MetricModeTypes.FULL)
+
+        return [StatisticsCollector(field_id=self._field_id, 
iceberg_type=primitive, mode=metrics_mode, column_name=column_name)]
+
+
+def fill_parquet_file_metadata(
+    df: DataFile,
+    parquet_metadata: pq.FileMetaData,
+    file_size: int,
+    table_metadata: TableMetadata,
+) -> None:
+    """
+    Computes and fills the following fields of the DataFile object.
+
+    - file_format
+    - record_count
+    - file_size_in_bytes
+    - column_sizes
+    - value_counts
+    - null_value_counts
+    - nan_value_counts
+    - lower_bounds
+    - upper_bounds
+    - split_offsets
+
+    Args:
+        df (DataFile): A DataFile object representing the Parquet file for 
which metadata is to be filled.
+        parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata 
object.
+        file_size (int): The total compressed file size cannot be retrieved 
from the metadata and hence has to
+            be passed here. Depending on the kind of file system and pyarrow 
library call used, different
+            ways to obtain this value might be appropriate.
+        table_metadata (pyiceberg.table.metadata.TableMetadata): The Iceberg 
table metadata. It is required to
+            compute the mapping if column position to iceberg schema type id. 
It's also used to set the mode
+            for column metrics collection
+    """
+    schema = next(filter(lambda s: s.schema_id == 
table_metadata.current_schema_id, table_metadata.schemas))
+
+    stats_columns = pre_order_visit(schema, PyArrowStatisticsCollector(schema, 
table_metadata.properties))
+
+    if parquet_metadata.num_columns != len(stats_columns):

Review Comment:
   I think we should leave this check in for now. I believe they are always the same, but if that ever turns out not to be the case, we should be notified.
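
   A minimal sketch of such a guard (hypothetical wording, the exact message in the PR may differ):

   ```python
   # Fail loudly if the Parquet column count ever diverges from the number of
   # collectors derived from the Iceberg schema.
   if parquet_metadata.num_columns != len(stats_columns):
       raise ValueError(
           f"Number of columns in Parquet metadata ({parquet_metadata.num_columns}) "
           f"does not match the number of stats columns ({len(stats_columns)})"
       )
   ```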



##########
python/tests/io/test_pyarrow.py:
##########
@@ -16,16 +16,39 @@
 # under the License.
 # pylint: disable=protected-access,unused-argument,redefined-outer-name
 
+import math
 import os
 import tempfile
-from typing import Any, List, Optional
+import uuid
+from datetime import (

Review Comment:
   We use this for PyArrow, which has the datetime classes in its public API.
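
   For illustration (a standalone snippet, not taken from the PR), PyArrow accepts the stdlib `datetime` classes directly when building arrays, which is what the tests rely on:

   ```python
   from datetime import date, datetime, time, timezone

   import pyarrow as pa

   # PyArrow converts stdlib temporal objects into the matching Arrow types.
   dates = pa.array([date(2023, 1, 1)], type=pa.date32())
   times = pa.array([time(12, 30, 0)], type=pa.time64("us"))
   timestamps = pa.array([datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)], type=pa.timestamp("us", tz="UTC"))
   ```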



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -1025,3 +1039,335 @@ def map_key_partner(self, partner_map: 
Optional[pa.Array]) -> Optional[pa.Array]
 
     def map_value_partner(self, partner_map: Optional[pa.Array]) -> 
Optional[pa.Array]:
         return partner_map.items if isinstance(partner_map, pa.MapArray) else 
None
+
+
+_PRIMITIVE_TO_PHYSICAL = {
+    BooleanType(): "BOOLEAN",
+    IntegerType(): "INT32",
+    LongType(): "INT64",
+    FloatType(): "FLOAT",
+    DoubleType(): "DOUBLE",
+    DateType(): "INT32",
+    TimeType(): "INT64",
+    TimestampType(): "INT64",
+    TimestamptzType(): "INT64",
+    StringType(): "BYTE_ARRAY",
+    UUIDType(): "FIXED_LEN_BYTE_ARRAY",
+    BinaryType(): "BYTE_ARRAY",
+}
+_PHYSICAL_TYPES = set(_PRIMITIVE_TO_PHYSICAL.values()).union({"INT96"})
+
+
+class StatsAggregator:
+    current_min: Any
+    current_max: Any
+    trunc_length: Optional[int]
+
+    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, 
trunc_length: Optional[int] = None) -> None:
+        self.current_min = None
+        self.current_max = None
+        self.trunc_length = trunc_length
+
+        if physical_type_string not in _PHYSICAL_TYPES:
+            raise ValueError(f"Unknown physical type {physical_type_string}")
+
+        if physical_type_string == "INT96":
+            raise NotImplementedError("Statistics not implemented for INT96 
physical type")
+
+        expected_physical_type = _PRIMITIVE_TO_PHYSICAL[iceberg_type]
+        if expected_physical_type != physical_type_string:
+            raise ValueError(
+                f"Unexpected physical type {physical_type_string} for 
{iceberg_type}, expected {expected_physical_type}"
+            )
+
+        self.primitive_type = iceberg_type
+
+    def serialize(self, value: Any) -> bytes:
+        if self.primitive_type == UUIDType():
+            value = uuid.UUID(bytes=value)
+
+        return to_bytes(self.primitive_type, value)
+
+    def add_min(self, val: Any) -> None:
+        self.current_min = val if self.current_min is None else min(val, 
self.current_min)
+
+    def add_max(self, val: Any) -> None:
+        self.current_max = val if self.current_max is None else max(val, 
self.current_max)
+
+    def get_min(self) -> bytes:
+        return self.serialize(
+            self.current_min
+            if self.trunc_length is None
+            else 
TruncateTransform(width=self.trunc_length).transform(self.primitive_type)(self.current_min)
+        )
+
+    def get_max(self) -> Optional[bytes]:
+        if self.current_max is None:
+            return None
+
+        if self.primitive_type == StringType():
+            if type(self.current_max) != str:
+                raise ValueError("Expected the current_max to be a string")
+
+            s_result = self.current_max[: self.trunc_length]
+            if s_result != self.current_max:
+                chars = [*s_result]
+
+                for i in range(-1, -len(s_result) - 1, -1):
+                    try:
+                        to_inc = ord(chars[i])
+                        # will raise exception if the highest unicode code is 
reached
+                        _next = chr(to_inc + 1)
+                        chars[i] = _next
+                        return self.serialize("".join(chars))
+                    except ValueError:
+                        pass
+                return None  # didn't find a valid upper bound
+            return self.serialize(s_result)
+        elif self.primitive_type == BinaryType():
+            if type(self.current_max) != bytes:
+                raise ValueError("Expected the current_max to be bytes")
+            b_result = self.current_max[: self.trunc_length]
+            if b_result != self.current_max:
+                _bytes = [*b_result]
+                for i in range(-1, -len(b_result) - 1, -1):
+                    if _bytes[i] < 255:
+                        _bytes[i] += 1
+                        return b"".join([i.to_bytes(1, byteorder="little") for 
i in _bytes])
+                return None
+
+            return self.serialize(b_result)
+        else:
+            return self.serialize(self.current_max)[: self.trunc_length]
+
+
+DEFAULT_TRUNCATION_LENGTH = 16
+TRUNCATION_EXPR = r"^truncate\((\d+)\)$"
+
+
+class MetricModeTypes(Enum):
+    TRUNCATE = "truncate"
+    NONE = "none"
+    COUNTS = "counts"
+    FULL = "full"
+
+
+DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
+COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
+
+
+@dataclass(frozen=True)
+class MetricsMode(Singleton):
+    type: MetricModeTypes
+    length: Optional[int] = None
+
+
+def match_metrics_mode(mode: str) -> MetricsMode:
+    sanitized_mode = mode.lower()
+    if sanitized_mode.startswith("truncate"):
+        m = re.match(TRUNCATION_EXPR, mode, re.IGNORECASE)
+        if m:
+            length = int(m[1])
+            if length < 1:
+                raise ValueError("Truncation length must be larger than 0")
+            return MetricsMode(MetricModeTypes.TRUNCATE, int(m[1]))
+        else:
+            raise ValueError(f"Malformed truncate: {mode}")
+    elif sanitized_mode.startswith("none"):
+        return MetricsMode(MetricModeTypes.NONE)
+    elif sanitized_mode.startswith("counts"):
+        return MetricsMode(MetricModeTypes.COUNTS)
+    elif sanitized_mode.startswith("full"):
+        return MetricsMode(MetricModeTypes.FULL)
+    else:
+        raise ValueError(f"Unsupported metrics mode: {mode}")
+
+
+@dataclass(frozen=True)
+class StatisticsCollector:
+    field_id: int
+    iceberg_type: PrimitiveType
+    mode: MetricsMode
+    column_name: str
+
+
+class 
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]]):
+    _field_id: int = 0
+    _schema: Schema
+    _properties: Dict[str, str]
+
+    def __init__(self, schema: Schema, properties: Dict[str, str]):
+        self._schema = schema
+        self._properties = properties
+
+    def schema(self, schema: Schema, struct_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        return struct_result()
+
+    def struct(
+        self, struct: StructType, field_results: List[Callable[[], 
List[StatisticsCollector]]]
+    ) -> List[StatisticsCollector]:
+        return list(chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = field.field_id
+        result = field_result()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
+        self._field_id = list_type.element_id
+        return element_result()
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[StatisticsCollector]],
+        value_result: Callable[[], List[StatisticsCollector]],
+    ) -> List[StatisticsCollector]:
+        self._field_id = map_type.key_id
+        k = key_result()
+        self._field_id = map_type.value_id
+        v = value_result()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
+        column_name = self._schema.find_column_name(self._field_id)
+        if column_name is None:
+            raise ValueError(f"Column for field {self._field_id} not found")
+
+        metrics_mode = MetricsMode(MetricModeTypes.TRUNCATE, 
DEFAULT_TRUNCATION_LENGTH)
+
+        default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+        if default_mode:
+            metrics_mode = match_metrics_mode(default_mode)
+
+        col_mode = 
self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        if col_mode:
+            metrics_mode = match_metrics_mode(col_mode)
+
+        if (
+            not (isinstance(primitive, StringType) or isinstance(primitive, 
BinaryType))
+            and metrics_mode.type == MetricModeTypes.TRUNCATE
+        ):
+            metrics_mode = MetricsMode(MetricModeTypes.FULL)
+
+        return [StatisticsCollector(field_id=self._field_id, 
iceberg_type=primitive, mode=metrics_mode, column_name=column_name)]
+
+
+def fill_parquet_file_metadata(
+    df: DataFile,
+    parquet_metadata: pq.FileMetaData,
+    file_size: int,
+    table_metadata: TableMetadata,

Review Comment:
   We could also just pass in the `stats_columns`:
   ```suggestion
       stats_columns: List[StatisticsCollector]
   ```
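
   To make that concrete, a rough sketch (hypothetical, reusing the names from the diff above rather than the PR's actual code) of the signature and call site if the collectors were resolved by the caller:

   ```python
   def fill_parquet_file_metadata(
       df: DataFile,
       parquet_metadata: pq.FileMetaData,
       file_size: int,
       stats_columns: List[StatisticsCollector],
   ) -> None:
       # ... same body as above, minus the schema lookup and the visitor call ...
       ...

   # Call site, assuming the caller already has the table metadata at hand:
   # stats_columns = pre_order_visit(schema, PyArrowStatisticsCollector(schema, table_metadata.properties))
   # fill_parquet_file_metadata(datafile, parquet_metadata, file_size, stats_columns)
   ```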



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1345,3 +1374,655 @@ def test_pyarrow_wrap_fsspec(example_task: 
FileScanTask, table_schema_simple: Sc
 bar: [[1,2,3]]
 baz: [[true,false,null]]"""
     )
+
+
+def construct_test_table() -> Tuple[Any, Any, Union[TableMetadataV1, 
TableMetadataV2]]:
+    table_metadata = {
+        "format-version": 2,
+        "location": "s3://bucket/test/location",
+        "last-column-id": 7,
+        "current-schema-id": 0,
+        "schemas": [
+            {
+                "type": "struct",
+                "schema-id": 0,
+                "fields": [
+                    {"id": 1, "name": "strings", "required": False, "type": 
"string"},
+                    {"id": 2, "name": "floats", "required": False, "type": 
"float"},
+                    {
+                        "id": 3,
+                        "name": "list",
+                        "required": False,
+                        "type": {"type": "list", "element-id": 5, "element": 
"long", "element-required": False},
+                    },
+                    {
+                        "id": 4,
+                        "name": "maps",
+                        "required": False,
+                        "type": {
+                            "type": "map",
+                            "key-id": 6,
+                            "key": "long",
+                            "value-id": 7,
+                            "value": "long",
+                            "value-required": False,
+                        },
+                    },
+                ],
+            },
+        ],
+        "default-spec-id": 0,
+        "partition-specs": [{"spec-id": 0, "fields": []}],
+        "properties": {},
+    }
+
+    table_metadata = TableMetadataUtil.parse_obj(table_metadata)
+    arrow_schema = schema_to_pyarrow(table_metadata.schemas[0])
+
+    _strings = ["zzzzzzzzzzzzzzzzzzzz", "rrrrrrrrrrrrrrrrrrrr", None, 
"aaaaaaaaaaaaaaaaaaaa"]
+
+    _floats = [3.14, math.nan, 1.69, 100]
+
+    _list = [[1, 2, 3], [4, 5, 6], None, [7, 8, 9]]
+
+    _maps: List[Optional[Dict[int, int]]] = [
+        {1: 2, 3: 4},
+        None,
+        {5: 6},
+        {},
+    ]
+
+    table = pa.Table.from_pydict(
+        {
+            "strings": _strings,
+            "floats": _floats,
+            "list": _list,
+            "maps": _maps,
+        },
+        schema=arrow_schema,
+    )
+    metadata_collector: List[Any] = []
+
+    with pa.BufferOutputStream() as f:
+        with pq.ParquetWriter(f, table.schema, 
metadata_collector=metadata_collector) as writer:
+            writer.write_table(table)
+
+        return f.getvalue(), metadata_collector[0], table_metadata
+
+
+def test_record_count() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert datafile.record_count == 4
+
+
+def test_file_size() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert datafile.file_size_in_bytes == len(file_bytes)
+
+
+def test_value_counts() -> None:
+    (file_bytes, metadata, table_metadata) = construct_test_table()
+
+    datafile = DataFile()
+    fill_parquet_file_metadata(datafile, metadata, len(file_bytes), 
table_metadata)
+
+    assert len(datafile.value_counts) == 5
+    assert datafile.value_counts[1] == 4
+    assert datafile.value_counts[2] == 4
+    assert datafile.value_counts[5] == 10  # 3 lists with 3 items each, plus the None list which still counts as one value
+    assert datafile.value_counts[6] == 5

Review Comment:
   With Spark:
   
   ```sql
   CREATE TABLE nyc.test_map_maps2 AS 
   SELECT map_from_arrays(array(1.0, 3.0), array('2', '4')) as map, array('a', 
'b', 'c') as arr
   ```
   
   Schema:
   ```json
   {
        "type": "struct",
        "schema-id": 0,
        "fields": [{
                "id": 1,
                "name": "map",
                "required": false,
                "type": {
                        "type": "map",
                        "key-id": 3,
                        "key": "decimal(2, 1)",
                        "value-id": 4,
                        "value": "string",
                        "value-required": false
                }
        }, {
                "id": 2,
                "name": "arr",
                "required": false,
                "type": {
                        "type": "list",
                        "element-id": 5,
                        "element": "string",
                        "element-required": false
                }
        }]
   }
   ```
   
   We don't get any lower/upper bounds from Spark:
   ```json
   {
        "status": 1,
        "snapshot_id": {
                "long": 4895801649705337905
        },
        "data_file": {
                "file_path": 
"s3://warehouse/nyc/test_map_maps/data/00000-95-750d8f3e-8d49-44ec-b37e-9e101e003a5d-00001.parquet",
                "file_format": "PARQUET",
                "partition": {},
                "record_count": 1,
                "file_size_in_bytes": 1438,
                "block_size_in_bytes": 67108864,
                "column_sizes": {
                        "array": [{
                                "key": 3,
                                "value": 57
                        }, {
                                "key": 4,
                                "value": 58
                        }, {
                                "key": 5,
                                "value": 61
                        }]
                },
                "value_counts": {
                        "array": [{
                                "key": 3,
                                "value": 2
                        }, {
                                "key": 4,
                                "value": 2
                        }, {
                                "key": 5,
                                "value": 3
                        }]
                },
                "null_value_counts": {
                        "array": [{
                                "key": 3,
                                "value": 0
                        }, {
                                "key": 4,
                                "value": 0
                        }, {
                                "key": 5,
                                "value": 0
                        }]
                },
                "nan_value_counts": {
                        "array": []
                },
                "lower_bounds": {
                        "array": []
                },
                "upper_bounds": {
                        "array": []
                },
                "key_metadata": null,
                "split_offsets": {
                        "array": [4]
                },
                "sort_order_id": {
                        "int": 0
                }
        }
   }
   ```
   
   The stats are only computed for the primitive types (see 
`PyArrowStatisticsCollector`).
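
   As a sanity check against the manifest above, a small sketch (reusing the PR's visitor, and assuming the Spark schema has been loaded into a pyiceberg `Schema`) showing that only the leaf primitives get collectors:

   ```python
   # Only the map key (3), map value (4) and list element (5) are leaf primitives,
   # matching the field ids that carry value_counts in the Spark-written manifest.
   stats_columns = pre_order_visit(schema, PyArrowStatisticsCollector(schema, {}))
   assert sorted(c.field_id for c in stats_columns) == [3, 4, 5]
   ```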
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
