wgtmac commented on code in PR #159:
URL: https://github.com/apache/iceberg-cpp/pull/159#discussion_r2254572871


##########
src/iceberg/parquet/parquet_schema_util.cc:
##########
@@ -17,20 +17,392 @@
  * under the License.
  */
 
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
 #include <parquet/schema.h>
 
+#include "iceberg/metadata_columns.h"
 #include "iceberg/parquet/parquet_schema_util_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema_util_internal.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg::parquet {
 
+namespace {
+
+constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
+
+std::optional<int32_t> FieldIdFromMetadata(
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return std::nullopt;
+  }
+  int key = metadata->FindKey(kParquetFieldIdKey.data());
+  if (key < 0) {
+    return std::nullopt;
+  }
+  std::string field_id_str = metadata->value(key);
+  int32_t field_id = -1;
+  try {
+    field_id = std::stoi(field_id_str);
+  } catch (const std::invalid_argument& e) {
+    return std::nullopt;
+  } catch (const std::out_of_range& e) {
+    return std::nullopt;
+  }
+  return field_id < 0 ? std::nullopt : std::make_optional(field_id);
+}
+
+std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField& 
parquet_field) {
+  return FieldIdFromMetadata(parquet_field.field->metadata());
+}
+
+Status ValidateParquetSchemaEvolution(
+    const Type& expected_type, const ::parquet::arrow::SchemaField& 
parquet_field) {
+  const auto& arrow_type = parquet_field.field->type();

Review Comment:
   I'm not sure. I think we can delay this support until we implement the V3 
unknown type.



##########
src/iceberg/parquet/parquet_schema_util.cc:
##########
@@ -17,20 +17,392 @@
  * under the License.
  */
 
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
 #include <parquet/schema.h>
 
+#include "iceberg/metadata_columns.h"
 #include "iceberg/parquet/parquet_schema_util_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema_util_internal.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg::parquet {
 
+namespace {
+
+constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
+
+std::optional<int32_t> FieldIdFromMetadata(
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return std::nullopt;
+  }
+  int key = metadata->FindKey(kParquetFieldIdKey.data());
+  if (key < 0) {
+    return std::nullopt;
+  }
+  std::string field_id_str = metadata->value(key);
+  int32_t field_id = -1;
+  try {
+    field_id = std::stoi(field_id_str);
+  } catch (const std::invalid_argument& e) {
+    return std::nullopt;
+  } catch (const std::out_of_range& e) {
+    return std::nullopt;
+  }
+  return field_id < 0 ? std::nullopt : std::make_optional(field_id);
+}
+
+std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField& 
parquet_field) {
+  return FieldIdFromMetadata(parquet_field.field->metadata());
+}
+
+Status ValidateParquetSchemaEvolution(
+    const Type& expected_type, const ::parquet::arrow::SchemaField& 
parquet_field) {
+  const auto& arrow_type = parquet_field.field->type();
+  switch (expected_type.type_id()) {
+    case TypeId::kBoolean:
+      if (arrow_type->id() == ::arrow::Type::BOOL) {
+        return {};
+      }
+      break;
+    case TypeId::kInt:
+      if (arrow_type->id() == ::arrow::Type::INT32) {
+        return {};
+      }
+      break;
+    case TypeId::kLong:
+      if (arrow_type->id() == ::arrow::Type::INT64 ||
+          arrow_type->id() == ::arrow::Type::INT32) {
+        return {};
+      }
+      break;
+    case TypeId::kFloat:
+      if (arrow_type->id() == ::arrow::Type::FLOAT) {
+        return {};
+      }
+      break;
+    case TypeId::kDouble:
+      if (arrow_type->id() == ::arrow::Type::DOUBLE ||
+          arrow_type->id() == ::arrow::Type::FLOAT) {
+        return {};
+      }
+      break;
+    case TypeId::kDate:
+      if (arrow_type->id() == ::arrow::Type::DATE32) {
+        return {};
+      }
+      break;
+    case TypeId::kTime:
+      if (arrow_type->id() == ::arrow::Type::TIME64) {
+        return {};
+      }
+      break;
+    case TypeId::kTimestamp:
+      if (arrow_type->id() == ::arrow::Type::TIMESTAMP) {
+        const auto& timestamp_type =
+            internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type);
+        if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO &&
+            timestamp_type.timezone().empty()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kTimestampTz:
+      if (arrow_type->id() == ::arrow::Type::TIMESTAMP) {
+        const auto& timestamp_type =
+            internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type);
+        if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO &&
+            !timestamp_type.timezone().empty()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kString:
+      if (arrow_type->id() == ::arrow::Type::STRING) {
+        return {};
+      }
+      break;
+    case TypeId::kBinary:
+      if (arrow_type->id() == ::arrow::Type::BINARY) {
+        return {};
+      }
+      break;
+    case TypeId::kDecimal:
+      if (arrow_type->id() == ::arrow::Type::DECIMAL128) {
+        const auto& decimal_type =
+            internal::checked_cast<const DecimalType&>(expected_type);
+        const auto& arrow_decimal =
+            internal::checked_cast<const 
::arrow::Decimal128Type&>(*arrow_type);
+        if (decimal_type.scale() == arrow_decimal.scale() &&
+            decimal_type.precision() >= arrow_decimal.precision()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kUuid:
+      if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) {
+        const auto& fixed_binary =
+            internal::checked_cast<const 
::arrow::FixedSizeBinaryType&>(*arrow_type);
+        if (fixed_binary.byte_width() == 16) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kFixed:
+      if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) {
+        const auto& fixed_binary =
+            internal::checked_cast<const 
::arrow::FixedSizeBinaryType&>(*arrow_type);
+        if (fixed_binary.byte_width() ==
+            internal::checked_cast<const FixedType&>(expected_type).length()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kStruct:
+      if (arrow_type->id() == ::arrow::Type::STRUCT) {
+        return {};
+      }
+      break;
+    case TypeId::kList:
+      if (arrow_type->id() == ::arrow::Type::LIST) {
+        return {};
+      }
+      break;
+    case TypeId::kMap:
+      if (arrow_type->id() == ::arrow::Type::MAP) {
+        return {};
+      }
+      break;
+    default:
+      break;
+  }
+
+  return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}",
+                       expected_type, arrow_type->ToString());
+}
+
+// Forward declaration
+Result<FieldProjection> ProjectNested(
+    const Type& expected_type,
+    const std::vector<::parquet::arrow::SchemaField>& parquet_fields);
+
+Result<FieldProjection> ProjectStruct(
+    const StructType& struct_type,
+    const std::vector<::parquet::arrow::SchemaField>& parquet_fields) {
+  struct FieldContext {
+    size_t local_index;
+    const ::parquet::arrow::SchemaField& parquet_field;
+  };
+  std::unordered_map<int32_t, FieldContext> field_context_map;
+  field_context_map.reserve(parquet_fields.size());
+
+  for (size_t i = 0; i < parquet_fields.size(); ++i) {
+    const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i];
+    auto field_id = GetFieldId(parquet_field);
+    if (!field_id) {
+      continue;
+    }
+    if (const auto [iter, inserted] = field_context_map.emplace(
+            std::piecewise_construct, std::forward_as_tuple(field_id.value()),
+            std::forward_as_tuple(i, parquet_field));
+        !inserted) [[unlikely]] {
+      return InvalidSchema("Duplicate field id found in Parquet schema: {}",

Review Comment:
   Unfortunately, we don't have the full schema here, and we don't even know 
the field names.



##########
src/iceberg/parquet/parquet_schema_util.cc:
##########
@@ -17,20 +17,392 @@
  * under the License.
  */
 
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
 #include <parquet/schema.h>
 
+#include "iceberg/metadata_columns.h"
 #include "iceberg/parquet/parquet_schema_util_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema_util_internal.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg::parquet {
 
+namespace {
+
+constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
+
+std::optional<int32_t> FieldIdFromMetadata(
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return std::nullopt;
+  }
+  int key = metadata->FindKey(kParquetFieldIdKey.data());
+  if (key < 0) {
+    return std::nullopt;
+  }
+  std::string field_id_str = metadata->value(key);
+  int32_t field_id = -1;
+  try {
+    field_id = std::stoi(field_id_str);
+  } catch (const std::invalid_argument& e) {
+    return std::nullopt;
+  } catch (const std::out_of_range& e) {
+    return std::nullopt;
+  }
+  return field_id < 0 ? std::nullopt : std::make_optional(field_id);
+}
+
+std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField& 
parquet_field) {
+  return FieldIdFromMetadata(parquet_field.field->metadata());
+}
+
+Status ValidateParquetSchemaEvolution(
+    const Type& expected_type, const ::parquet::arrow::SchemaField& 
parquet_field) {
+  const auto& arrow_type = parquet_field.field->type();
+  switch (expected_type.type_id()) {
+    case TypeId::kBoolean:
+      if (arrow_type->id() == ::arrow::Type::BOOL) {
+        return {};
+      }
+      break;
+    case TypeId::kInt:
+      if (arrow_type->id() == ::arrow::Type::INT32) {
+        return {};
+      }
+      break;
+    case TypeId::kLong:
+      if (arrow_type->id() == ::arrow::Type::INT64 ||
+          arrow_type->id() == ::arrow::Type::INT32) {
+        return {};
+      }
+      break;
+    case TypeId::kFloat:
+      if (arrow_type->id() == ::arrow::Type::FLOAT) {
+        return {};
+      }
+      break;
+    case TypeId::kDouble:
+      if (arrow_type->id() == ::arrow::Type::DOUBLE ||
+          arrow_type->id() == ::arrow::Type::FLOAT) {
+        return {};
+      }
+      break;
+    case TypeId::kDate:
+      if (arrow_type->id() == ::arrow::Type::DATE32) {
+        return {};
+      }
+      break;
+    case TypeId::kTime:
+      if (arrow_type->id() == ::arrow::Type::TIME64) {
+        return {};
+      }
+      break;
+    case TypeId::kTimestamp:
+      if (arrow_type->id() == ::arrow::Type::TIMESTAMP) {
+        const auto& timestamp_type =
+            internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type);
+        if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO &&
+            timestamp_type.timezone().empty()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kTimestampTz:
+      if (arrow_type->id() == ::arrow::Type::TIMESTAMP) {
+        const auto& timestamp_type =
+            internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type);
+        if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO &&
+            !timestamp_type.timezone().empty()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kString:
+      if (arrow_type->id() == ::arrow::Type::STRING) {
+        return {};
+      }
+      break;
+    case TypeId::kBinary:
+      if (arrow_type->id() == ::arrow::Type::BINARY) {
+        return {};
+      }
+      break;
+    case TypeId::kDecimal:
+      if (arrow_type->id() == ::arrow::Type::DECIMAL128) {
+        const auto& decimal_type =
+            internal::checked_cast<const DecimalType&>(expected_type);
+        const auto& arrow_decimal =
+            internal::checked_cast<const 
::arrow::Decimal128Type&>(*arrow_type);
+        if (decimal_type.scale() == arrow_decimal.scale() &&
+            decimal_type.precision() >= arrow_decimal.precision()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kUuid:
+      if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) {
+        const auto& fixed_binary =
+            internal::checked_cast<const 
::arrow::FixedSizeBinaryType&>(*arrow_type);
+        if (fixed_binary.byte_width() == 16) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kFixed:
+      if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) {
+        const auto& fixed_binary =
+            internal::checked_cast<const 
::arrow::FixedSizeBinaryType&>(*arrow_type);
+        if (fixed_binary.byte_width() ==
+            internal::checked_cast<const FixedType&>(expected_type).length()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kStruct:
+      if (arrow_type->id() == ::arrow::Type::STRUCT) {
+        return {};
+      }
+      break;
+    case TypeId::kList:
+      if (arrow_type->id() == ::arrow::Type::LIST) {
+        return {};
+      }
+      break;
+    case TypeId::kMap:
+      if (arrow_type->id() == ::arrow::Type::MAP) {
+        return {};
+      }
+      break;
+    default:
+      break;
+  }
+
+  return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}",
+                       expected_type, arrow_type->ToString());
+}
+
+// Forward declaration
+Result<FieldProjection> ProjectNested(
+    const Type& expected_type,
+    const std::vector<::parquet::arrow::SchemaField>& parquet_fields);
+
+Result<FieldProjection> ProjectStruct(
+    const StructType& struct_type,
+    const std::vector<::parquet::arrow::SchemaField>& parquet_fields) {
+  struct FieldContext {
+    size_t local_index;
+    const ::parquet::arrow::SchemaField& parquet_field;
+  };
+  std::unordered_map<int32_t, FieldContext> field_context_map;
+  field_context_map.reserve(parquet_fields.size());
+
+  for (size_t i = 0; i < parquet_fields.size(); ++i) {
+    const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i];
+    auto field_id = GetFieldId(parquet_field);
+    if (!field_id) {
+      continue;
+    }
+    if (const auto [iter, inserted] = field_context_map.emplace(
+            std::piecewise_construct, std::forward_as_tuple(field_id.value()),
+            std::forward_as_tuple(i, parquet_field));
+        !inserted) [[unlikely]] {
+      return InvalidSchema("Duplicate field id found in Parquet schema: {}",
+                           field_id.value());
+    }
+  }
+
+  FieldProjection result;
+  result.children.reserve(struct_type.fields().size());
+
+  for (const auto& expected_field : struct_type.fields()) {
+    int32_t field_id = expected_field.field_id();
+    FieldProjection child_projection;
+
+    if (auto iter = field_context_map.find(field_id); iter != 
field_context_map.cend()) {
+      const auto& parquet_field = iter->second.parquet_field;
+      ICEBERG_RETURN_UNEXPECTED(
+          ValidateParquetSchemaEvolution(*expected_field.type(), 
parquet_field));
+      if (expected_field.type()->is_nested()) {
+        ICEBERG_ASSIGN_OR_RAISE(child_projection, 
ProjectNested(*expected_field.type(),
+                                                                
parquet_field.children));
+      } else {
+        child_projection.attributes =
+            
std::make_shared<ParquetExtraAttributes>(parquet_field.column_index);
+      }
+      child_projection.from = iter->second.local_index;
+      child_projection.kind = FieldProjection::Kind::kProjected;
+    } else if (MetadataColumns::IsMetadataColumn(field_id)) {
+      child_projection.kind = FieldProjection::Kind::kMetadata;
+    } else if (expected_field.optional()) {
+      child_projection.kind = FieldProjection::Kind::kNull;

Review Comment:
   Right. I think the direct children of map and list types do not support 
schema evolution per the spec (though if a child is a struct, the struct's 
fields do support schema evolution).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to