mapleFU commented on code in PR #127: URL: https://github.com/apache/iceberg-cpp/pull/127#discussion_r2163038798
########## src/iceberg/avro/avro_reader.cc: ########## @@ -96,11 +99,22 @@ class AvroBatchReader::Impl { // Validate field ids in the file schema. HasIdVisitor has_id_visitor; ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema)); + if (has_id_visitor.HasNoIds()) { - // TODO(gangwu): support applying field-ids based on name mapping - return NotImplemented("Avro file schema has no field IDs"); - } - if (!has_id_visitor.AllHaveIds()) { + // Apply field IDs based on name mapping if available + auto name_mapping_iter = options.properties.find("name_mapping"); + if (name_mapping_iter != options.properties.end()) { Review Comment: ``` if (auto name_mapping_iter = options.properties.find(); name_mapping_iter != options.properties.end()) ``` ########## src/iceberg/avro/avro_reader.cc: ########## @@ -195,6 +209,147 @@ class AvroBatchReader::Impl { return arrow_array; } + // Apply field IDs to Avro schema nodes based on name mapping + Status ApplyFieldIdsFromNameMapping(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + switch (node->type()) { + case ::avro::AVRO_RECORD: + return ApplyFieldIdsToRecord(node, name_mapping); + case ::avro::AVRO_ARRAY: + return ApplyFieldIdsToArray(node, name_mapping); + case ::avro::AVRO_MAP: + return ApplyFieldIdsToMap(node, name_mapping); + case ::avro::AVRO_UNION: + return ApplyFieldIdsToUnion(node, name_mapping); + case ::avro::AVRO_BOOL: + case ::avro::AVRO_INT: + case ::avro::AVRO_LONG: + case ::avro::AVRO_FLOAT: + case ::avro::AVRO_DOUBLE: + case ::avro::AVRO_STRING: + case ::avro::AVRO_BYTES: + case ::avro::AVRO_FIXED: + return {}; + case ::avro::AVRO_NULL: + case ::avro::AVRO_ENUM: + default: + return InvalidSchema("Unsupported Avro type for field ID application: {}", + static_cast<int>(node->type())); + } + } + + Status ApplyFieldIdsToRecord(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + for (size_t i = 0; i < node->leaves(); ++i) { + const std::string& field_name = node->nameAt(i); + 
::avro::NodePtr field_node = node->leafAt(i); Review Comment: ```suggestion const ::avro::NodePtr& field_node = node->leafAt(i); ``` ? ########## src/iceberg/avro/avro_reader.cc: ########## @@ -195,6 +209,147 @@ class AvroBatchReader::Impl { return arrow_array; } + // Apply field IDs to Avro schema nodes based on name mapping + Status ApplyFieldIdsFromNameMapping(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { Review Comment: Could the signature look like this instead: ```c++ Status ApplyFieldIdsFromNameMapping(const NameMapping& name_mapping, ::avro::Node* node) { ``` Taking `const NodePtr&` is a bit confusing here, since we never need to rebind the pointer to another node. ########## src/iceberg/avro/avro_reader.cc: ########## @@ -195,6 +209,147 @@ class AvroBatchReader::Impl { return arrow_array; } + // Apply field IDs to Avro schema nodes based on name mapping + Status ApplyFieldIdsFromNameMapping(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + switch (node->type()) { + case ::avro::AVRO_RECORD: + return ApplyFieldIdsToRecord(node, name_mapping); + case ::avro::AVRO_ARRAY: + return ApplyFieldIdsToArray(node, name_mapping); + case ::avro::AVRO_MAP: + return ApplyFieldIdsToMap(node, name_mapping); + case ::avro::AVRO_UNION: + return ApplyFieldIdsToUnion(node, name_mapping); + case ::avro::AVRO_BOOL: + case ::avro::AVRO_INT: + case ::avro::AVRO_LONG: + case ::avro::AVRO_FLOAT: + case ::avro::AVRO_DOUBLE: + case ::avro::AVRO_STRING: + case ::avro::AVRO_BYTES: + case ::avro::AVRO_FIXED: + return {}; + case ::avro::AVRO_NULL: + case ::avro::AVRO_ENUM: + default: + return InvalidSchema("Unsupported Avro type for field ID application: {}", + static_cast<int>(node->type())); + } + } + + Status ApplyFieldIdsToRecord(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + for (size_t i = 0; i < node->leaves(); ++i) { + const std::string& field_name = node->nameAt(i); + ::avro::NodePtr field_node = node->leafAt(i); + + // Try to find field ID by name in the name 
mapping + if (auto field_ref = name_mapping.Find(field_name)) { + if (field_ref->get().field_id.has_value()) { + // Add field ID attribute to the node + ::avro::CustomAttributes attributes; + attributes.addAttribute( + "field-id", std::to_string(field_ref->get().field_id.value()), false); + node->addCustomAttributesForField(attributes); + } + + // Recursively apply field IDs to nested fields if they exist + if (field_ref->get().nested_mapping && + field_node->type() == ::avro::AVRO_RECORD) { + const auto& nested_mapping = field_ref->get().nested_mapping; + auto fields_span = nested_mapping->fields(); + std::vector<MappedField> fields_vector(fields_span.begin(), fields_span.end()); + auto nested_name_mapping = NameMapping::Make(std::move(fields_vector)); + ICEBERG_RETURN_UNEXPECTED( + ApplyFieldIdsFromNameMapping(field_node, *nested_name_mapping)); + } else { + // Recursively apply field IDs to child nodes (only if not already handled by + // nested mapping) + ICEBERG_RETURN_UNEXPECTED( + ApplyFieldIdsFromNameMapping(field_node, name_mapping)); + } + } else { + // Recursively apply field IDs to child nodes even if no mapping found + ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(field_node, name_mapping)); + } + } + return {}; + } + + Status ApplyFieldIdsToArray(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + if (node->leaves() != 1) { + return InvalidSchema("Array type must have exactly one leaf"); + } + + // Check if this is a map represented as array + if (node->logicalType().type() == ::avro::LogicalType::CUSTOM && + node->logicalType().customLogicalType() != nullptr && + node->logicalType().customLogicalType()->name() == "map") { + return ApplyFieldIdsFromNameMapping(node->leafAt(0), name_mapping); + } + + // For regular arrays, try to find element field ID + if (auto element_field = name_mapping.Find("element")) { + if (element_field->get().field_id.has_value()) { + ::avro::CustomAttributes attributes; + attributes.addAttribute( + 
"element-id", std::to_string(element_field->get().field_id.value()), false); + node->addCustomAttributesForField(attributes); + } + } + + return ApplyFieldIdsFromNameMapping(node->leafAt(0), name_mapping); + } + + Status ApplyFieldIdsToMap(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + if (node->leaves() != 2) { + return InvalidSchema("Map type must have exactly two leaves"); + } + + // Try to find key and value field IDs + if (auto key_field = name_mapping.Find("key")) { + if (key_field->get().field_id.has_value()) { + ::avro::CustomAttributes attributes; + attributes.addAttribute("key-id", + std::to_string(key_field->get().field_id.value()), false); + node->addCustomAttributesForField(attributes); + } + } + + if (auto value_field = name_mapping.Find("value")) { + if (value_field->get().field_id.has_value()) { + ::avro::CustomAttributes attributes; + attributes.addAttribute( + "value-id", std::to_string(value_field->get().field_id.value()), false); + node->addCustomAttributesForField(attributes); + } + } + + return ApplyFieldIdsFromNameMapping(node->leafAt(1), name_mapping); + } + + Status ApplyFieldIdsToUnion(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + if (node->leaves() != 2) { + return InvalidSchema("Union type must have exactly two branches"); + } + + const auto& branch_0 = node->leafAt(0); + const auto& branch_1 = node->leafAt(1); + + if (branch_0->type() == ::avro::AVRO_NULL) { + return ApplyFieldIdsFromNameMapping(branch_1, name_mapping); + } + if (branch_1->type() == ::avro::AVRO_NULL) { + return ApplyFieldIdsFromNameMapping(branch_0, name_mapping); + } + + return InvalidSchema("Union type must have exactly one null branch"); Review Comment: The branch above doesn't check "exactly once", isn't they? 
########## src/iceberg/avro/avro_reader.cc: ########## @@ -195,6 +209,147 @@ class AvroBatchReader::Impl { return arrow_array; } + // Apply field IDs to Avro schema nodes based on name mapping + Status ApplyFieldIdsFromNameMapping(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + switch (node->type()) { + case ::avro::AVRO_RECORD: + return ApplyFieldIdsToRecord(node, name_mapping); + case ::avro::AVRO_ARRAY: + return ApplyFieldIdsToArray(node, name_mapping); + case ::avro::AVRO_MAP: + return ApplyFieldIdsToMap(node, name_mapping); + case ::avro::AVRO_UNION: + return ApplyFieldIdsToUnion(node, name_mapping); + case ::avro::AVRO_BOOL: + case ::avro::AVRO_INT: + case ::avro::AVRO_LONG: + case ::avro::AVRO_FLOAT: + case ::avro::AVRO_DOUBLE: + case ::avro::AVRO_STRING: + case ::avro::AVRO_BYTES: + case ::avro::AVRO_FIXED: + return {}; + case ::avro::AVRO_NULL: + case ::avro::AVRO_ENUM: + default: + return InvalidSchema("Unsupported Avro type for field ID application: {}", + static_cast<int>(node->type())); Review Comment: @wgtmac so this is where your `ToString` is needed, instead of printing the type as a raw `int`? 
########## src/iceberg/avro/avro_reader.cc: ########## @@ -195,6 +209,147 @@ class AvroBatchReader::Impl { return arrow_array; } + // Apply field IDs to Avro schema nodes based on name mapping + Status ApplyFieldIdsFromNameMapping(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + switch (node->type()) { + case ::avro::AVRO_RECORD: + return ApplyFieldIdsToRecord(node, name_mapping); + case ::avro::AVRO_ARRAY: + return ApplyFieldIdsToArray(node, name_mapping); + case ::avro::AVRO_MAP: + return ApplyFieldIdsToMap(node, name_mapping); + case ::avro::AVRO_UNION: + return ApplyFieldIdsToUnion(node, name_mapping); + case ::avro::AVRO_BOOL: + case ::avro::AVRO_INT: + case ::avro::AVRO_LONG: + case ::avro::AVRO_FLOAT: + case ::avro::AVRO_DOUBLE: + case ::avro::AVRO_STRING: + case ::avro::AVRO_BYTES: + case ::avro::AVRO_FIXED: + return {}; + case ::avro::AVRO_NULL: + case ::avro::AVRO_ENUM: + default: + return InvalidSchema("Unsupported Avro type for field ID application: {}", + static_cast<int>(node->type())); + } + } + + Status ApplyFieldIdsToRecord(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + for (size_t i = 0; i < node->leaves(); ++i) { + const std::string& field_name = node->nameAt(i); + ::avro::NodePtr field_node = node->leafAt(i); + + // Try to find field ID by name in the name mapping + if (auto field_ref = name_mapping.Find(field_name)) { + if (field_ref->get().field_id.has_value()) { + // Add field ID attribute to the node + ::avro::CustomAttributes attributes; + attributes.addAttribute( + "field-id", std::to_string(field_ref->get().field_id.value()), false); + node->addCustomAttributesForField(attributes); + } + + // Recursively apply field IDs to nested fields if they exist + if (field_ref->get().nested_mapping && + field_node->type() == ::avro::AVRO_RECORD) { + const auto& nested_mapping = field_ref->get().nested_mapping; + auto fields_span = nested_mapping->fields(); + std::vector<MappedField> 
fields_vector(fields_span.begin(), fields_span.end()); + auto nested_name_mapping = NameMapping::Make(std::move(fields_vector)); + ICEBERG_RETURN_UNEXPECTED( + ApplyFieldIdsFromNameMapping(field_node, *nested_name_mapping)); + } else { + // Recursively apply field IDs to child nodes (only if not already handled by + // nested mapping) + ICEBERG_RETURN_UNEXPECTED( + ApplyFieldIdsFromNameMapping(field_node, name_mapping)); + } + } else { + // Recursively apply field IDs to child nodes even if no mapping found + ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(field_node, name_mapping)); + } + } + return {}; + } + + Status ApplyFieldIdsToArray(const ::avro::NodePtr& node, + const NameMapping& name_mapping) { + if (node->leaves() != 1) { + return InvalidSchema("Array type must have exactly one leaf"); Review Comment: Maybe we can add a Todo, and print the `node` in the future -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org