gty404 commented on code in PR #150:
URL: https://github.com/apache/iceberg-cpp/pull/150#discussion_r2230287638


##########
src/iceberg/avro/avro_register.cc:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "avro_register.h"

Review Comment:
   ```suggestion
   #include "iceberg/avro/avro_register.h"
   ```



##########
src/iceberg/manifest_reader_internal.cc:
##########
@@ -134,112 +227,336 @@ Result<std::vector<ManifestFile>> 
ParseManifestListEntry(ArrowSchema* schema,
     auto field_name = field.value().get().name();
     bool required = !field.value().get().optional();
     auto view_of_column = array_view.children[idx];
-
-#define PARSE_PRIMITIVE_FIELD(item, type)                                      
         \
-  for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {      
         \
-    if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {                      
         \
-      auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx);        
         \
-      manifest_files[row_idx].item = static_cast<type>(value);                 
         \
-    } else if (required) {                                                     
         \
-      return InvalidManifestList("Field {} is required but null at row {}", 
field_name, \
-                                 row_idx);                                     
         \
-    }                                                                          
         \
-  }
-
     switch (idx) {
       case 0:
-        for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
-          if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
-            auto value = ArrowArrayViewGetStringUnsafe(view_of_column, 
row_idx);
-            std::string path_str(value.data, value.size_bytes);
-            manifest_files[row_idx].manifest_path = path_str;
-          }
-        }
+        PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, 
view_of_column);
         break;
       case 1:
-        PARSE_PRIMITIVE_FIELD(manifest_length, int64_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, 
view_of_column,
+                              int64_t);
         break;
       case 2:
-        PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, 
view_of_column,
+                              int32_t);
         break;
       case 3:
-        for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
-          if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
-            auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx);
-            manifest_files[row_idx].content = 
static_cast<ManifestFile::Content>(value);
-          }
-        }
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
+                              ManifestFile::Content);
         break;
       case 4:
-        PARSE_PRIMITIVE_FIELD(sequence_number, int64_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, 
view_of_column,
+                              int64_t);
         break;
       case 5:
-        PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, 
view_of_column,
+                              int64_t);
         break;
       case 6:
-        PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, 
view_of_column,
+                              int64_t);
         break;
       case 7:
-        PARSE_PRIMITIVE_FIELD(added_files_count, int32_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, 
view_of_column,
+                              int32_t);
         break;
       case 8:
-        PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
+                              view_of_column, int32_t);
         break;
       case 9:
-        PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, 
view_of_column,
+                              int32_t);
         break;
       case 10:
-        PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, 
view_of_column,
+                              int64_t);
         break;
       case 11:
-        PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, 
view_of_column,
+                              int64_t);
         break;
       case 12:
-        PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t);
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, 
view_of_column,
+                              int64_t);
         break;
       case 13:
         ICEBERG_RETURN_UNEXPECTED(
             ParsePartitionFieldSummaryList(view_of_column, manifest_files));
         break;
       case 14:
