wgtmac commented on code in PR #651:
URL: https://github.com/apache/iceberg-cpp/pull/651#discussion_r3297106589


##########
src/iceberg/parquet/parquet_metrics.h:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/parquet/parquet_metrics.h
+/// \brief Utilities for extracting metrics from Parquet files.
+
+#include <unordered_map>
+
+#include <parquet/metadata.h>
+
+#include "iceberg/iceberg_bundle_export.h"
+#include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+
+namespace iceberg::parquet {
+
+/// \brief Utility class for computing metrics from Parquet files.
+class ICEBERG_BUNDLE_EXPORT ParquetMetrics {
+ public:
+  ParquetMetrics() = delete;
+
+  /// \brief Compute file-level metrics from Parquet file metadata.
+  ///
+  /// This function extracts metrics including row count, column sizes, value counts,
+  /// null value counts, and lower/upper bounds from Parquet file metadata.
+  /// NaN value counts are not currently collected from Parquet metadata.
+  /// The metrics are computed according to the provided MetricsConfig, which determines
+  /// which columns to collect metrics for and at what granularity (counts only, truncated
+  /// bounds, or full bounds).
+  ///
+  /// \param schema The Iceberg schema for the table.
+  /// \param parquet_schema The Parquet schema descriptor.
+  /// \param metrics_config The configuration specifying how to collect metrics.
+  /// \param metadata The Parquet file metadata containing row group statistics.
+  /// \param field_metrics Optional per-field metrics computed during write.
+  ///        If provided, these take precedence over footer statistics.
+  /// \return Result containing the computed Metrics or an error.
+  static Result<Metrics> GetMetrics(
+      const Schema& schema, const ::parquet::SchemaDescriptor& parquet_schema,

Review Comment:
   In principle, we cannot expose symbols like `::parquet::`, which belong to
   third-party dependencies. So we need to rename this header file to
   `parquet_metrics_internal.h` so that it is not installed. (Note that the source
   file does not need to be renamed.)



##########
src/iceberg/metrics.h:
##########
@@ -30,6 +30,35 @@
 
 namespace iceberg {
 
+/// \brief Field-level metrics for a single column.
+///
+/// This structure captures value counts, null counts, NaN counts, and optional
+/// lower/upper bounds for a specific field identified by its field_id.
+struct ICEBERG_EXPORT FieldMetrics {
+  /// \brief The field ID this metrics belongs to.
+  int32_t field_id;

Review Comment:
   Do we need to initialize it with an invalid value, just in case, to avoid
   undefined behavior from reading it uninitialized?
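
   A minimal sketch of what this could look like, assuming that a negative ID is
   never a valid field ID (the footer code in this PR already treats
   `field_id() < 0` as "not set"). `FieldMetricsSketch`, `kInvalidFieldId`, and
   `HasValidFieldId` are hypothetical names, and the `-1` defaults for the counts
   are likewise only an assumption:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for FieldMetrics; only the defaulting idea matters.
struct FieldMetricsSketch {
  // Assumed sentinel: negative IDs are treated as "no field ID".
  static constexpr int32_t kInvalidFieldId = -1;

  // Default member initializers keep the struct an aggregate, so designated
  // initializers such as {.field_id = 3} continue to work.
  int32_t field_id = kInvalidFieldId;
  int64_t value_count = -1;
  int64_t null_value_count = -1;
};

// Returns true if the metrics object was given a real field ID.
inline bool HasValidFieldId(const FieldMetricsSketch& metrics) {
  return metrics.field_id >= 0;
}

// Small usage check: a default-constructed object is detectably invalid
// instead of holding an indeterminate value.
inline void DefaultInitExample() {
  FieldMetricsSketch metrics;
  assert(!HasValidFieldId(metrics));
}
```

   With this, `FieldMetricsSketch m;` has a well-defined `field_id`, whereas a
   plain `int32_t field_id;` member would be indeterminate in that case.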



##########
src/iceberg/util/truncate_util.cc:
##########
@@ -167,42 +168,46 @@ Literal TruncateLiteralImpl<TypeId::kBinary>(const Literal& literal, int32_t wid
 }
 
 template <TypeId type_id>
-Result<Literal> TruncateLiteralMaxImpl(const Literal& literal, int32_t width) = delete;
+Result<std::optional<Literal>> TruncateLiteralMaxImpl(const Literal& literal,
+                                                      int32_t width) = delete;
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kString>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kString>(
+    const Literal& literal, int32_t width) {
   const auto& str = std::get<std::string>(literal.value());
-  ICEBERG_ASSIGN_OR_RAISE(std::string truncated,
-                          TruncateUtils::TruncateUTF8Max(str, width));
-  return Literal::String(std::move(truncated));
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated, TruncateUtils::TruncateUTF8Max(str, width));
+  if (!truncated.has_value()) {
+    return std::nullopt;
+  }
+  return std::optional<Literal>(Literal::String(std::move(truncated.value())));

Review Comment:
   We can directly write `return truncated.transform([](std::string t) { return 
Literal::String(std::move(t)); });` to make it compact.
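
   As a rough illustration of the suggestion: `std::optional::transform` (C++23)
   applies a callable only when a value is present and otherwise propagates the
   empty state. In the sketch below, `LiteralSketch` and `ToLiteral` are
   hypothetical stand-ins rather than the project's real types, and the `#else`
   branch is there only so the snippet also builds with a pre-C++23 standard
   library:

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for Literal holding a string value.
struct LiteralSketch {
  std::string value;
};

// Maps an optional truncated string to an optional literal.
std::optional<LiteralSketch> ToLiteral(std::optional<std::string> truncated) {
#if defined(__cpp_lib_optional) && __cpp_lib_optional >= 202110L
  // C++23 monadic form: the lambda runs only if a value is present.
  return std::move(truncated).transform(
      [](std::string t) { return LiteralSketch{std::move(t)}; });
#else
  // Equivalent hand-written form for older standard libraries.
  if (!truncated.has_value()) {
    return std::nullopt;
  }
  return LiteralSketch{std::move(*truncated)};
#endif
}
```

   Both branches behave the same way; the monadic form just removes the explicit
   `has_value()` check and the manual re-wrapping.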



##########
src/iceberg/parquet/parquet_metrics.cc:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_metrics.h"
+
+#include <limits>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+
+#include <parquet/column_reader.h>
+#include <parquet/schema.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+/// \brief Get the Iceberg field ID from a Parquet column descriptor.
+/// \return The field ID, or nullopt if no field ID is set.
+std::optional<int32_t> GetFieldId(const ::parquet::ColumnDescriptor& column) {
+  const auto& node = column.schema_node();
+  if (node == nullptr || !node->is_primitive()) {
+    return std::nullopt;
+  }
+  if (node->field_id() < 0) {
+    return std::nullopt;
+  }
+  return node->field_id();
+}
+
+/// \brief Find the column index for a field in the Parquet schema.
+std::optional<int32_t> FindColumnIndex(const ::parquet::SchemaDescriptor& parquet_schema,
+                                       int32_t field_id) {
+  auto columns = std::views::iota(0, parquet_schema.num_columns());
+  auto it = std::ranges::find_if(columns, [&](int i) {
+    auto column_field_id = GetFieldId(*parquet_schema.Column(i));
+    return column_field_id.has_value() && column_field_id.value() == field_id;
+  });
+  return it != columns.end() ? std::optional(*it) : std::nullopt;
+}
+
+/// \brief Collect counts (value count and null count) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \return A pair of (value_count, null_count), or nullopt if stats are not available.
+std::optional<FieldMetrics> CollectCounts(int32_t field_id,
+                                          const ::parquet::FileMetaData& metadata,
+                                          int32_t column_idx) {
+  int64_t value_count = 0;
+  int64_t null_count = 0;
+
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+  }
+
+  return FieldMetrics{
+      .field_id = field_id, .value_count = value_count, .null_value_count = null_count};
+}
+
+/// \brief Collect bounds (lower and upper) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param iceberg_type The Iceberg primitive type for deserializing values.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \param truncate_length The length to truncate strings/binary values.
+/// \return FieldMetrics with counts and bounds, or nullopt if stats are not available.
+Result<std::optional<FieldMetrics>> CollectBounds(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::FileMetaData& metadata, int32_t column_idx,
+    int32_t truncate_length) {
+  int64_t null_count = 0;
+  int64_t value_count = 0;
+  std::optional<Literal> lower_bound;
+  std::optional<Literal> upper_bound;
+
+  for (int32_t rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+
+    if (stats->HasMinMax()) {
+      auto min_bytes = stats->EncodeMin();
+      auto min_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(min_bytes.data()), min_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto min_value,
+                              Conversions::FromBytes(iceberg_type, min_span));

Review Comment:
   `stats->EncodeMin()` returns bytes in Parquet plain encoding, but
   `Conversions::FromBytes` expects Iceberg single-value serialization. This
   conversion may work coincidentally for some primitive types. We need to fix it
   by creating a mapping between Parquet and Iceberg types and then using explicit
   casts to obtain typed Parquet values from which to create correct literals.
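
   One case where the two layouts differ, as far as both specifications are
   understood here, is a decimal stored in a Parquet INT32 column: the plain
   encoding is a 4-byte little-endian integer, while Iceberg single-value
   serialization stores the unscaled value as two's-complement big-endian using
   the minimum number of bytes. The sketch below only illustrates that mismatch;
   `DecodeInt32LE` and `EncodeUnscaledBE` are hypothetical helpers, not part of
   Arrow, Parquet, or iceberg-cpp:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Decodes a 4-byte little-endian integer (Parquet plain INT32 layout).
int32_t DecodeInt32LE(const uint8_t* bytes) {
  uint32_t u = static_cast<uint32_t>(bytes[0]) |
               (static_cast<uint32_t>(bytes[1]) << 8) |
               (static_cast<uint32_t>(bytes[2]) << 16) |
               (static_cast<uint32_t>(bytes[3]) << 24);
  return static_cast<int32_t>(u);
}

// Encodes an unscaled decimal value as two's-complement big-endian bytes,
// dropping leading bytes that carry only sign extension.
std::vector<uint8_t> EncodeUnscaledBE(int64_t unscaled) {
  const uint64_t u = static_cast<uint64_t>(unscaled);
  std::vector<uint8_t> out;
  for (int shift = 56; shift >= 0; shift -= 8) {
    out.push_back(static_cast<uint8_t>((u >> shift) & 0xFF));
  }
  std::size_t start = 0;
  while (start + 1 < out.size()) {
    const bool next_is_negative = (out[start + 1] & 0x80) != 0;
    const bool redundant_zero = out[start] == 0x00 && !next_is_negative;
    const bool redundant_ones = out[start] == 0xFF && next_is_negative;
    if (!redundant_zero && !redundant_ones) {
      break;
    }
    ++start;
  }
  out.erase(out.begin(), out.begin() + static_cast<std::ptrdiff_t>(start));
  return out;
}
```

   For an unscaled value of 1234, the Parquet side holds `D2 04 00 00` while the
   Iceberg side expects `04 D2`, so passing the former to a deserializer for the
   latter would not yield 1234.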



##########
src/iceberg/parquet/parquet_metrics.cc:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_metrics.h"
+
+#include <limits>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+
+#include <parquet/column_reader.h>
+#include <parquet/schema.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+/// \brief Get the Iceberg field ID from a Parquet column descriptor.
+/// \return The field ID, or nullopt if no field ID is set.
+std::optional<int32_t> GetFieldId(const ::parquet::ColumnDescriptor& column) {
+  const auto& node = column.schema_node();
+  if (node == nullptr || !node->is_primitive()) {
+    return std::nullopt;
+  }
+  if (node->field_id() < 0) {
+    return std::nullopt;
+  }
+  return node->field_id();
+}
+
+/// \brief Find the column index for a field in the Parquet schema.
+std::optional<int32_t> FindColumnIndex(const ::parquet::SchemaDescriptor& parquet_schema,
+                                       int32_t field_id) {
+  auto columns = std::views::iota(0, parquet_schema.num_columns());
+  auto it = std::ranges::find_if(columns, [&](int i) {
+    auto column_field_id = GetFieldId(*parquet_schema.Column(i));
+    return column_field_id.has_value() && column_field_id.value() == field_id;
+  });
+  return it != columns.end() ? std::optional(*it) : std::nullopt;
+}
+
+/// \brief Collect counts (value count and null count) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \return A pair of (value_count, null_count), or nullopt if stats are not available.
+std::optional<FieldMetrics> CollectCounts(int32_t field_id,
+                                          const ::parquet::FileMetaData& metadata,
+                                          int32_t column_idx) {
+  int64_t value_count = 0;
+  int64_t null_count = 0;
+
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+  }
+
+  return FieldMetrics{
+      .field_id = field_id, .value_count = value_count, .null_value_count = null_count};
+}
+
+/// \brief Collect bounds (lower and upper) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param iceberg_type The Iceberg primitive type for deserializing values.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \param truncate_length The length to truncate strings/binary values.
+/// \return FieldMetrics with counts and bounds, or nullopt if stats are not available.
+Result<std::optional<FieldMetrics>> CollectBounds(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::FileMetaData& metadata, int32_t column_idx,
+    int32_t truncate_length) {
+  int64_t null_count = 0;
+  int64_t value_count = 0;
+  std::optional<Literal> lower_bound;
+  std::optional<Literal> upper_bound;
+
+  for (int32_t rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+
+    if (stats->HasMinMax()) {
+      auto min_bytes = stats->EncodeMin();
+      auto min_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(min_bytes.data()), min_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto min_value,
+                              Conversions::FromBytes(iceberg_type, min_span));
+      if (!lower_bound.has_value() || min_value < lower_bound.value()) {
+        lower_bound = std::move(min_value);
+      }
+
+      auto max_bytes = stats->EncodeMax();
+      auto max_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(max_bytes.data()), max_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto max_value,
+                              Conversions::FromBytes(iceberg_type, max_span));
+      if (!upper_bound.has_value() || max_value > upper_bound.value()) {
+        upper_bound = std::move(max_value);
+      }
+    }
+  }
+
+  if (!lower_bound.has_value() || !upper_bound.has_value() || lower_bound->IsNaN() ||
+      upper_bound->IsNaN()) {

Review Comment:
   Once the above conversion is fixed, we can restrict the NaN check to
   floating-point types.



##########
src/iceberg/parquet/parquet_writer.cc:
##########
@@ -165,15 +157,35 @@ class ParquetWriter::Impl {
 
   std::vector<int64_t> split_offsets() const { return split_offsets_; }
 
+  Result<Metrics> metrics() {
+    if (writer_) {
+      return Invalid("Cannot return metrics for unclosed writer");
+    }
+    if (!metadata_) {

Review Comment:
   Shouldn't it be an error if `metadata_` is not available once the writer is closed?
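
   A sketch of the check being asked about, under the assumption that a closed
   writer is always expected to have footer metadata. `WriterStateSketch`,
   `FooterSketch`, and `CheckMetricsPreconditions` are hypothetical names, and
   the second error message is invented for illustration:

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>

// Hypothetical stand-in for the Parquet file metadata.
struct FooterSketch {
  int64_t num_rows = 0;
};

// Hypothetical stand-in for the writer's internal state.
struct WriterStateSketch {
  bool open = false;                       // true while the writer is not closed
  std::shared_ptr<FooterSketch> metadata;  // expected to be set on close
};

// Returns an error message if metrics cannot be produced, or nullopt if the
// state is consistent.
std::optional<std::string> CheckMetricsPreconditions(const WriterStateSketch& state) {
  if (state.open) {
    return "Cannot return metrics for unclosed writer";
  }
  if (state.metadata == nullptr) {
    // Closed but no footer: report it instead of returning empty metrics.
    return "Writer is closed but file metadata is not available";
  }
  return std::nullopt;
}
```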



##########
src/iceberg/parquet/parquet_writer.cc:
##########
@@ -130,6 +134,7 @@ class ParquetWriter::Impl {
     for (int i = 0; i < metadata->num_row_groups(); ++i) {
       split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
     }
+    metadata_ = writer_->metadata();

Review Comment:
   We can consolidate `metadata` and `metadata_` into one.



##########
src/iceberg/metrics_config.cc:
##########
@@ -116,6 +130,14 @@ Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table) {
                       *sort_order.value_or(SortOrder::Unsorted()));
 }
 
+Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(
+    std::unordered_map<std::string, std::string> properties) {
+  // Create a minimal TableProperties wrapper for the properties
+  TableProperties props = TableProperties::FromMap(std::move(properties));
+
+  return MakeInternal(props, Schema({}), *SortOrder::Unsorted());

Review Comment:
   This looks inconsistent with the Java implementation. Here we pass an empty
   schema to `MakeInternal`, which consumes it later, whereas the Java
   implementation passes a null schema and directly assigns the default metrics
   mode to bypass the schema check. Perhaps we should change the signature of
   `MakeInternal` to take a `const Schema*` so that a missing schema can be
   indicated.
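
   A minimal sketch of the pointer-based signature, assuming that a null schema
   means "skip schema validation and use the default mode". All names here
   (`ModeSketch`, `SchemaSketch`, `ConfigSketch`, `MakeInternalSketch`) are
   hypothetical and much simpler than the real `MetricsConfig` code:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for the metrics mode, schema, and resulting config.
enum class ModeSketch { kNone, kCounts, kTruncate, kFull };

struct SchemaSketch {
  std::vector<std::string> column_names;
};

struct ConfigSketch {
  ModeSketch default_mode;
  bool validated_against_schema;
};

// A null schema pointer signals that no schema is available.
ConfigSketch MakeInternalSketch(ModeSketch default_mode, const SchemaSketch* schema) {
  if (schema == nullptr) {
    // No schema: assign the default mode and bypass column validation.
    return ConfigSketch{default_mode, false};
  }
  // With a schema, per-column settings would be validated here (elided).
  return ConfigSketch{default_mode, true};
}
```

   Compared with passing `Schema({})`, the pointer makes the "no schema" case
   explicit at the call site instead of relying on an empty schema happening to
   pass validation.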



##########
src/iceberg/util/truncate_util.cc:
##########
@@ -167,42 +168,46 @@ Literal TruncateLiteralImpl<TypeId::kBinary>(const Literal& literal, int32_t wid
 }
 
 template <TypeId type_id>
-Result<Literal> TruncateLiteralMaxImpl(const Literal& literal, int32_t width) = delete;
+Result<std::optional<Literal>> TruncateLiteralMaxImpl(const Literal& literal,
+                                                      int32_t width) = delete;
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kString>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kString>(
+    const Literal& literal, int32_t width) {
   const auto& str = std::get<std::string>(literal.value());
-  ICEBERG_ASSIGN_OR_RAISE(std::string truncated,
-                          TruncateUtils::TruncateUTF8Max(str, width));
-  return Literal::String(std::move(truncated));
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated, TruncateUtils::TruncateUTF8Max(str, width));
+  if (!truncated.has_value()) {
+    return std::nullopt;
+  }
+  return std::optional<Literal>(Literal::String(std::move(truncated.value())));
 }
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kBinary>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kBinary>(
+    const Literal& literal, int32_t width) {
   const auto& data = std::get<std::vector<uint8_t>>(literal.value());
   if (static_cast<int32_t>(data.size()) <= width) {
-    return literal;
+    return std::optional<Literal>(literal);
   }
 
   std::vector<uint8_t> truncated(data.begin(), data.begin() + width);
   for (auto it = truncated.rbegin(); it != truncated.rend(); ++it) {
     if (*it < 0xFF) {
       ++(*it);
       truncated.resize(truncated.size() - std::distance(truncated.rbegin(), it));
-      return Literal::Binary(std::move(truncated));
+      return std::optional<Literal>(Literal::Binary(std::move(truncated)));
     }
   }
-  return InvalidArgument("Cannot truncate upper bound for binary: all bytes are 0xFF");
+  return std::nullopt;

Review Comment:
   I think the original error message is worth keeping.



##########
src/iceberg/parquet/parquet_metrics.cc:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_metrics.h"
+
+#include <limits>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+
+#include <parquet/column_reader.h>
+#include <parquet/schema.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+/// \brief Get the Iceberg field ID from a Parquet column descriptor.
+/// \return The field ID, or nullopt if no field ID is set.
+std::optional<int32_t> GetFieldId(const ::parquet::ColumnDescriptor& column) {
+  const auto& node = column.schema_node();
+  if (node == nullptr || !node->is_primitive()) {
+    return std::nullopt;
+  }
+  if (node->field_id() < 0) {
+    return std::nullopt;
+  }
+  return node->field_id();
+}
+
+/// \brief Find the column index for a field in the Parquet schema.
+std::optional<int32_t> FindColumnIndex(const ::parquet::SchemaDescriptor& parquet_schema,
+                                       int32_t field_id) {
+  auto columns = std::views::iota(0, parquet_schema.num_columns());
+  auto it = std::ranges::find_if(columns, [&](int i) {
+    auto column_field_id = GetFieldId(*parquet_schema.Column(i));
+    return column_field_id.has_value() && column_field_id.value() == field_id;
+  });
+  return it != columns.end() ? std::optional(*it) : std::nullopt;
+}
+
+/// \brief Collect counts (value count and null count) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \return A pair of (value_count, null_count), or nullopt if stats are not available.
+std::optional<FieldMetrics> CollectCounts(int32_t field_id,
+                                          const ::parquet::FileMetaData& metadata,
+                                          int32_t column_idx) {
+  int64_t value_count = 0;
+  int64_t null_count = 0;
+
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+  }
+
+  return FieldMetrics{
+      .field_id = field_id, .value_count = value_count, .null_value_count = null_count};
+}
+
+/// \brief Collect bounds (lower and upper) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param iceberg_type The Iceberg primitive type for deserializing values.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \param truncate_length The length to truncate strings/binary values.
+/// \return FieldMetrics with counts and bounds, or nullopt if stats are not available.
+Result<std::optional<FieldMetrics>> CollectBounds(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::FileMetaData& metadata, int32_t column_idx,
+    int32_t truncate_length) {
+  int64_t null_count = 0;
+  int64_t value_count = 0;
+  std::optional<Literal> lower_bound;
+  std::optional<Literal> upper_bound;
+
+  for (int32_t rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+
+    if (stats->HasMinMax()) {
+      auto min_bytes = stats->EncodeMin();
+      auto min_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(min_bytes.data()), min_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto min_value,
+                              Conversions::FromBytes(iceberg_type, min_span));
+      if (!lower_bound.has_value() || min_value < lower_bound.value()) {
+        lower_bound = std::move(min_value);
+      }
+
+      auto max_bytes = stats->EncodeMax();
+      auto max_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(max_bytes.data()), max_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto max_value,
+                              Conversions::FromBytes(iceberg_type, max_span));
+      if (!upper_bound.has_value() || max_value > upper_bound.value()) {
+        upper_bound = std::move(max_value);
+      }
+    }
+  }
+
+  if (!lower_bound.has_value() || !upper_bound.has_value() || lower_bound->IsNaN() ||
+      upper_bound->IsNaN()) {
+    return FieldMetrics{
+        .field_id = field_id,
+        .value_count = value_count,
+        .null_value_count = null_count,
+    };
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower,
+                          TruncateUtils::TruncateLowerBound(

Review Comment:
   Ditto, truncation can only be done on string types.



##########
src/iceberg/util/truncate_util.cc:
##########
@@ -167,42 +168,46 @@ Literal TruncateLiteralImpl<TypeId::kBinary>(const Literal& literal, int32_t wid
 }
 
 template <TypeId type_id>
-Result<Literal> TruncateLiteralMaxImpl(const Literal& literal, int32_t width) = delete;
+Result<std::optional<Literal>> TruncateLiteralMaxImpl(const Literal& literal,
+                                                      int32_t width) = delete;
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kString>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kString>(
+    const Literal& literal, int32_t width) {
   const auto& str = std::get<std::string>(literal.value());
-  ICEBERG_ASSIGN_OR_RAISE(std::string truncated,
-                          TruncateUtils::TruncateUTF8Max(str, width));
-  return Literal::String(std::move(truncated));
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated, TruncateUtils::TruncateUTF8Max(str, width));
+  if (!truncated.has_value()) {
+    return std::nullopt;
+  }
+  return std::optional<Literal>(Literal::String(std::move(truncated.value())));
 }
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kBinary>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kBinary>(
+    const Literal& literal, int32_t width) {
   const auto& data = std::get<std::vector<uint8_t>>(literal.value());
   if (static_cast<int32_t>(data.size()) <= width) {
-    return literal;
+    return std::optional<Literal>(literal);

Review Comment:
   We don't need the extra `std::optional` wrapper here, do we? The same applies
   to the other changes in this file.



##########
src/iceberg/util/truncate_util.cc:
##########
@@ -167,42 +168,46 @@ Literal TruncateLiteralImpl<TypeId::kBinary>(const Literal& literal, int32_t wid
 }
 
 template <TypeId type_id>
-Result<Literal> TruncateLiteralMaxImpl(const Literal& literal, int32_t width) = delete;
+Result<std::optional<Literal>> TruncateLiteralMaxImpl(const Literal& literal,
+                                                      int32_t width) = delete;
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kString>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kString>(
+    const Literal& literal, int32_t width) {
   const auto& str = std::get<std::string>(literal.value());
-  ICEBERG_ASSIGN_OR_RAISE(std::string truncated,
-                          TruncateUtils::TruncateUTF8Max(str, width));
-  return Literal::String(std::move(truncated));
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated, TruncateUtils::TruncateUTF8Max(str, width));
+  if (!truncated.has_value()) {
+    return std::nullopt;
+  }
+  return std::optional<Literal>(Literal::String(std::move(truncated.value())));
 }
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kBinary>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kBinary>(
+    const Literal& literal, int32_t width) {
   const auto& data = std::get<std::vector<uint8_t>>(literal.value());
   if (static_cast<int32_t>(data.size()) <= width) {
-    return literal;
+    return std::optional<Literal>(literal);
   }
 
   std::vector<uint8_t> truncated(data.begin(), data.begin() + width);
   for (auto it = truncated.rbegin(); it != truncated.rend(); ++it) {
     if (*it < 0xFF) {
       ++(*it);
       truncated.resize(truncated.size() - std::distance(truncated.rbegin(), it));
-      return Literal::Binary(std::move(truncated));
+      return std::optional<Literal>(Literal::Binary(std::move(truncated)));
     }
   }
-  return InvalidArgument("Cannot truncate upper bound for binary: all bytes are 0xFF");
+  return std::nullopt;

Review Comment:
   I think this error message is worth keeping.



##########
src/iceberg/parquet/parquet_metrics.cc:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_metrics.h"
+
+#include <limits>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+
+#include <parquet/column_reader.h>
+#include <parquet/schema.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+/// \brief Get the Iceberg field ID from a Parquet column descriptor.
+/// \return The field ID, or nullopt if no field ID is set.
+std::optional<int32_t> GetFieldId(const ::parquet::ColumnDescriptor& column) {
+  const auto& node = column.schema_node();
+  if (node == nullptr || !node->is_primitive()) {
+    return std::nullopt;
+  }
+  if (node->field_id() < 0) {
+    return std::nullopt;
+  }
+  return node->field_id();
+}
+
+/// \brief Find the column index for a field in the Parquet schema.
+std::optional<int32_t> FindColumnIndex(const ::parquet::SchemaDescriptor& parquet_schema,
+                                       int32_t field_id) {
+  auto columns = std::views::iota(0, parquet_schema.num_columns());
+  auto it = std::ranges::find_if(columns, [&](int i) {
+    auto column_field_id = GetFieldId(*parquet_schema.Column(i));
+    return column_field_id.has_value() && column_field_id.value() == field_id;
+  });
+  return it != columns.end() ? std::optional(*it) : std::nullopt;
+}
+
+/// \brief Collect counts (value count and null count) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \return A pair of (value_count, null_count), or nullopt if stats are not available.
+std::optional<FieldMetrics> CollectCounts(int32_t field_id,
+                                          const ::parquet::FileMetaData& metadata,
+                                          int32_t column_idx) {
+  int64_t value_count = 0;
+  int64_t null_count = 0;
+
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+  }
+
+  return FieldMetrics{
+      .field_id = field_id, .value_count = value_count, .null_value_count = null_count};
+}
+
+/// \brief Collect bounds (lower and upper) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param iceberg_type The Iceberg primitive type for deserializing values.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \param truncate_length The length to truncate strings/binary values.
+/// \return FieldMetrics with counts and bounds, or nullopt if stats are not available.
+Result<std::optional<FieldMetrics>> CollectBounds(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::FileMetaData& metadata, int32_t column_idx,
+    int32_t truncate_length) {
+  int64_t null_count = 0;
+  int64_t value_count = 0;
+  std::optional<Literal> lower_bound;
+  std::optional<Literal> upper_bound;
+
+  for (int32_t rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+
+    if (stats->HasMinMax()) {
+      auto min_bytes = stats->EncodeMin();
+      auto min_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(min_bytes.data()), min_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto min_value,
+                              Conversions::FromBytes(iceberg_type, min_span));
+      if (!lower_bound.has_value() || min_value < lower_bound.value()) {
+        lower_bound = std::move(min_value);
+      }
+
+      auto max_bytes = stats->EncodeMax();
+      auto max_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(max_bytes.data()), max_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto max_value,
+                              Conversions::FromBytes(iceberg_type, max_span));
+      if (!upper_bound.has_value() || max_value > upper_bound.value()) {
+        upper_bound = std::move(max_value);
+      }
+    }
+  }
+
+  if (!lower_bound.has_value() || !upper_bound.has_value() || lower_bound->IsNaN() ||
+      upper_bound->IsNaN()) {
+    return FieldMetrics{
+        .field_id = field_id,
+        .value_count = value_count,
+        .null_value_count = null_count,
+    };
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower,
+                          TruncateUtils::TruncateLowerBound(
+                              *iceberg_type, lower_bound.value(), truncate_length));
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated_upper,
+                          TruncateUtils::TruncateUpperBound(
+                              *iceberg_type, upper_bound.value(), truncate_length));
+
+  return FieldMetrics{
+      .field_id = field_id,
+      .value_count = value_count,
+      .null_value_count = null_count,
+      .lower_bound = std::move(truncated_lower),
+      .upper_bound = std::move(truncated_upper),
+  };
+}
+
+/// \brief Process pre-computed field metrics, applying truncation if needed.
+/// \param field_id The field ID to look up.
+/// \param field_metrics The map of pre-computed field metrics.
+/// \param primitive_type The primitive type for truncation.
+/// \param truncate_length The truncation length (0 means no bounds).
+/// \return Processed FieldMetrics with truncated bounds if applicable.
+Result<std::optional<FieldMetrics>> MetricsFromFieldMetrics(
+    int32_t field_id, const std::unordered_map<int32_t, FieldMetrics>& field_metrics,
+    std::shared_ptr<PrimitiveType> primitive_type, int32_t truncate_length) {
+  auto it = field_metrics.find(field_id);
+  if (it == field_metrics.end()) {
+    return std::nullopt;
+  }
+
+  const auto& fm = it->second;
+  FieldMetrics result{.field_id = fm.field_id,
+                      .value_count = fm.value_count,
+                      .null_value_count = fm.null_value_count,
+                      .nan_value_count = fm.nan_value_count};
+
+  if (truncate_length > 0) {

Review Comment:
   Can we check primitive_type first to skip types that do not need truncation?
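
   A rough sketch of what such an early check could look like. `TypeId`, `NeedsTruncation`, and `ShouldTruncate` are hypothetical stand-ins written for this example, and it assumes that only string and binary bounds are ever shortened while fixed-width bounds are copied through unchanged; the actual set of types should follow whatever `TruncateUtils` supports.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for Iceberg primitive type IDs; only a few are listed.
enum class TypeId { kInt, kLong, kDate, kDouble, kString, kBinary };

// Assumption for this sketch: only variable-length string and binary bounds
// are shortened by truncation.
constexpr bool NeedsTruncation(TypeId id) {
  return id == TypeId::kString || id == TypeId::kBinary;
}

// Returns true when the truncation helpers have to run for this field.
// Fixed-width types return false regardless of truncate_length.
inline bool ShouldTruncate(TypeId id, int32_t truncate_length) {
  assert(truncate_length >= 0);
  if (!NeedsTruncation(id)) {
    return false;  // type check first: skip the truncation path entirely
  }
  return truncate_length > 0;
}
```

   Checking the type first keeps the common numeric and temporal columns away from the truncation helpers altogether.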



##########
src/iceberg/test/parquet_metrics_test.cc:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/c/bridge.h>
+#include <arrow/filesystem/filesystem.h>
+#include <gtest/gtest.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/metadata.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/test/metrics_test_base.h"
+#include "iceberg/util/checked_cast.h"
+
+namespace iceberg::test {
+
+class ParquetMetricsTest : public MetricsTestBase, public ::testing::Test {

Review Comment:
   The test suite is broad, but several cases are too forgiving and would miss real bugs.
   - Decimal bounds are under-tested. `MetricsForDecimals` covers decimal-as-INT32, INT64, and fixed, but only checks that bounds exist. It should assert exact lower/upper values for each physical layout. This is exactly how the `EncodeMin()` vs Iceberg `Conversions::FromBytes` bug can slip through.
   - “Multiple row group” tests may not actually test multiple row groups. `MetricsForTopLevelWithMultipleRowGroup` and its nested equivalent write 201 rows, but Parquet’s `SupportsSmallRowGroups()` returns false, so the row group count is not verified. Force multiple row groups or remove the claim.
   - The `field_metrics` override path is not tested. `ParquetMetrics::GetMetrics` supports precomputed `FieldMetrics`, but parquet_writer.cc always passes `{}`. Add direct tests where non-empty `field_metrics` overrides footer stats, carries NaN counts, and applies truncation.
   - Bounds assertions ignore literal type. `AssertBounds` receives a `PrimitiveType` but does not validate the returned `Literal` type, so date vs int, timestamp vs long, etc. could pass accidentally. Assert the literal type as well as the value.
   - Column sizes are barely verified. Several tests only check `column_sizes.empty()` or `!empty()`. They should assert exact field-id sets, especially for `none`, `counts`, nested-field overrides, and list/map exclusion behavior.
   - Truncate integration is incomplete. Utility tests cover `std::nullopt` for impossible upper bounds, but there is no Parquet metrics test proving that an unrepresentable truncated upper bound omits only the upper bound while preserving counts and the lower bound.
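
   To illustrate the decimal point above: Parquet plain-encodes an INT32 statistic as 4 little-endian bytes, while Iceberg's single-value serialization for decimals is the unscaled value in big-endian two's complement using the minimum number of bytes. A test that only checks "bounds exist" cannot tell the two layouts apart. The helpers below are a hypothetical, self-contained sketch of the two layouts, assuming the mismatch is one of byte layout; they are not the iceberg-cpp conversion code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Decode a Parquet plain-encoded INT32 (4 bytes, little-endian).
int32_t DecodeLittleEndianInt32(const std::vector<uint8_t>& bytes) {
  assert(bytes.size() == 4);
  uint32_t raw = 0;
  for (int i = 3; i >= 0; --i) {
    raw = (raw << 8) | bytes[static_cast<std::size_t>(i)];
  }
  return static_cast<int32_t>(raw);
}

// Encode an unscaled decimal value as big-endian two's complement using the
// minimum number of bytes.
std::vector<uint8_t> EncodeBigEndianMinimal(int64_t unscaled) {
  std::vector<uint8_t> out;
  const uint64_t bits = static_cast<uint64_t>(unscaled);
  for (int shift = 56; shift >= 0; shift -= 8) {
    out.push_back(static_cast<uint8_t>((bits >> shift) & 0xFF));
  }
  // Strip leading bytes that only repeat the sign bit of the next byte.
  std::size_t start = 0;
  while (start + 1 < out.size()) {
    const bool next_negative = (out[start + 1] & 0x80) != 0;
    const bool redundant_zero = out[start] == 0x00 && !next_negative;
    const bool redundant_ff = out[start] == 0xFF && next_negative;
    if (!redundant_zero && !redundant_ff) break;
    ++start;
  }
  return std::vector<uint8_t>(out.begin() + static_cast<std::ptrdiff_t>(start),
                              out.end());
}
```

   For an unscaled value of 12345, the Parquet footer bytes are `39 30 00 00` while the Iceberg form is `30 39`, so asserting the exact decoded value for each physical layout would catch bytes that are passed through without conversion.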



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

