wgtmac commented on code in PR #374: URL: https://github.com/apache/iceberg-cpp/pull/374#discussion_r2609611268
########## src/iceberg/avro/avro_direct_decoder.cc: ########## @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <arrow/array/builder_binary.h> +#include <arrow/array/builder_decimal.h> +#include <arrow/array/builder_nested.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/type.h> +#include <arrow/util/decimal.h> +#include <avro/Decoder.hh> +#include <avro/Node.hh> +#include <avro/NodeImpl.hh> +#include <avro/Types.hh> + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/avro/avro_direct_decoder_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. 
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Skip an Avro value based on its schema without decoding +Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) { + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + decoder.decodeNull(); + return {}; + + case ::avro::AVRO_BOOL: + decoder.decodeBool(); + return {}; + + case ::avro::AVRO_INT: + decoder.decodeInt(); + return {}; + + case ::avro::AVRO_LONG: + decoder.decodeLong(); + return {}; + + case ::avro::AVRO_FLOAT: + decoder.decodeFloat(); + return {}; + + case ::avro::AVRO_DOUBLE: + decoder.decodeDouble(); + return {}; + + case ::avro::AVRO_STRING: + decoder.skipString(); + return {}; + + case ::avro::AVRO_BYTES: + decoder.skipBytes(); + return {}; + + case ::avro::AVRO_FIXED: + decoder.skipFixed(avro_node->fixedSize()); + return {}; + + case ::avro::AVRO_RECORD: { + // Skip all fields in order + for (size_t i = 0; i < avro_node->leaves(); ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), decoder)); + } + return {}; + } + + case ::avro::AVRO_ENUM: + decoder.decodeEnum(); + return {}; + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + // skipArray() returns count like arrayStart(), must handle all blocks + int64_t block_count = decoder.skipArray(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder)); + } + block_count = decoder.arrayNext(); + } + return {}; + } + + case ::avro::AVRO_MAP: { + const auto& value_node = avro_node->leafAt(1); + // skipMap() returns count like mapStart(), must handle all blocks + int64_t block_count = decoder.skipMap(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + decoder.skipString(); // Skip key (always string in 
Avro maps) + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder)); + } + block_count = decoder.mapNext(); + } + return {}; + } + + case ::avro::AVRO_UNION: { + const size_t branch_index = decoder.decodeUnionIndex(); + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, + num_branches); + } + return SkipAvroValue(avro_node->leafAt(branch_index), decoder); + } + + default: + return InvalidArgument("Unsupported Avro type for skipping: {}", + ToString(avro_node)); + } +} + +/// \brief Decode Avro record directly to Arrow struct builder. +Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + // Build a map from Avro field index to projection index + // -1 means the field should be skipped + std::vector<int> avro_to_projection(avro_node->leaves(), -1); Review Comment: Should we cache `avro_to_projection` somewhere, perhaps in the `DecodeContext` I propose in a comment below? As written, the mapping is rebuilt on every call to `DecodeStructToBuilder`, i.e. for every decoded struct value. We could use `const FieldProjection*` as the key to look up the cached value. ########## src/iceberg/avro/avro_reader.cc: ########## @@ -62,12 +63,12 @@ Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions& // A stateful context to keep track of the reading progress. struct ReadContext { - // The datum to reuse for reading the data. - std::unique_ptr<::avro::GenericDatum> datum_; // The arrow schema to build the record batch.
std::shared_ptr<::arrow::Schema> arrow_schema_; // The builder to build the record batch. std::shared_ptr<::arrow::ArrayBuilder> builder_; + // GenericDatum for legacy path (only used if direct decoder is disabled) Review Comment: ```suggestion // GenericDatum for intermediate data (only used if direct decoder is disabled) ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -137,25 +160,37 @@ class AvroReader::Impl { } while (context_->builder_->length() < batch_size_) { - if (split_end_ && reader_->pastSync(split_end_.value())) { + if (IsPastSync()) { break; } - if (!reader_->read(*context_->datum_)) { - break; + + if (use_direct_decoder_) { + // New path: Use direct decoder Review Comment: ```suggestion ``` ########## src/iceberg/avro/avro_direct_decoder.cc: ########## @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include <arrow/array/builder_binary.h> +#include <arrow/array/builder_decimal.h> +#include <arrow/array/builder_nested.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/type.h> +#include <arrow/util/decimal.h> +#include <avro/Decoder.hh> +#include <avro/Node.hh> +#include <avro/NodeImpl.hh> +#include <avro/Types.hh> + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/avro/avro_direct_decoder_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. +Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Skip an Avro value based on its schema without decoding +Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) { + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + decoder.decodeNull(); + return {}; + + case ::avro::AVRO_BOOL: + decoder.decodeBool(); + return {}; + + case ::avro::AVRO_INT: + decoder.decodeInt(); + return {}; + + case ::avro::AVRO_LONG: + decoder.decodeLong(); + return {}; + + case ::avro::AVRO_FLOAT: + decoder.decodeFloat(); + return {}; + + case ::avro::AVRO_DOUBLE: + decoder.decodeDouble(); + return {}; + + case ::avro::AVRO_STRING: + decoder.skipString(); + return {}; + + case ::avro::AVRO_BYTES: + decoder.skipBytes(); + return {}; + + case ::avro::AVRO_FIXED: + decoder.skipFixed(avro_node->fixedSize()); + return {}; + + case ::avro::AVRO_RECORD: { + // Skip all fields in order + for (size_t i = 0; i < avro_node->leaves(); ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), decoder)); + } + return {}; + } + + case ::avro::AVRO_ENUM: + 
decoder.decodeEnum(); + return {}; + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + // skipArray() returns count like arrayStart(), must handle all blocks + int64_t block_count = decoder.skipArray(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder)); + } + block_count = decoder.arrayNext(); + } + return {}; + } + + case ::avro::AVRO_MAP: { + const auto& value_node = avro_node->leafAt(1); + // skipMap() returns count like mapStart(), must handle all blocks + int64_t block_count = decoder.skipMap(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + decoder.skipString(); // Skip key (always string in Avro maps) + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder)); + } + block_count = decoder.mapNext(); + } + return {}; + } + + case ::avro::AVRO_UNION: { + const size_t branch_index = decoder.decodeUnionIndex(); + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, + num_branches); + } + return SkipAvroValue(avro_node->leafAt(branch_index), decoder); + } + + default: + return InvalidArgument("Unsupported Avro type for skipping: {}", + ToString(avro_node)); + } +} + +/// \brief Decode Avro record directly to Arrow struct builder. 
+Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + // Build a map from Avro field index to projection index + // -1 means the field should be skipped + std::vector<int> avro_to_projection(avro_node->leaves(), -1); + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = std::get<size_t>(field_projection.from); + avro_to_projection[avro_field_index] = static_cast<int>(proj_idx); + } + } + + // Read all Avro fields in order (must maintain decoder position) + for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) { + int proj_idx = avro_to_projection[avro_idx]; + + if (proj_idx < 0) { + // Skip this field - not in projection + const auto& avro_field_node = avro_node->leafAt(avro_idx); + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder)); + } else { + // Decode this field + const auto& field_projection = projections[proj_idx]; + const auto& expected_field = struct_type.fields()[proj_idx]; + const auto& avro_field_node = avro_node->leafAt(avro_idx); + auto* field_builder = struct_builder->field_builder(proj_idx); + + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + avro_field_node, decoder, field_projection, expected_field, field_builder)); + } + } + + // Handle null fields (fields in projection but not in Avro) + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = 
projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kNull) { + auto* field_builder = struct_builder->field_builder(static_cast<int>(proj_idx)); + ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } + } + return {}; +} + +/// \brief Decode Avro array directly to Arrow list builder. +Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& element_projection, + const ListType& list_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); + } + + auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append()); + + auto* value_builder = list_builder->value_builder(); + const auto& element_node = avro_node->leafAt(0); + const auto& element_field = list_type.fields().back(); + + // Read array block count + int64_t block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + element_node, decoder, element_projection, element_field, value_builder)); + } + block_count = decoder.arrayNext(); + } + + return {}; +} + +/// \brief Decode Avro map directly to Arrow map builder. 
+Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& key_projection, + const FieldProjection& value_projection, + const MapType& map_type, ::arrow::ArrayBuilder* array_builder) { + auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder); + + if (avro_node->type() == ::avro::AVRO_MAP) { + // Handle regular Avro map: map<string, value> + const auto& key_node = avro_node->leafAt(0); + const auto& value_node = avro_node->leafAt(1); + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + // Read map block count + int64_t block_count = decoder.mapStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder)); + } + block_count = decoder.mapNext(); + } + + return {}; + } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) { + // Handle array-based map: list<struct<key, value>> + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + const auto& record_node = avro_node->leafAt(0); + if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) { + return InvalidArgument( + "Array-based map must contain records with exactly 2 fields, got: {}", + ToString(record_node)); + } + const auto& key_node = record_node->leafAt(0); + const auto& value_node = record_node->leafAt(1); + + // Read array block count + int64_t 
block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder)); + } + block_count = decoder.arrayNext(); + } + + return {}; + } else { + return InvalidArgument("Expected Avro map or array with map logical type, got: {}", + ToString(avro_node)); + } +} + +/// \brief Decode nested Avro data directly to Arrow array builder. +Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const NestedType& projected_type, + ::arrow::ArrayBuilder* array_builder) { + switch (projected_type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast<const StructType&>(projected_type); + return DecodeStructToBuilder(avro_node, decoder, projections, struct_type, + array_builder); + } + + case TypeId::kList: { + if (projections.size() != 1) { + return InvalidArgument("Expected 1 projection for list, got: {}", + projections.size()); + } + const auto& list_type = internal::checked_cast<const ListType&>(projected_type); + return DecodeListToBuilder(avro_node, decoder, projections[0], list_type, + array_builder); + } + + case TypeId::kMap: { + if (projections.size() != 2) { + return InvalidArgument("Expected 2 projections for map, got: {}", + projections.size()); + } + const auto& map_type = internal::checked_cast<const MapType&>(projected_type); + return DecodeMapToBuilder(avro_node, decoder, projections[0], projections[1], + map_type, array_builder); + } + + default: + return InvalidArgument("Unsupported nested type: {}", projected_type.ToString()); + } +} + +Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const SchemaField& 
projected_field, + ::arrow::ArrayBuilder* array_builder) { + const auto& projected_type = *projected_field.type(); + if (!projected_type.is_primitive()) { + return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString()); + } + + switch (projected_type.type_id()) { + case TypeId::kBoolean: { + if (avro_node->type() != ::avro::AVRO_BOOL) { + return InvalidArgument("Expected Avro boolean for boolean field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BooleanBuilder*>(array_builder); + bool value = decoder.decodeBool(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kInt: { + if (avro_node->type() != ::avro::AVRO_INT) { + return InvalidArgument("Expected Avro int for int field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Int32Builder*>(array_builder); + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kLong: { + auto* builder = internal::checked_cast<::arrow::Int64Builder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_LONG) { + int64_t value = decoder.decodeLong(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_INT) { + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<int64_t>(value))); + } else { + return InvalidArgument("Expected Avro int/long for long field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kFloat: { + if (avro_node->type() != ::avro::AVRO_FLOAT) { + return InvalidArgument("Expected Avro float for float field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::FloatBuilder*>(array_builder); + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kDouble: { + auto* builder = 
internal::checked_cast<::arrow::DoubleBuilder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_DOUBLE) { + double value = decoder.decodeDouble(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_FLOAT) { + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<double>(value))); + } else { + return InvalidArgument("Expected Avro float/double for double field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kString: { + if (avro_node->type() != ::avro::AVRO_STRING) { + return InvalidArgument("Expected Avro string for string field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder); + std::string value; + decoder.decodeString(value); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kBinary: { + if (avro_node->type() != ::avro::AVRO_BYTES) { + return InvalidArgument("Expected Avro bytes for binary field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder); + std::vector<uint8_t> bytes; + decoder.decodeBytes(bytes); + ICEBERG_ARROW_RETURN_NOT_OK( + builder->Append(bytes.data(), static_cast<int32_t>(bytes.size()))); + return {}; + } + + case TypeId::kFixed: { + if (avro_node->type() != ::avro::AVRO_FIXED) { + return InvalidArgument("Expected Avro fixed for fixed field, got: {}", + ToString(avro_node)); + } + const auto& fixed_type = internal::checked_cast<const FixedType&>(projected_type); + + std::vector<uint8_t> fixed_data(fixed_type.length()); + decoder.decodeFixed(fixed_type.length(), fixed_data); + + auto* builder = + internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(fixed_data.data())); + return {}; + } + + case TypeId::kUuid: { + if (avro_node->type() != ::avro::AVRO_FIXED || + 
avro_node->logicalType().type() != ::avro::LogicalType::UUID) { + return InvalidArgument("Expected Avro fixed for uuid field, got: {}", + ToString(avro_node)); + } + + auto* builder = + internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); + + std::vector<uint8_t> uuid_data(16); + decoder.decodeFixed(16, uuid_data); + + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(uuid_data.data())); + return {}; + } + + case TypeId::kDecimal: { + if (avro_node->type() != ::avro::AVRO_FIXED) { + return InvalidArgument("Expected Avro fixed for decimal field, got: {}", + ToString(avro_node)); + } + if (avro_node->logicalType().type() != ::avro::LogicalType::DECIMAL) { + return InvalidArgument( + "Expected Avro DECIMAL logical type for decimal field, got logical type: {}", + static_cast<int>(avro_node->logicalType().type())); + } + + const auto& decimal_type = + internal::checked_cast<const DecimalType&>(projected_type); + + // Note: Avro C++ LogicalType doesn't expose precision/scale getters, + // so we rely on schema projection validation + + // Use Avro schema's fixed size (not calculated) + size_t byte_width = avro_node->fixedSize(); + + // Validate byte width is sufficient for precision + // Max value with P digits: 10^P - 1, needs ceil(P * log(10) / log(256)) bytes + size_t min_bytes = (decimal_type.precision() * 415) / 1000 + 1; // ceil(P * 0.415) + if (byte_width < min_bytes) { + return InvalidArgument( + "Decimal byte width {} insufficient for precision {}, need at least {} bytes", + byte_width, decimal_type.precision(), min_bytes); + } + Review Comment: ```suggestion ``` I don't think we need this check. 
########## src/iceberg/test/avro_test.cc: ########## @@ -244,4 +246,234 @@ TEST_F(AvroReaderTest, AvroWriterNestedType) { WriteAndVerify(schema, expected_string); } +// Comprehensive tests using in-memory MockFileIO + +TEST_F(AvroReaderTest, AllPrimitiveTypes) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "bool_col", std::make_shared<BooleanType>()), + SchemaField::MakeRequired(2, "int_col", std::make_shared<IntType>()), + SchemaField::MakeRequired(3, "long_col", std::make_shared<LongType>()), + SchemaField::MakeRequired(4, "float_col", std::make_shared<FloatType>()), + SchemaField::MakeRequired(5, "double_col", std::make_shared<DoubleType>()), + SchemaField::MakeRequired(6, "string_col", std::make_shared<StringType>()), + SchemaField::MakeRequired(7, "binary_col", std::make_shared<BinaryType>())}); + + std::string expected_string = R"([ + [true, 42, 1234567890, 3.14, 2.71828, "test", "AQID"], + [false, -100, -9876543210, -1.5, 0.0, "hello", "BAUG"] + ])"; + + WriteAndVerify(schema, expected_string); +} + +// Skipping DecimalType test - requires specific decimal encoding in JSON + +TEST_F(AvroReaderTest, DateTimeTypes) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "date_col", std::make_shared<DateType>()), + SchemaField::MakeRequired(2, "time_col", std::make_shared<TimeType>()), + SchemaField::MakeRequired(3, "timestamp_col", std::make_shared<TimestampType>())}); + + // Dates as days since epoch, time/timestamps as microseconds + std::string expected_string = R"([ + [18628, 43200000000, 1640995200000000], + [18629, 86399000000, 1641081599000000] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, NestedStruct) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired( + 2, "person", + 
std::make_shared<iceberg::StructType>(std::vector<SchemaField>{ + SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()), + SchemaField::MakeRequired(4, "age", std::make_shared<IntType>()), + SchemaField::MakeOptional( + 5, "address", + std::make_shared<iceberg::StructType>(std::vector<SchemaField>{ + SchemaField::MakeRequired(6, "street", + std::make_shared<StringType>()), + SchemaField::MakeRequired(7, "city", + std::make_shared<StringType>())}))}))}); + + std::string expected_string = R"([ + [1, ["Alice", 30, ["123 Main St", "NYC"]]], + [2, ["Bob", 25, ["456 Oak Ave", "LA"]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, ListType) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired(2, "tags", + std::make_shared<ListType>(SchemaField::MakeRequired( + 3, "element", std::make_shared<StringType>())))}); + + std::string expected_string = R"([ + [1, ["tag1", "tag2", "tag3"]], + [2, ["foo", "bar"]], + [3, []] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, MapType) { Review Comment: Can we also test a map with a non-string key type? Avro maps only allow string keys, so such a map is written as an array of key/value records. A test would exercise the array-based map path in `DecodeMapToBuilder`. ########## src/iceberg/avro/avro_direct_decoder.cc: ########## @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <arrow/array/builder_binary.h> +#include <arrow/array/builder_decimal.h> +#include <arrow/array/builder_nested.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/type.h> +#include <arrow/util/decimal.h> +#include <avro/Decoder.hh> +#include <avro/Node.hh> +#include <avro/NodeImpl.hh> +#include <avro/Types.hh> + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/avro/avro_direct_decoder_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. 
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Skip an Avro value based on its schema without decoding +Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) { + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + decoder.decodeNull(); + return {}; + + case ::avro::AVRO_BOOL: + decoder.decodeBool(); + return {}; + + case ::avro::AVRO_INT: + decoder.decodeInt(); + return {}; + + case ::avro::AVRO_LONG: + decoder.decodeLong(); + return {}; + + case ::avro::AVRO_FLOAT: + decoder.decodeFloat(); + return {}; + + case ::avro::AVRO_DOUBLE: + decoder.decodeDouble(); + return {}; + + case ::avro::AVRO_STRING: + decoder.skipString(); + return {}; + + case ::avro::AVRO_BYTES: + decoder.skipBytes(); + return {}; + + case ::avro::AVRO_FIXED: + decoder.skipFixed(avro_node->fixedSize()); + return {}; + + case ::avro::AVRO_RECORD: { + // Skip all fields in order + for (size_t i = 0; i < avro_node->leaves(); ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), decoder)); + } + return {}; + } + + case ::avro::AVRO_ENUM: + decoder.decodeEnum(); + return {}; + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + // skipArray() returns count like arrayStart(), must handle all blocks + int64_t block_count = decoder.skipArray(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder)); + } + block_count = decoder.arrayNext(); + } + return {}; + } + + case ::avro::AVRO_MAP: { + const auto& value_node = avro_node->leafAt(1); + // skipMap() returns count like mapStart(), must handle all blocks + int64_t block_count = decoder.skipMap(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + decoder.skipString(); // Skip key (always string in 
Avro maps) + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder)); + } + block_count = decoder.mapNext(); + } + return {}; + } + + case ::avro::AVRO_UNION: { + const size_t branch_index = decoder.decodeUnionIndex(); + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, + num_branches); + } + return SkipAvroValue(avro_node->leafAt(branch_index), decoder); + } + + default: + return InvalidArgument("Unsupported Avro type for skipping: {}", + ToString(avro_node)); + } +} + +/// \brief Decode Avro record directly to Arrow struct builder. +Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + // Build a map from Avro field index to projection index + // -1 means the field should be skipped + std::vector<int> avro_to_projection(avro_node->leaves(), -1); + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = std::get<size_t>(field_projection.from); + avro_to_projection[avro_field_index] = static_cast<int>(proj_idx); + } + } + + // Read all Avro fields in order (must maintain decoder position) + for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) { + int proj_idx = avro_to_projection[avro_idx]; + + if (proj_idx < 0) { + // Skip this field - not in projection + const auto& avro_field_node = 
avro_node->leafAt(avro_idx); + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder)); + } else { + // Decode this field + const auto& field_projection = projections[proj_idx]; + const auto& expected_field = struct_type.fields()[proj_idx]; + const auto& avro_field_node = avro_node->leafAt(avro_idx); + auto* field_builder = struct_builder->field_builder(proj_idx); + + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + avro_field_node, decoder, field_projection, expected_field, field_builder)); + } + } + + // Handle null fields (fields in projection but not in Avro) + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kNull) { + auto* field_builder = struct_builder->field_builder(static_cast<int>(proj_idx)); + ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } + } + return {}; +} + +/// \brief Decode Avro array directly to Arrow list builder. 
+Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& element_projection, + const ListType& list_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); + } + + auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append()); + + auto* value_builder = list_builder->value_builder(); + const auto& element_node = avro_node->leafAt(0); + const auto& element_field = list_type.fields().back(); + + // Read array block count + int64_t block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + element_node, decoder, element_projection, element_field, value_builder)); + } + block_count = decoder.arrayNext(); + } + + return {}; +} + +/// \brief Decode Avro map directly to Arrow map builder. 
+Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& key_projection, + const FieldProjection& value_projection, + const MapType& map_type, ::arrow::ArrayBuilder* array_builder) { + auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder); + + if (avro_node->type() == ::avro::AVRO_MAP) { + // Handle regular Avro map: map<string, value> + const auto& key_node = avro_node->leafAt(0); + const auto& value_node = avro_node->leafAt(1); + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + // Read map block count + int64_t block_count = decoder.mapStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder)); + } + block_count = decoder.mapNext(); + } + + return {}; + } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) { + // Handle array-based map: list<struct<key, value>> + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + const auto& record_node = avro_node->leafAt(0); + if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) { + return InvalidArgument( + "Array-based map must contain records with exactly 2 fields, got: {}", + ToString(record_node)); + } + const auto& key_node = record_node->leafAt(0); + const auto& value_node = record_node->leafAt(1); + + // Read array block count + int64_t 
block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder)); + } + block_count = decoder.arrayNext(); + } + + return {}; + } else { + return InvalidArgument("Expected Avro map or array with map logical type, got: {}", + ToString(avro_node)); + } +} + +/// \brief Decode nested Avro data directly to Arrow array builder. +Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const NestedType& projected_type, + ::arrow::ArrayBuilder* array_builder) { + switch (projected_type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast<const StructType&>(projected_type); + return DecodeStructToBuilder(avro_node, decoder, projections, struct_type, + array_builder); + } + + case TypeId::kList: { + if (projections.size() != 1) { + return InvalidArgument("Expected 1 projection for list, got: {}", + projections.size()); + } + const auto& list_type = internal::checked_cast<const ListType&>(projected_type); + return DecodeListToBuilder(avro_node, decoder, projections[0], list_type, + array_builder); + } + + case TypeId::kMap: { + if (projections.size() != 2) { + return InvalidArgument("Expected 2 projections for map, got: {}", + projections.size()); + } + const auto& map_type = internal::checked_cast<const MapType&>(projected_type); + return DecodeMapToBuilder(avro_node, decoder, projections[0], projections[1], + map_type, array_builder); + } + + default: + return InvalidArgument("Unsupported nested type: {}", projected_type.ToString()); + } +} + +Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const SchemaField& 
projected_field, + ::arrow::ArrayBuilder* array_builder) { + const auto& projected_type = *projected_field.type(); + if (!projected_type.is_primitive()) { + return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString()); + } + + switch (projected_type.type_id()) { + case TypeId::kBoolean: { + if (avro_node->type() != ::avro::AVRO_BOOL) { + return InvalidArgument("Expected Avro boolean for boolean field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BooleanBuilder*>(array_builder); + bool value = decoder.decodeBool(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kInt: { + if (avro_node->type() != ::avro::AVRO_INT) { + return InvalidArgument("Expected Avro int for int field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Int32Builder*>(array_builder); + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kLong: { + auto* builder = internal::checked_cast<::arrow::Int64Builder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_LONG) { + int64_t value = decoder.decodeLong(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_INT) { + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<int64_t>(value))); + } else { + return InvalidArgument("Expected Avro int/long for long field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kFloat: { + if (avro_node->type() != ::avro::AVRO_FLOAT) { + return InvalidArgument("Expected Avro float for float field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::FloatBuilder*>(array_builder); + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kDouble: { + auto* builder = 
internal::checked_cast<::arrow::DoubleBuilder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_DOUBLE) { + double value = decoder.decodeDouble(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_FLOAT) { + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<double>(value))); + } else { + return InvalidArgument("Expected Avro float/double for double field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kString: { + if (avro_node->type() != ::avro::AVRO_STRING) { + return InvalidArgument("Expected Avro string for string field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder); + std::string value; + decoder.decodeString(value); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kBinary: { + if (avro_node->type() != ::avro::AVRO_BYTES) { + return InvalidArgument("Expected Avro bytes for binary field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder); + std::vector<uint8_t> bytes; Review Comment: Same for other temp vector and string variables. ########## src/iceberg/avro/avro_direct_decoder.cc: ########## @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <arrow/array/builder_binary.h> +#include <arrow/array/builder_decimal.h> +#include <arrow/array/builder_nested.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/type.h> +#include <arrow/util/decimal.h> +#include <avro/Decoder.hh> +#include <avro/Node.hh> +#include <avro/NodeImpl.hh> +#include <avro/Types.hh> + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/avro/avro_direct_decoder_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. 
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Skip an Avro value based on its schema without decoding +Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) { + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + decoder.decodeNull(); + return {}; + + case ::avro::AVRO_BOOL: + decoder.decodeBool(); + return {}; + + case ::avro::AVRO_INT: + decoder.decodeInt(); + return {}; + + case ::avro::AVRO_LONG: + decoder.decodeLong(); + return {}; + + case ::avro::AVRO_FLOAT: + decoder.decodeFloat(); + return {}; + + case ::avro::AVRO_DOUBLE: + decoder.decodeDouble(); + return {}; + + case ::avro::AVRO_STRING: + decoder.skipString(); + return {}; + + case ::avro::AVRO_BYTES: + decoder.skipBytes(); + return {}; + + case ::avro::AVRO_FIXED: + decoder.skipFixed(avro_node->fixedSize()); + return {}; + + case ::avro::AVRO_RECORD: { + // Skip all fields in order + for (size_t i = 0; i < avro_node->leaves(); ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), decoder)); + } + return {}; + } + + case ::avro::AVRO_ENUM: + decoder.decodeEnum(); + return {}; + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + // skipArray() returns count like arrayStart(), must handle all blocks + int64_t block_count = decoder.skipArray(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder)); + } + block_count = decoder.arrayNext(); + } + return {}; + } + + case ::avro::AVRO_MAP: { + const auto& value_node = avro_node->leafAt(1); + // skipMap() returns count like mapStart(), must handle all blocks + int64_t block_count = decoder.skipMap(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + decoder.skipString(); // Skip key (always string in 
Avro maps) + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder)); + } + block_count = decoder.mapNext(); + } + return {}; + } + + case ::avro::AVRO_UNION: { + const size_t branch_index = decoder.decodeUnionIndex(); + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, + num_branches); + } + return SkipAvroValue(avro_node->leafAt(branch_index), decoder); + } + + default: + return InvalidArgument("Unsupported Avro type for skipping: {}", + ToString(avro_node)); + } +} + +/// \brief Decode Avro record directly to Arrow struct builder. +Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + // Build a map from Avro field index to projection index + // -1 means the field should be skipped + std::vector<int> avro_to_projection(avro_node->leaves(), -1); + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = std::get<size_t>(field_projection.from); + avro_to_projection[avro_field_index] = static_cast<int>(proj_idx); + } + } + + // Read all Avro fields in order (must maintain decoder position) + for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) { + int proj_idx = avro_to_projection[avro_idx]; + + if (proj_idx < 0) { + // Skip this field - not in projection + const auto& avro_field_node = 
avro_node->leafAt(avro_idx); + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder)); + } else { + // Decode this field + const auto& field_projection = projections[proj_idx]; + const auto& expected_field = struct_type.fields()[proj_idx]; + const auto& avro_field_node = avro_node->leafAt(avro_idx); + auto* field_builder = struct_builder->field_builder(proj_idx); + + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + avro_field_node, decoder, field_projection, expected_field, field_builder)); + } + } + + // Handle null fields (fields in projection but not in Avro) + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kNull) { + auto* field_builder = struct_builder->field_builder(static_cast<int>(proj_idx)); + ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } + } + return {}; +} + +/// \brief Decode Avro array directly to Arrow list builder. 
+Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& element_projection, + const ListType& list_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); + } + + auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append()); + + auto* value_builder = list_builder->value_builder(); + const auto& element_node = avro_node->leafAt(0); + const auto& element_field = list_type.fields().back(); + + // Read array block count + int64_t block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + element_node, decoder, element_projection, element_field, value_builder)); + } + block_count = decoder.arrayNext(); + } + + return {}; +} + +/// \brief Decode Avro map directly to Arrow map builder. 
+Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& key_projection, + const FieldProjection& value_projection, + const MapType& map_type, ::arrow::ArrayBuilder* array_builder) { + auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder); + + if (avro_node->type() == ::avro::AVRO_MAP) { + // Handle regular Avro map: map<string, value> + const auto& key_node = avro_node->leafAt(0); + const auto& value_node = avro_node->leafAt(1); + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + // Read map block count + int64_t block_count = decoder.mapStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder)); + } + block_count = decoder.mapNext(); + } + + return {}; + } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) { + // Handle array-based map: list<struct<key, value>> + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + const auto& record_node = avro_node->leafAt(0); + if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) { + return InvalidArgument( + "Array-based map must contain records with exactly 2 fields, got: {}", + ToString(record_node)); + } + const auto& key_node = record_node->leafAt(0); + const auto& value_node = record_node->leafAt(1); + + // Read array block count + int64_t 
block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder)); + } + block_count = decoder.arrayNext(); + } + + return {}; + } else { + return InvalidArgument("Expected Avro map or array with map logical type, got: {}", + ToString(avro_node)); + } +} + +/// \brief Decode nested Avro data directly to Arrow array builder. +Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const NestedType& projected_type, + ::arrow::ArrayBuilder* array_builder) { + switch (projected_type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast<const StructType&>(projected_type); + return DecodeStructToBuilder(avro_node, decoder, projections, struct_type, + array_builder); + } + + case TypeId::kList: { + if (projections.size() != 1) { + return InvalidArgument("Expected 1 projection for list, got: {}", + projections.size()); + } + const auto& list_type = internal::checked_cast<const ListType&>(projected_type); + return DecodeListToBuilder(avro_node, decoder, projections[0], list_type, + array_builder); + } + + case TypeId::kMap: { + if (projections.size() != 2) { + return InvalidArgument("Expected 2 projections for map, got: {}", + projections.size()); + } + const auto& map_type = internal::checked_cast<const MapType&>(projected_type); + return DecodeMapToBuilder(avro_node, decoder, projections[0], projections[1], + map_type, array_builder); + } + + default: + return InvalidArgument("Unsupported nested type: {}", projected_type.ToString()); + } +} + +Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const SchemaField& 
projected_field, + ::arrow::ArrayBuilder* array_builder) { + const auto& projected_type = *projected_field.type(); + if (!projected_type.is_primitive()) { + return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString()); + } + + switch (projected_type.type_id()) { + case TypeId::kBoolean: { + if (avro_node->type() != ::avro::AVRO_BOOL) { + return InvalidArgument("Expected Avro boolean for boolean field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BooleanBuilder*>(array_builder); + bool value = decoder.decodeBool(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kInt: { + if (avro_node->type() != ::avro::AVRO_INT) { + return InvalidArgument("Expected Avro int for int field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Int32Builder*>(array_builder); + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kLong: { + auto* builder = internal::checked_cast<::arrow::Int64Builder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_LONG) { + int64_t value = decoder.decodeLong(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_INT) { + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<int64_t>(value))); + } else { + return InvalidArgument("Expected Avro int/long for long field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kFloat: { + if (avro_node->type() != ::avro::AVRO_FLOAT) { + return InvalidArgument("Expected Avro float for float field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::FloatBuilder*>(array_builder); + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kDouble: { + auto* builder = 
internal::checked_cast<::arrow::DoubleBuilder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_DOUBLE) { + double value = decoder.decodeDouble(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_FLOAT) { + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<double>(value))); + } else { + return InvalidArgument("Expected Avro float/double for double field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kString: { + if (avro_node->type() != ::avro::AVRO_STRING) { + return InvalidArgument("Expected Avro string for string field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder); + std::string value; + decoder.decodeString(value); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kBinary: { + if (avro_node->type() != ::avro::AVRO_BYTES) { + return InvalidArgument("Expected Avro bytes for binary field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder); + std::vector<uint8_t> bytes; + decoder.decodeBytes(bytes); + ICEBERG_ARROW_RETURN_NOT_OK( + builder->Append(bytes.data(), static_cast<int32_t>(bytes.size()))); + return {}; + } + + case TypeId::kFixed: { + if (avro_node->type() != ::avro::AVRO_FIXED) { + return InvalidArgument("Expected Avro fixed for fixed field, got: {}", + ToString(avro_node)); + } + const auto& fixed_type = internal::checked_cast<const FixedType&>(projected_type); + + std::vector<uint8_t> fixed_data(fixed_type.length()); + decoder.decodeFixed(fixed_type.length(), fixed_data); + + auto* builder = + internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(fixed_data.data())); + return {}; + } + + case TypeId::kUuid: { + if (avro_node->type() != ::avro::AVRO_FIXED || + 
avro_node->logicalType().type() != ::avro::LogicalType::UUID) { + return InvalidArgument("Expected Avro fixed for uuid field, got: {}", + ToString(avro_node)); + } + + auto* builder = + internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); + + std::vector<uint8_t> uuid_data(16); + decoder.decodeFixed(16, uuid_data); + + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(uuid_data.data())); + return {}; + } + + case TypeId::kDecimal: { + if (avro_node->type() != ::avro::AVRO_FIXED) { + return InvalidArgument("Expected Avro fixed for decimal field, got: {}", + ToString(avro_node)); + } + if (avro_node->logicalType().type() != ::avro::LogicalType::DECIMAL) { Review Comment: Can we combine the two `if`s above just like other branches? ########## src/iceberg/avro/avro_reader.cc: ########## @@ -214,6 +248,11 @@ class AvroReader::Impl { } context_->builder_ = builder_result.MoveValueUnsafe(); + // Initialize GenericDatum for legacy path Review Comment: ```suggestion ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -238,17 +277,54 @@ class AvroReader::Impl { return arrow_array; } + // Helper: Check if past sync point + bool IsPastSync() const { + if (!split_end_) return false; + return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value()) + : datum_reader_->pastSync(split_end_.value()); + } + + // Helper: Get metadata from appropriate reader + ::avro::Metadata GetReaderMetadata() const { + return use_direct_decoder_ ? base_reader_->metadata() : datum_reader_->metadata(); + } + + // Helper: Close the appropriate reader + void CloseReader() { + if (use_direct_decoder_) { + if (base_reader_) { + base_reader_->close(); + base_reader_.reset(); + } + } else { + if (datum_reader_) { + datum_reader_->close(); + datum_reader_.reset(); + } + } + } + + // Helper: Get reader schema + const ::avro::ValidSchema& GetReaderSchema() const { + return use_direct_decoder_ ? 
base_reader_->readerSchema() + : datum_reader_->readerSchema(); + } + private: // Max number of rows in the record batch to read. int64_t batch_size_{}; + // Whether to use direct decoder (true) or GenericDatum-based decoder (false). + bool use_direct_decoder_{true}; // The end of the split to read and used to terminate the reading. std::optional<int64_t> split_end_; // The schema to read. std::shared_ptr<::iceberg::Schema> read_schema_; // The projection result to apply to the read schema. SchemaProjection projection_; - // The avro reader to read the data into a datum. - std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_; + // The avro reader base - provides direct access to decoder (new path). Review Comment: I would rather not call them new or legacy path... ########## src/iceberg/avro/avro_reader.cc: ########## @@ -91,10 +94,21 @@ class AvroReader::Impl { ICEBERG_ASSIGN_OR_RAISE(auto input_stream, CreateInputStream(options, kDefaultBufferSize)); - // Create a base reader without setting reader schema to enable projection. - auto base_reader = - std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream)); - ::avro::ValidSchema file_schema = base_reader->dataSchema(); + ::avro::ValidSchema file_schema; + + if (use_direct_decoder_) { + // New path: Create base reader for direct decoder access + auto base_reader = + std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream)); + file_schema = base_reader->dataSchema(); + base_reader_ = std::move(base_reader); + } else { + // Legacy path: Create DataFileReader<GenericDatum> Review Comment: ```suggestion // Create a reader to read into generic datum ``` ########## src/iceberg/avro/avro_direct_decoder.cc: ########## @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <arrow/array/builder_binary.h> +#include <arrow/array/builder_decimal.h> +#include <arrow/array/builder_nested.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/type.h> +#include <arrow/util/decimal.h> +#include <avro/Decoder.hh> +#include <avro/Node.hh> +#include <avro/NodeImpl.hh> +#include <avro/Types.hh> + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/avro/avro_direct_decoder_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. 
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Skip an Avro value based on its schema without decoding +Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) { + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + decoder.decodeNull(); + return {}; + + case ::avro::AVRO_BOOL: + decoder.decodeBool(); + return {}; + + case ::avro::AVRO_INT: + decoder.decodeInt(); + return {}; + + case ::avro::AVRO_LONG: + decoder.decodeLong(); + return {}; + + case ::avro::AVRO_FLOAT: + decoder.decodeFloat(); + return {}; + + case ::avro::AVRO_DOUBLE: + decoder.decodeDouble(); + return {}; + + case ::avro::AVRO_STRING: + decoder.skipString(); + return {}; + + case ::avro::AVRO_BYTES: + decoder.skipBytes(); + return {}; + + case ::avro::AVRO_FIXED: + decoder.skipFixed(avro_node->fixedSize()); + return {}; + + case ::avro::AVRO_RECORD: { + // Skip all fields in order + for (size_t i = 0; i < avro_node->leaves(); ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), decoder)); + } + return {}; + } + + case ::avro::AVRO_ENUM: + decoder.decodeEnum(); + return {}; + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + // skipArray() returns count like arrayStart(), must handle all blocks + int64_t block_count = decoder.skipArray(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder)); + } + block_count = decoder.arrayNext(); + } + return {}; + } + + case ::avro::AVRO_MAP: { + const auto& value_node = avro_node->leafAt(1); + // skipMap() returns count like mapStart(), must handle all blocks + int64_t block_count = decoder.skipMap(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + decoder.skipString(); // Skip key (always string in 
Avro maps) + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder)); + } + block_count = decoder.mapNext(); + } + return {}; + } + + case ::avro::AVRO_UNION: { + const size_t branch_index = decoder.decodeUnionIndex(); + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, + num_branches); + } + return SkipAvroValue(avro_node->leafAt(branch_index), decoder); + } + + default: + return InvalidArgument("Unsupported Avro type for skipping: {}", + ToString(avro_node)); + } +} + +/// \brief Decode Avro record directly to Arrow struct builder. +Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + // Build a map from Avro field index to projection index + // -1 means the field should be skipped + std::vector<int> avro_to_projection(avro_node->leaves(), -1); + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = std::get<size_t>(field_projection.from); + avro_to_projection[avro_field_index] = static_cast<int>(proj_idx); + } + } + + // Read all Avro fields in order (must maintain decoder position) + for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) { + int proj_idx = avro_to_projection[avro_idx]; + + if (proj_idx < 0) { + // Skip this field - not in projection + const auto& avro_field_node = 
avro_node->leafAt(avro_idx); + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder)); + } else { + // Decode this field + const auto& field_projection = projections[proj_idx]; + const auto& expected_field = struct_type.fields()[proj_idx]; + const auto& avro_field_node = avro_node->leafAt(avro_idx); + auto* field_builder = struct_builder->field_builder(proj_idx); + + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + avro_field_node, decoder, field_projection, expected_field, field_builder)); + } + } + + // Handle null fields (fields in projection but not in Avro) + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kNull) { + auto* field_builder = struct_builder->field_builder(static_cast<int>(proj_idx)); + ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } + } + return {}; +} + +/// \brief Decode Avro array directly to Arrow list builder. 
+Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& element_projection, + const ListType& list_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); + } + + auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append()); + + auto* value_builder = list_builder->value_builder(); + const auto& element_node = avro_node->leafAt(0); + const auto& element_field = list_type.fields().back(); + + // Read array block count + int64_t block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + element_node, decoder, element_projection, element_field, value_builder)); + } + block_count = decoder.arrayNext(); + } + + return {}; +} + +/// \brief Decode Avro map directly to Arrow map builder. 
+Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& key_projection, + const FieldProjection& value_projection, + const MapType& map_type, ::arrow::ArrayBuilder* array_builder) { + auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder); + + if (avro_node->type() == ::avro::AVRO_MAP) { + // Handle regular Avro map: map<string, value> + const auto& key_node = avro_node->leafAt(0); + const auto& value_node = avro_node->leafAt(1); + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + // Read map block count + int64_t block_count = decoder.mapStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder)); + } + block_count = decoder.mapNext(); + } + + return {}; + } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) { + // Handle array-based map: list<struct<key, value>> + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + const auto& record_node = avro_node->leafAt(0); + if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) { + return InvalidArgument( + "Array-based map must contain records with exactly 2 fields, got: {}", + ToString(record_node)); + } + const auto& key_node = record_node->leafAt(0); + const auto& value_node = record_node->leafAt(1); + + // Read array block count + int64_t 
block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder)); + } + block_count = decoder.arrayNext(); + } + + return {}; + } else { + return InvalidArgument("Expected Avro map or array with map logical type, got: {}", + ToString(avro_node)); + } +} + +/// \brief Decode nested Avro data directly to Arrow array builder. +Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const NestedType& projected_type, + ::arrow::ArrayBuilder* array_builder) { + switch (projected_type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast<const StructType&>(projected_type); + return DecodeStructToBuilder(avro_node, decoder, projections, struct_type, + array_builder); + } + + case TypeId::kList: { + if (projections.size() != 1) { + return InvalidArgument("Expected 1 projection for list, got: {}", + projections.size()); + } + const auto& list_type = internal::checked_cast<const ListType&>(projected_type); + return DecodeListToBuilder(avro_node, decoder, projections[0], list_type, + array_builder); + } + + case TypeId::kMap: { + if (projections.size() != 2) { + return InvalidArgument("Expected 2 projections for map, got: {}", + projections.size()); + } + const auto& map_type = internal::checked_cast<const MapType&>(projected_type); + return DecodeMapToBuilder(avro_node, decoder, projections[0], projections[1], + map_type, array_builder); + } + + default: + return InvalidArgument("Unsupported nested type: {}", projected_type.ToString()); + } +} + +Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const SchemaField& 
projected_field, + ::arrow::ArrayBuilder* array_builder) { + const auto& projected_type = *projected_field.type(); + if (!projected_type.is_primitive()) { + return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString()); + } + + switch (projected_type.type_id()) { + case TypeId::kBoolean: { + if (avro_node->type() != ::avro::AVRO_BOOL) { + return InvalidArgument("Expected Avro boolean for boolean field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BooleanBuilder*>(array_builder); + bool value = decoder.decodeBool(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kInt: { + if (avro_node->type() != ::avro::AVRO_INT) { + return InvalidArgument("Expected Avro int for int field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Int32Builder*>(array_builder); + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kLong: { + auto* builder = internal::checked_cast<::arrow::Int64Builder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_LONG) { + int64_t value = decoder.decodeLong(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_INT) { + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<int64_t>(value))); + } else { + return InvalidArgument("Expected Avro int/long for long field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kFloat: { + if (avro_node->type() != ::avro::AVRO_FLOAT) { + return InvalidArgument("Expected Avro float for float field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::FloatBuilder*>(array_builder); + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kDouble: { + auto* builder = 
internal::checked_cast<::arrow::DoubleBuilder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_DOUBLE) { + double value = decoder.decodeDouble(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_FLOAT) { + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<double>(value))); + } else { + return InvalidArgument("Expected Avro float/double for double field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kString: { + if (avro_node->type() != ::avro::AVRO_STRING) { + return InvalidArgument("Expected Avro string for string field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder); + std::string value; + decoder.decodeString(value); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kBinary: { + if (avro_node->type() != ::avro::AVRO_BYTES) { + return InvalidArgument("Expected Avro bytes for binary field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder); + std::vector<uint8_t> bytes; Review Comment: Do we want to avoid frequent small object allocation like this? Perhaps we can reuse it by adding a `class DecodeContext` where a `std::vector<uint8_t> scratch` object is supposed to be used from it. ########## src/iceberg/avro/avro_direct_decoder.cc: ########## @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <arrow/array/builder_binary.h> +#include <arrow/array/builder_decimal.h> +#include <arrow/array/builder_nested.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/type.h> +#include <arrow/util/decimal.h> +#include <avro/Decoder.hh> +#include <avro/Node.hh> +#include <avro/NodeImpl.hh> +#include <avro/Types.hh> + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/avro/avro_direct_decoder_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. 
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Skip an Avro value based on its schema without decoding +Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) { + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + decoder.decodeNull(); + return {}; + + case ::avro::AVRO_BOOL: + decoder.decodeBool(); + return {}; + + case ::avro::AVRO_INT: + decoder.decodeInt(); + return {}; + + case ::avro::AVRO_LONG: + decoder.decodeLong(); + return {}; + + case ::avro::AVRO_FLOAT: + decoder.decodeFloat(); + return {}; + + case ::avro::AVRO_DOUBLE: + decoder.decodeDouble(); + return {}; + + case ::avro::AVRO_STRING: + decoder.skipString(); + return {}; + + case ::avro::AVRO_BYTES: + decoder.skipBytes(); + return {}; + + case ::avro::AVRO_FIXED: + decoder.skipFixed(avro_node->fixedSize()); + return {}; + + case ::avro::AVRO_RECORD: { + // Skip all fields in order + for (size_t i = 0; i < avro_node->leaves(); ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), decoder)); + } + return {}; + } + + case ::avro::AVRO_ENUM: + decoder.decodeEnum(); + return {}; + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + // skipArray() returns count like arrayStart(), must handle all blocks + int64_t block_count = decoder.skipArray(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder)); + } + block_count = decoder.arrayNext(); + } + return {}; + } + + case ::avro::AVRO_MAP: { + const auto& value_node = avro_node->leafAt(1); + // skipMap() returns count like mapStart(), must handle all blocks + int64_t block_count = decoder.skipMap(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + decoder.skipString(); // Skip key (always string in 
Avro maps) + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder)); + } + block_count = decoder.mapNext(); + } + return {}; + } + + case ::avro::AVRO_UNION: { + const size_t branch_index = decoder.decodeUnionIndex(); + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, + num_branches); + } + return SkipAvroValue(avro_node->leafAt(branch_index), decoder); + } + + default: + return InvalidArgument("Unsupported Avro type for skipping: {}", + ToString(avro_node)); + } +} + +/// \brief Decode Avro record directly to Arrow struct builder. +Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const std::span<const FieldProjection>& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + // Build a map from Avro field index to projection index + // -1 means the field should be skipped + std::vector<int> avro_to_projection(avro_node->leaves(), -1); + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = std::get<size_t>(field_projection.from); + avro_to_projection[avro_field_index] = static_cast<int>(proj_idx); + } + } + + // Read all Avro fields in order (must maintain decoder position) + for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) { + int proj_idx = avro_to_projection[avro_idx]; + + if (proj_idx < 0) { + // Skip this field - not in projection + const auto& avro_field_node = 
avro_node->leafAt(avro_idx); + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder)); + } else { + // Decode this field + const auto& field_projection = projections[proj_idx]; + const auto& expected_field = struct_type.fields()[proj_idx]; + const auto& avro_field_node = avro_node->leafAt(avro_idx); + auto* field_builder = struct_builder->field_builder(proj_idx); + + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + avro_field_node, decoder, field_projection, expected_field, field_builder)); + } + } + + // Handle null fields (fields in projection but not in Avro) + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kNull) { Review Comment: Return error for other unsupported `kind`? ########## src/iceberg/avro/avro_reader.cc: ########## @@ -91,10 +94,21 @@ class AvroReader::Impl { ICEBERG_ASSIGN_OR_RAISE(auto input_stream, CreateInputStream(options, kDefaultBufferSize)); - // Create a base reader without setting reader schema to enable projection. 
- auto base_reader = - std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream)); - ::avro::ValidSchema file_schema = base_reader->dataSchema(); + ::avro::ValidSchema file_schema; + + if (use_direct_decoder_) { + // New path: Create base reader for direct decoder access Review Comment: ```suggestion // Create a base reader to directly use its decoder ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -137,25 +160,37 @@ class AvroReader::Impl { } while (context_->builder_->length() < batch_size_) { - if (split_end_ && reader_->pastSync(split_end_.value())) { + if (IsPastSync()) { break; } - if (!reader_->read(*context_->datum_)) { - break; + + if (use_direct_decoder_) { + // New path: Use direct decoder + if (!base_reader_->hasMore()) { + break; + } + base_reader_->decr(); + + ICEBERG_RETURN_UNEXPECTED( + DecodeAvroToBuilder(GetReaderSchema().root(), base_reader_->decoder(), + projection_, *read_schema_, context_->builder_.get())); + } else { + // Legacy path: Use GenericDatum Review Comment: ```suggestion ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -238,17 +277,54 @@ class AvroReader::Impl { return arrow_array; } + // Helper: Check if past sync point Review Comment: ```suggestion ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -238,17 +277,54 @@ class AvroReader::Impl { return arrow_array; } + // Helper: Check if past sync point + bool IsPastSync() const { + if (!split_end_) return false; + return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value()) + : datum_reader_->pastSync(split_end_.value()); + } + + // Helper: Get metadata from appropriate reader + ::avro::Metadata GetReaderMetadata() const { + return use_direct_decoder_ ? 
base_reader_->metadata() : datum_reader_->metadata(); + } + + // Helper: Close the appropriate reader + void CloseReader() { + if (use_direct_decoder_) { + if (base_reader_) { + base_reader_->close(); + base_reader_.reset(); + } + } else { + if (datum_reader_) { + datum_reader_->close(); + datum_reader_.reset(); + } + } + } + + // Helper: Get reader schema Review Comment: ```suggestion ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -238,17 +277,54 @@ class AvroReader::Impl { return arrow_array; } + // Helper: Check if past sync point + bool IsPastSync() const { + if (!split_end_) return false; + return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value()) + : datum_reader_->pastSync(split_end_.value()); + } + + // Helper: Get metadata from appropriate reader + ::avro::Metadata GetReaderMetadata() const { + return use_direct_decoder_ ? base_reader_->metadata() : datum_reader_->metadata(); + } + + // Helper: Close the appropriate reader Review Comment: ```suggestion ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -238,17 +277,54 @@ class AvroReader::Impl { return arrow_array; } + // Helper: Check if past sync point + bool IsPastSync() const { + if (!split_end_) return false; + return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value()) + : datum_reader_->pastSync(split_end_.value()); + } + + // Helper: Get metadata from appropriate reader Review Comment: ```suggestion ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -238,17 +277,54 @@ class AvroReader::Impl { return arrow_array; } + // Helper: Check if past sync point + bool IsPastSync() const { + if (!split_end_) return false; Review Comment: ```suggestion if (!split_end_) { return false; } ``` ########## src/iceberg/file_reader.h: ########## @@ -76,6 +76,10 @@ class ReaderProperties : public ConfigBase<ReaderProperties> { /// \brief The batch size to read. 
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096}; + /// \brief Use direct Avro decoder (true) or GenericDatum-based decoder (false). + /// Default: true (use direct decoder for better performance). + inline static Entry<bool> kAvroUseDirectDecoder{"avro.use-direct-decoder", true}; Review Comment: I would call it `read.avro.skip-datum`, WDYT? ########## src/iceberg/test/avro_test.cc: ########## @@ -244,4 +246,234 @@ TEST_F(AvroReaderTest, AvroWriterNestedType) { WriteAndVerify(schema, expected_string); } +// Comprehensive tests using in-memory MockFileIO + +TEST_F(AvroReaderTest, AllPrimitiveTypes) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "bool_col", std::make_shared<BooleanType>()), + SchemaField::MakeRequired(2, "int_col", std::make_shared<IntType>()), + SchemaField::MakeRequired(3, "long_col", std::make_shared<LongType>()), + SchemaField::MakeRequired(4, "float_col", std::make_shared<FloatType>()), + SchemaField::MakeRequired(5, "double_col", std::make_shared<DoubleType>()), + SchemaField::MakeRequired(6, "string_col", std::make_shared<StringType>()), + SchemaField::MakeRequired(7, "binary_col", std::make_shared<BinaryType>())}); + + std::string expected_string = R"([ + [true, 42, 1234567890, 3.14, 2.71828, "test", "AQID"], + [false, -100, -9876543210, -1.5, 0.0, "hello", "BAUG"] + ])"; + + WriteAndVerify(schema, expected_string); +} + +// Skipping DecimalType test - requires specific decimal encoding in JSON + +TEST_F(AvroReaderTest, DateTimeTypes) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "date_col", std::make_shared<DateType>()), + SchemaField::MakeRequired(2, "time_col", std::make_shared<TimeType>()), + SchemaField::MakeRequired(3, "timestamp_col", std::make_shared<TimestampType>())}); + + // Dates as days since epoch, time/timestamps as microseconds + std::string expected_string = R"([ + [18628, 43200000000, 
1640995200000000], + [18629, 86399000000, 1641081599000000] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, NestedStruct) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired( + 2, "person", + std::make_shared<iceberg::StructType>(std::vector<SchemaField>{ + SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()), + SchemaField::MakeRequired(4, "age", std::make_shared<IntType>()), + SchemaField::MakeOptional( + 5, "address", + std::make_shared<iceberg::StructType>(std::vector<SchemaField>{ + SchemaField::MakeRequired(6, "street", + std::make_shared<StringType>()), + SchemaField::MakeRequired(7, "city", + std::make_shared<StringType>())}))}))}); + + std::string expected_string = R"([ + [1, ["Alice", 30, ["123 Main St", "NYC"]]], + [2, ["Bob", 25, ["456 Oak Ave", "LA"]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, ListType) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired(2, "tags", + std::make_shared<ListType>(SchemaField::MakeRequired( + 3, "element", std::make_shared<StringType>())))}); + + std::string expected_string = R"([ + [1, ["tag1", "tag2", "tag3"]], + [2, ["foo", "bar"]], + [3, []] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, MapType) { + auto schema = std::make_shared<iceberg::Schema>( + std::vector<SchemaField>{SchemaField::MakeRequired( + 1, "properties", + std::make_shared<MapType>( + SchemaField::MakeRequired(2, "key", std::make_shared<StringType>()), + SchemaField::MakeRequired(3, "value", std::make_shared<IntType>())))}); + + std::string expected_string = R"([ + [[["key1", 100], ["key2", 200]]], + [[["a", 1], ["b", 2], ["c", 3]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + 
+TEST_F(AvroReaderTest, ComplexNestedTypes) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired(2, "nested_list", + std::make_shared<ListType>(SchemaField::MakeRequired( + 3, "element", + std::make_shared<ListType>(SchemaField::MakeRequired( + 4, "element", std::make_shared<IntType>())))))}); + + std::string expected_string = R"([ + [1, [[1, 2], [3, 4]]], + [2, [[5], [6, 7, 8]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, OptionalFieldsWithNulls) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeOptional(2, "name", std::make_shared<StringType>()), + SchemaField::MakeOptional(3, "age", std::make_shared<IntType>())}); + + std::string expected_string = R"([ + [1, "Alice", 30], + [2, null, 25], + [3, "Charlie", null], + [4, null, null] + ])"; + + WriteAndVerify(schema, expected_string); +} + +// Test both direct decoder and GenericDatum paths +TEST_F(AvroReaderTest, DirectDecoderVsGenericDatum) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeOptional(2, "name", std::make_shared<StringType>()), + SchemaField::MakeRequired( + 3, "nested", + std::make_shared<iceberg::StructType>(std::vector<SchemaField>{ + SchemaField::MakeRequired(4, "value", std::make_shared<DoubleType>())}))}); + + std::string expected_string = R"([ + [1, "Alice", [3.14]], + [2, null, [2.71]], + [3, "Bob", [1.41]] + ])"; + + // Test with direct decoder (default) + { + temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro"); + WriteAndVerify(schema, expected_string); + } + + // Test with GenericDatum decoder + { + temp_avro_file_ = CreateNewTempFilePathWithSuffix("_generic.avro"); + auto reader_properties = 
ReaderProperties::default_properties(); + reader_properties->Set(ReaderProperties::kAvroUseDirectDecoder, false); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema); + ASSERT_TRUE(arrow_schema_result.ok()); + auto arrow_schema = arrow_schema_result.ValueOrDie(); + + auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, expected_string); + ASSERT_TRUE(array_result.ok()); + auto array = array_result.ValueOrDie(); + + struct ArrowArray arrow_array; + auto export_result = ::arrow::ExportArray(*array, &arrow_array); + ASSERT_TRUE(export_result.ok()); + + std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}}; + + auto writer_result = + WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = temp_avro_file_, + .schema = schema, + .io = file_io_, + .metadata = metadata}); + ASSERT_TRUE(writer_result.has_value()); + auto writer = std::move(writer_result.value()); + ASSERT_THAT(writer->Write(&arrow_array), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_); + ASSERT_TRUE(file_info_result.ok()); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, {.path = temp_avro_file_, + .length = file_info_result->size(), + .io = file_io_, + .projection = schema, + .properties = std::move(reader_properties)}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string)); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); + } +} + +TEST_F(AvroReaderTest, LargeDataset) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<LongType>()), + SchemaField::MakeRequired(2, "value", std::make_shared<DoubleType>())}); + + // Generate large dataset JSON + std::ostringstream json; + json << 
"["; + for (int i = 0; i < 1000; ++i) { + if (i > 0) json << ", "; + json << "[" << i << ", " << (i * 1.5) << "]"; + } + json << "]"; + + WriteAndVerify(schema, json.str()); +} + +TEST_F(AvroReaderTest, EmptyCollections) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired(2, "list_col", + std::make_shared<ListType>(SchemaField::MakeRequired( + 3, "element", std::make_shared<IntType>())))}); + + std::string expected_string = R"([ + [1, []], + [2, [10, 20, 30]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +// Skip Fixed and UUID tests for now - they require specific binary encoding Review Comment: ```suggestion ``` ########## src/iceberg/test/avro_test.cc: ########## @@ -244,4 +246,234 @@ TEST_F(AvroReaderTest, AvroWriterNestedType) { WriteAndVerify(schema, expected_string); } +// Comprehensive tests using in-memory MockFileIO Review Comment: ```suggestion ``` ########## src/iceberg/test/avro_test.cc: ########## @@ -244,4 +246,234 @@ TEST_F(AvroReaderTest, AvroWriterNestedType) { WriteAndVerify(schema, expected_string); } +// Comprehensive tests using in-memory MockFileIO + +TEST_F(AvroReaderTest, AllPrimitiveTypes) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "bool_col", std::make_shared<BooleanType>()), + SchemaField::MakeRequired(2, "int_col", std::make_shared<IntType>()), + SchemaField::MakeRequired(3, "long_col", std::make_shared<LongType>()), + SchemaField::MakeRequired(4, "float_col", std::make_shared<FloatType>()), + SchemaField::MakeRequired(5, "double_col", std::make_shared<DoubleType>()), + SchemaField::MakeRequired(6, "string_col", std::make_shared<StringType>()), + SchemaField::MakeRequired(7, "binary_col", std::make_shared<BinaryType>())}); + + std::string expected_string = R"([ + [true, 42, 1234567890, 3.14, 2.71828, "test", "AQID"], + [false, -100, 
-9876543210, -1.5, 0.0, "hello", "BAUG"] + ])"; + + WriteAndVerify(schema, expected_string); +} + +// Skipping DecimalType test - requires specific decimal encoding in JSON + +TEST_F(AvroReaderTest, DateTimeTypes) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "date_col", std::make_shared<DateType>()), + SchemaField::MakeRequired(2, "time_col", std::make_shared<TimeType>()), + SchemaField::MakeRequired(3, "timestamp_col", std::make_shared<TimestampType>())}); + + // Dates as days since epoch, time/timestamps as microseconds + std::string expected_string = R"([ + [18628, 43200000000, 1640995200000000], + [18629, 86399000000, 1641081599000000] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, NestedStruct) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired( + 2, "person", + std::make_shared<iceberg::StructType>(std::vector<SchemaField>{ + SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()), + SchemaField::MakeRequired(4, "age", std::make_shared<IntType>()), + SchemaField::MakeOptional( + 5, "address", + std::make_shared<iceberg::StructType>(std::vector<SchemaField>{ + SchemaField::MakeRequired(6, "street", + std::make_shared<StringType>()), + SchemaField::MakeRequired(7, "city", + std::make_shared<StringType>())}))}))}); + + std::string expected_string = R"([ + [1, ["Alice", 30, ["123 Main St", "NYC"]]], + [2, ["Bob", 25, ["456 Oak Ave", "LA"]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, ListType) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired(2, "tags", + std::make_shared<ListType>(SchemaField::MakeRequired( + 3, "element", std::make_shared<StringType>())))}); + + std::string 
expected_string = R"([ + [1, ["tag1", "tag2", "tag3"]], + [2, ["foo", "bar"]], + [3, []] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, MapType) { + auto schema = std::make_shared<iceberg::Schema>( + std::vector<SchemaField>{SchemaField::MakeRequired( + 1, "properties", + std::make_shared<MapType>( + SchemaField::MakeRequired(2, "key", std::make_shared<StringType>()), + SchemaField::MakeRequired(3, "value", std::make_shared<IntType>())))}); + + std::string expected_string = R"([ + [[["key1", 100], ["key2", 200]]], + [[["a", 1], ["b", 2], ["c", 3]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, ComplexNestedTypes) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeRequired(2, "nested_list", + std::make_shared<ListType>(SchemaField::MakeRequired( + 3, "element", + std::make_shared<ListType>(SchemaField::MakeRequired( + 4, "element", std::make_shared<IntType>())))))}); + + std::string expected_string = R"([ + [1, [[1, 2], [3, 4]]], + [2, [[5], [6, 7, 8]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, OptionalFieldsWithNulls) { + auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{ + SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()), + SchemaField::MakeOptional(2, "name", std::make_shared<StringType>()), + SchemaField::MakeOptional(3, "age", std::make_shared<IntType>())}); + + std::string expected_string = R"([ + [1, "Alice", 30], + [2, null, 25], + [3, "Charlie", null], + [4, null, null] + ])"; + + WriteAndVerify(schema, expected_string); +} + +// Test both direct decoder and GenericDatum paths +TEST_F(AvroReaderTest, DirectDecoderVsGenericDatum) { Review Comment: I would recommend changing this to a parameterized test that enables or disables the direct decoder for every case in this file.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