-        for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
-          if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
-            auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, 
row_idx);
-            manifest_files[row_idx].key_metadata = std::vector<uint8_t>(
-                buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
+        PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, 
view_of_column);
+        break;
+      case 15:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, 
view_of_column,
+                              int64_t);
+        break;
+      default:
+        return InvalidManifestList("Unsupported field: {} in manifest file.", 
field_name);
+    }
+  }
+  return manifest_files;
+}
+
+Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx,
+                    std::vector<ManifestEntry>& manifest_entries) {
+  if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
+    auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
+    manifest_entries[row_idx].data_file->partition.emplace_back(
+        Literal::Boolean(value != 0));
+  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_INT32) {
+    auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
+    
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Int(value));
+  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_INT64) {
+    auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
+    
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Long(value));
+  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_FLOAT) {
+    auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
+    
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Float(value));
+  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_DOUBLE) {
+    auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
+    
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Double(value));
+  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_STRING) {
+    auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx);
+    manifest_entries[row_idx].data_file->partition.emplace_back(
+        Literal::String(std::string(value.data, value.size_bytes)));
+  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_BINARY) {
+    auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx);
+    manifest_entries[row_idx].data_file->partition.emplace_back(
+        Literal::Binary(std::vector<uint8_t>(buffer.data.as_char,
+                                             buffer.data.as_char + 
buffer.size_bytes)));
+  } else {
+    return InvalidManifest("Unsupported field type: {} in data file 
partition.",
+                           
static_cast<int32_t>(view_of_partition->storage_type));
+  }
+  return {};
+}
+
+Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
+                     ArrowArrayView* view_of_column,
+                     std::vector<ManifestEntry>& manifest_entries) {
+  if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
+    return InvalidManifest("DataFile field should be a struct.");
+  }
+  if (view_of_column->n_children != data_file_schema->fields().size()) {
+    return InvalidManifest("DataFile schema size:{} not match with ArrayArray 
columns:{}",
+                           data_file_schema->fields().size(), 
view_of_column->n_children);
+  }
+  for (int64_t col_idx = 0; col_idx < view_of_column->n_children; ++col_idx) {
+    auto field_name = 
data_file_schema->GetFieldByIndex(col_idx).value().get().name();
+    auto required = 
!data_file_schema->GetFieldByIndex(col_idx).value().get().optional();
+    auto view_of_file_field = view_of_column->children[col_idx];
+    auto manifest_entry_count = view_of_file_field->length;
+
+    switch (col_idx) {
+      case 0:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content,
+                              view_of_file_field, DataFile::Content);
+        break;
+      case 1:
+        PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path,
+                           view_of_file_field);
+        break;
+      case 2:
+        for (size_t row_idx = 0; row_idx < view_of_file_field->length; 
row_idx++) {
+          if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
+            auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, 
row_idx);
+            std::string_view path_str(value.data, value.size_bytes);
+            
ICEBERG_ASSIGN_OR_RAISE(manifest_entries[row_idx].data_file->file_format,
+                                    FileFormatTypeFromString(path_str));
+          }
+        }
+        break;
+      case 3: {
+        if (view_of_file_field->storage_type != 
ArrowType::NANOARROW_TYPE_STRUCT) {
+          return InvalidManifest("Field:{} should be a list.", field_name);
+        }
+        auto view_of_partition = view_of_file_field->children[0];
+        for (size_t row_idx = 0; row_idx < view_of_partition->length; 
row_idx++) {
+          if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
+            break;
           }
+          ICEBERG_RETURN_UNEXPECTED(
+              ParseLiteral(view_of_partition, row_idx, manifest_entries));
         }
+      } break;
+      case 4:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->record_count,
+                              view_of_file_field, int64_t);
+        break;
+      case 5:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->file_size_in_bytes,
+                              view_of_file_field, int64_t);
+        break;
+      case 6:
+        // key&value should have the same offset
+        // HACK(xiao.dong) workaround for arrow bug:
+        // ArrowArrayViewListChildOffset can not get the correct offset for map
+        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes,
+                                 manifest_entry_count, view_of_file_field);
+        break;
+      case 7:
+        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts,
+                                 manifest_entry_count, view_of_file_field);
+        break;
+      case 8:
+        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts,
+                                 manifest_entry_count, view_of_file_field);
+        break;
+      case 9:
+        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts,
+                                 manifest_entry_count, view_of_file_field);
+        break;
+      case 10:
+        
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds,
+                                   manifest_entry_count, view_of_file_field);
+        break;
+      case 11:
+        
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds,
+                                   manifest_entry_count, view_of_file_field);
+        break;
+      case 12:
+        PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata,
+                           view_of_file_field);
+        break;
+      case 13:
+        PARSE_INTEGER_VECTOR_FIELD(
+            manifest_entries[manifest_idx].data_file->split_offsets, 
manifest_entry_count,
+            view_of_file_field, int64_t);
+        break;
+      case 14:
+        
PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids,
+                                   manifest_entry_count, view_of_file_field, 
int32_t);
         break;
       case 15:
-        PARSE_PRIMITIVE_FIELD(first_row_id, int64_t);
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
+                              view_of_file_field, int32_t);
+        break;
+      case 16:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id,
+                              view_of_file_field, int64_t);
+        break;
+      case 17:
+        
PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file,
+                           view_of_file_field);
+        break;
+      case 18:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_offset,
+                              view_of_file_field, int64_t);
+        break;
+      case 19:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_size_in_bytes,
+                              view_of_file_field, int64_t);
         break;
       default:
