zhjwpku commented on code in PR #159:
URL: https://github.com/apache/iceberg-cpp/pull/159#discussion_r2256826054
##########
src/iceberg/parquet/parquet_schema_util.cc:
##########
@@ -17,20 +17,393 @@
* under the License.
*/
+#include <charconv>
+
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
#include <parquet/schema.h>
+#include "iceberg/metadata_columns.h"
#include "iceberg/parquet/parquet_schema_util_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema_util_internal.h"
#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter.h"
+#include "iceberg/util/macros.h"
namespace iceberg::parquet {
+namespace {
+
+constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
+
+std::optional<int32_t> FieldIdFromMetadata(
+ const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+ if (!metadata) {
+ return std::nullopt;
+ }
+ int key = metadata->FindKey(kParquetFieldIdKey.data());
+ if (key < 0) {
+ return std::nullopt;
+ }
+ std::string field_id_str = metadata->value(key);
+ int32_t field_id = -1;
+ auto [_, ec] = std::from_chars(field_id_str.data(),
+ field_id_str.data() + field_id_str.size(),
field_id);
+ if (ec != std::errc() || field_id < 0) {
+ return std::nullopt;
+ }
+ return field_id;
+}
+
+std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField&
parquet_field) {
+ return FieldIdFromMetadata(parquet_field.field->metadata());
+}
+
+// TODO(gangwu): support v3 unknown type
+Status ValidateParquetSchemaEvolution(
+ const Type& expected_type, const ::parquet::arrow::SchemaField&
parquet_field) {
+ const auto& arrow_type = parquet_field.field->type();
+ switch (expected_type.type_id()) {
+ case TypeId::kBoolean:
+ if (arrow_type->id() == ::arrow::Type::BOOL) {
+ return {};
+ }
+ break;
+ case TypeId::kInt:
+ if (arrow_type->id() == ::arrow::Type::INT32) {
+ return {};
+ }
+ break;
+ case TypeId::kLong:
+ if (arrow_type->id() == ::arrow::Type::INT64 ||
+ arrow_type->id() == ::arrow::Type::INT32) {
+ return {};
+ }
+ break;
+ case TypeId::kFloat:
+ if (arrow_type->id() == ::arrow::Type::FLOAT) {
+ return {};
+ }
+ break;
+ case TypeId::kDouble:
+ if (arrow_type->id() == ::arrow::Type::DOUBLE ||
+ arrow_type->id() == ::arrow::Type::FLOAT) {
+ return {};
+ }
+ break;
+ case TypeId::kDate:
+ if (arrow_type->id() == ::arrow::Type::DATE32) {
+ return {};
+ }
+ break;
+ case TypeId::kTime:
+ if (arrow_type->id() == ::arrow::Type::TIME64) {
+ return {};
+ }
+ break;
+ case TypeId::kTimestamp:
+ if (arrow_type->id() == ::arrow::Type::TIMESTAMP) {
+ const auto& timestamp_type =
+ internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type);
+ if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO &&
+ timestamp_type.timezone().empty()) {
+ return {};
+ }
+ }
+ break;
+ case TypeId::kTimestampTz:
+ if (arrow_type->id() == ::arrow::Type::TIMESTAMP) {
+ const auto& timestamp_type =
+ internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type);
+ if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO &&
+ !timestamp_type.timezone().empty()) {
+ return {};
+ }
+ }
+ break;
+ case TypeId::kString:
+ if (arrow_type->id() == ::arrow::Type::STRING) {
+ return {};
+ }
+ break;
+ case TypeId::kBinary:
+ if (arrow_type->id() == ::arrow::Type::BINARY) {
+ return {};
+ }
+ break;
+ case TypeId::kDecimal:
+ if (arrow_type->id() == ::arrow::Type::DECIMAL128) {
+ const auto& decimal_type =
+ internal::checked_cast<const DecimalType&>(expected_type);
+ const auto& arrow_decimal =
+ internal::checked_cast<const
::arrow::Decimal128Type&>(*arrow_type);
+ if (decimal_type.scale() == arrow_decimal.scale() &&
+ decimal_type.precision() >= arrow_decimal.precision()) {
+ return {};
+ }
+ }
+ break;
+ case TypeId::kUuid:
+ if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) {
+ const auto& fixed_binary =
+ internal::checked_cast<const
::arrow::FixedSizeBinaryType&>(*arrow_type);
+ if (fixed_binary.byte_width() == 16) {
+ return {};
+ }
+ }
+ break;
+ case TypeId::kFixed:
+ if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) {
+ const auto& fixed_binary =
+ internal::checked_cast<const
::arrow::FixedSizeBinaryType&>(*arrow_type);
+ if (fixed_binary.byte_width() ==
+ internal::checked_cast<const FixedType&>(expected_type).length()) {
+ return {};
+ }
+ }
+ break;
+ case TypeId::kStruct:
+ if (arrow_type->id() == ::arrow::Type::STRUCT) {
+ return {};
+ }
+ break;
+ case TypeId::kList:
+ if (arrow_type->id() == ::arrow::Type::LIST) {
+ return {};
+ }
+ break;
+ case TypeId::kMap:
+ if (arrow_type->id() == ::arrow::Type::MAP) {
+ return {};
+ }
+ break;
+ default:
+ break;
+ }
+
+ return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}",
+ expected_type, arrow_type->ToString());
+}
+
+// Forward declaration
+Result<FieldProjection> ProjectNested(
+ const Type& nested_type,
+ const std::vector<::parquet::arrow::SchemaField>& parquet_fields);
+
+Result<FieldProjection> ProjectStruct(
+ const StructType& struct_type,
+ const std::vector<::parquet::arrow::SchemaField>& parquet_fields) {
+ struct FieldContext {
+ size_t local_index;
+ const ::parquet::arrow::SchemaField& parquet_field;
+ };
+ std::unordered_map<int32_t, FieldContext> field_context_map;
+ field_context_map.reserve(parquet_fields.size());
+
+ for (size_t i = 0; i < parquet_fields.size(); ++i) {
+ const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i];
+ auto field_id = GetFieldId(parquet_field);
+ if (!field_id) {
+ continue;
+ }
+ if (const auto [iter, inserted] = field_context_map.emplace(
+ std::piecewise_construct, std::forward_as_tuple(field_id.value()),
+ std::forward_as_tuple(i, parquet_field));
+ !inserted) [[unlikely]] {
+ return InvalidSchema("Duplicate field id {} found in Parquet schema",
+ field_id.value());
+ }
+ }
+
+ FieldProjection result;
+ result.children.reserve(struct_type.fields().size());
+
+ for (const auto& field : struct_type.fields()) {
+ int32_t field_id = field.field_id();
+ FieldProjection child_projection;
+
+ if (auto iter = field_context_map.find(field_id); iter !=
field_context_map.cend()) {
+ const auto& parquet_field = iter->second.parquet_field;
+ ICEBERG_RETURN_UNEXPECTED(
+ ValidateParquetSchemaEvolution(*field.type(), parquet_field));
+ if (field.type()->is_nested()) {
+ ICEBERG_ASSIGN_OR_RAISE(child_projection,
+ ProjectNested(*field.type(),
parquet_field.children));
+ } else {
+ child_projection.attributes =
+
std::make_shared<ParquetExtraAttributes>(parquet_field.column_index);
+ }
+ child_projection.from = iter->second.local_index;
+ child_projection.kind = FieldProjection::Kind::kProjected;
+ } else if (MetadataColumns::IsMetadataColumn(field_id)) {
+ child_projection.kind = FieldProjection::Kind::kMetadata;
+ } else if (field.optional()) {
+ child_projection.kind = FieldProjection::Kind::kNull;
+ } else {
+ return InvalidSchema("Missing required field with id: {}", field_id);
+ }
+
+ result.children.emplace_back(std::move(child_projection));
+ }
+
+ PruneFieldProjection(result);
+ return result;
+}
+
+Result<FieldProjection> ProjectList(
+ const ListType& list_type,
+ const std::vector<::parquet::arrow::SchemaField>& parquet_fields) {
+ if (parquet_fields.size() != 1) {
+ return InvalidSchema("List type must have exactly one field, got {}",
+ parquet_fields.size());
+ }
+
+ const auto& parquet_field = parquet_fields[0];
Review Comment:
nit: use `parquet_fields.back()` here? That would be consistent with the
`list_type.fields().back()` call used below.
##########
src/iceberg/parquet/parquet_schema_util.cc:
##########
@@ -17,20 +17,393 @@
* under the License.
*/
+#include <charconv>
+
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
#include <parquet/schema.h>
+#include "iceberg/metadata_columns.h"
#include "iceberg/parquet/parquet_schema_util_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema_util_internal.h"
#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter.h"
+#include "iceberg/util/macros.h"
namespace iceberg::parquet {
+namespace {
+
+constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
+
+std::optional<int32_t> FieldIdFromMetadata(
+ const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+ if (!metadata) {
+ return std::nullopt;
+ }
+ int key = metadata->FindKey(kParquetFieldIdKey.data());
Review Comment:
Below is the `FindKey` signature. Since it takes a `std::string_view`, can we
just pass `kParquetFieldIdKey` directly, without the `.data()` part?
```
/// \brief Perform linear search for key, returning -1 if not found
int FindKey(std::string_view key) const;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]