mapleFU commented on code in PR #124: URL: https://github.com/apache/iceberg-cpp/pull/124#discussion_r2160658424
########## src/iceberg/avro/avro_data_util_internal.h: ########## @@ -26,10 +26,21 @@ namespace iceberg::avro { +/// \brief Append an Avro datum to an Arrow array builder. +/// +/// This function handles schema evolution by using the provided projection to map +/// fields from the Avro data to the expected Arrow schema. +/// +/// \param avro_node The Avro schema node (must be a record at root level) +/// \param avro_datum The Avro data to append +/// \param projection Schema projection from `projected_schema` to `avro_node` +/// \param projected_schema The projected schema +/// \param array_builder The Arrow array builder to append to (must be a struct builder) Review Comment: So is it undefined behavior to pass in a builder of a different type? Can the argument here just be a StructBuilder? ########## src/iceberg/avro/avro_data_util.cc: ########## @@ -17,16 +17,440 @@ * under the License. */ +#include <arrow/array/builder_binary.h> +#include <arrow/array/builder_decimal.h> +#include <arrow/array/builder_nested.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/json/from_string.h> +#include <arrow/type.h> +#include <arrow/util/decimal.h> +#include <avro/Generic.hh> +#include <avro/Node.hh> +#include <avro/NodeImpl.hh> +#include <avro/Types.hh> + +#include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/avro/avro_data_util_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/schema_util.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" namespace iceberg::avro { +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. +Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Append Avro record data to Arrow struct builder.
+Status AppendStructToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const std::span<const FieldProjection>& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + const auto& avro_record = avro_datum.value<::avro::GenericRecord>(); + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + for (size_t i = 0; i < projections.size(); ++i) { + const auto& field_projection = projections[i]; + const auto& expected_field = struct_type.fields()[i]; + auto* field_builder = struct_builder->field_builder(static_cast<int>(i)); + + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = + std::get<FieldProjection::SourceFieldIndex>(field_projection.from); + if (avro_field_index >= avro_record.fieldCount()) { + return InvalidArgument("Avro field index {} out of bound {}", avro_field_index, + avro_record.fieldCount()); + } + + const auto& avro_field_node = avro_node->leafAt(avro_field_index); + const auto& avro_field_datum = avro_record.fieldAt(avro_field_index); + ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(avro_field_node, avro_field_datum, + field_projection, expected_field, + field_builder)); + } else if (field_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } else { + return NotImplemented("Unsupported field projection kind: {}", + ToString(field_projection.kind)); + } + } + return {}; +} + +/// \brief Append Avro array data to Arrow list builder. 
+Status AppendListToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& projection, const ListType& list_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); + } + const auto& avro_array = avro_datum.value<::avro::GenericArray>(); + + auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append()); + + const auto& element_projection = projection.children[0]; + auto* value_builder = list_builder->value_builder(); + const auto& element_node = avro_node->leafAt(0); + const auto& element_field = list_type.fields().back(); + + for (const auto& element : avro_array.value()) { + ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder( + element_node, element, element_projection, element_field, value_builder)); + } + return {}; +} + +/// \brief Append Avro map data to Arrow map builder. 
+Status AppendMapToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& key_projection, + const FieldProjection& value_projection, + const MapType& map_type, ::arrow::ArrayBuilder* array_builder) { + auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder); + + if (avro_node->type() == ::avro::AVRO_MAP) { + // Handle regular Avro map: map<string, value> + const auto& avro_map = avro_datum.value<::avro::GenericMap>(); + const auto& map_entries = avro_map.value(); + + const auto& key_node = avro_node->leafAt(0); + const auto& value_node = avro_node->leafAt(1); + + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + for (const auto& entry : map_entries) { + ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder( + key_node, entry.first, key_projection, key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder( + value_node, entry.second, value_projection, value_field, item_builder)); + } + + return {}; + } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) { + // Handle array-based map: list<struct<key, value>> + const auto& avro_array = avro_datum.value<::avro::GenericArray>(); + const auto& array_entries = avro_array.value(); + + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + const auto& record_node = avro_node->leafAt(0); Review Comment: should check leafCount == 1? ########## src/iceberg/avro/avro_data_util.cc: ########## @@ -17,16 +17,383 @@ * under the License. 
*/ +#include <arrow/array/builder_binary.h> +#include <arrow/array/builder_decimal.h> +#include <arrow/array/builder_nested.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/json/from_string.h> +#include <arrow/type.h> +#include <arrow/util/decimal.h> +#include <avro/Generic.hh> +#include <avro/Node.hh> +#include <avro/Types.hh> + +#include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/avro/avro_data_util_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/schema_util.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" namespace iceberg::avro { +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. +Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Append Avro record data to Arrow struct builder. 
+Status AppendStructToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const std::span<const FieldProjection>& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + const auto& avro_record = avro_datum.value<::avro::GenericRecord>(); + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + for (size_t i = 0; i < projections.size(); ++i) { + const auto& field_projection = projections[i]; + const auto& expected_field = struct_type.fields()[i]; + auto* field_builder = struct_builder->field_builder(static_cast<int>(i)); + + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = + std::get<FieldProjection::SourceFieldIndex>(field_projection.from); + if (avro_field_index >= avro_record.fieldCount()) { + return InvalidArgument("Avro field index {} out of bound {}", avro_field_index, + avro_record.fieldCount()); + } + + const auto& avro_field_node = avro_node->leafAt(avro_field_index); + const auto& avro_field_datum = avro_record.fieldAt(avro_field_index); + ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(avro_field_node, avro_field_datum, + field_projection, expected_field, + field_builder)); + } else if (field_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } else { + return NotImplemented("Unsupported field projection kind: {}", + ToString(field_projection.kind)); + } + } + return {}; +} + +/// \brief Append Avro array data to Arrow list builder. 
+Status AppendListToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& projection, const ListType& list_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); + } + const auto& avro_array = avro_datum.value<::avro::GenericArray>(); + + auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); Review Comment: It seems the concrete type behind `::arrow::ArrayBuilder` is chosen by the user, so this would be UB if the type is wrong. But it's OK with me here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org