-        return InvalidManifestList("Unsupported type: {}", field_name);
+        return InvalidManifest("Unsupported field: {} in data file.", 
field_name);
     }
   }
-  return manifest_files;
+  return {};
 }
 
-Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const { 
return {}; }
+Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
+                                                      ArrowArray* array_in,
+                                                      const Schema& 
iceberg_schema) {
+  if (schema->n_children != array_in->n_children) {
+    return InvalidManifest("Columns size not match between schema:{} and 
array:{}",
+                           schema->n_children, array_in->n_children);
+  }
+  if (iceberg_schema.fields().size() != array_in->n_children) {
+    return InvalidManifest("Columns size not match between schema:{} and 
array:{}",
+                           iceberg_schema.fields().size(), 
array_in->n_children);
+  }
+
+  ArrowError error;
+  ArrowArrayView array_view;
+  auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
+  NANOARROW_RETURN_IF_NOT_OK(status, error);
+  internal::ArrowArrayViewGuard view_guard(&array_view);
+  status = ArrowArrayViewSetArray(&array_view, array_in, &error);
+  NANOARROW_RETURN_IF_NOT_OK(status, error);
+  status = ArrowArrayViewValidate(&array_view, 
NANOARROW_VALIDATION_LEVEL_FULL, &error);
+  NANOARROW_RETURN_IF_NOT_OK(status, error);
+
+  std::vector<ManifestEntry> manifest_entries;
+  manifest_entries.resize(array_in->length);
+  for (size_t i = 0; i < array_in->length; i++) {
+    manifest_entries[i].data_file = std::make_shared<DataFile>();
+  }
+
+  for (int64_t idx = 0; idx < array_in->n_children; idx++) {
+    const auto& field = iceberg_schema.GetFieldByIndex(idx);
+    if (!field.has_value()) {
+      return InvalidManifest("Field not found in schema: {}", idx);
+    }
+    auto field_name = field.value().get().name();
+    bool required = !field.value().get().optional();
+    auto view_of_column = array_view.children[idx];
+
+    switch (idx) {
+      case 0:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, view_of_column,
+                              ManifestStatus);
+        break;
+      case 1:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id, 
view_of_column,
+                              int64_t);
+        break;
+      case 2:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].sequence_number, 
view_of_column,
+                              int64_t);
+        break;
+      case 3:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].file_sequence_number,
+                              view_of_column, int64_t);
+        break;
+      case 4: {
+        auto data_file_schema =
+            dynamic_pointer_cast<StructType>(field.value().get().type());
+        ICEBERG_RETURN_UNEXPECTED(
+            ParseDataFile(data_file_schema, view_of_column, manifest_entries));
+        break;
+      }
+      default:
+        return InvalidManifest("Unsupported field: {} in manifest entry.", 
field_name);
+    }
+  }
+  return manifest_entries;
+}
+
+Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
+  std::vector<ManifestEntry> manifest_entries;
+  ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
+  internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+  while (true) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
+    if (result.has_value()) {
+      internal::ArrowArrayGuard array_guard(&result.value());
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto parse_result,
+          ParseManifestEntry(&arrow_schema, &result.value(), *schema_));
+      manifest_entries.insert(manifest_entries.end(),
+                              std::make_move_iterator(parse_result.begin()),
+                              std::make_move_iterator(parse_result.end()));
+    } else {
+      // eof
+      break;
+    }
+  }
+  return manifest_entries;
+}
 
 Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
   std::vector<ManifestFile> manifest_files;
   ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
   internal::ArrowSchemaGuard schema_guard(&arrow_schema);
   while (true) {
-    auto result = reader_->Next();
-    if (!result.has_value()) {
-      return InvalidManifestList("Failed to read manifest list entry:{}",
-                                 result.error().message);
-    }
-    if (result.value().has_value()) {
-      internal::ArrowArrayGuard array_guard(&result.value().value());
+    ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
+    if (result.has_value()) {

Review Comment:
   Here, `result.has_value()` is definitely true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to