This is an automated email from the ASF dual-hosted git repository.

kakachen pushed a commit to branch fix_complex_type_late_materializtion
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git

commit 3c10aff4b22cdb9700a7fcc5637a95aa293d6f9e
Author: kakachen <che...@selectdb.com>
AuthorDate: Fri Apr 25 01:40:15 2025 +0800

    [fix] Fix complex type late materialization.
---
 c++/src/ColumnReader.cc | 21 ++++++++++++++++++---
 c++/src/Reader.cc       | 17 +++++++++++------
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 875ce81a9de..4dde99917c0 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -1158,7 +1158,9 @@ namespace orc {
   }
 
   uint64_t StructColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
-    numValues = ColumnReader::skip(numValues, readPhase);
+    if (readPhase.contains(this->type.getReaderCategory())) {
+      numValues = ColumnReader::skip(numValues, readPhase);
+    }
     for (auto& ptr : children) {
       if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) {
         ptr->skip(numValues, readPhase);
@@ -1183,7 +1185,9 @@ namespace orc {
   void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
                                         char* notNull, const ReadPhase& 
readPhase,
                                         uint16_t* sel_rowid_idx, size_t 
sel_size) {
-    ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
+    if (readPhase.contains(this->type.getReaderCategory())) {
+      ColumnReader::next(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
+    }
     uint64_t i = 0;
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) {
@@ -1201,7 +1205,9 @@ namespace orc {
 
   void StructColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions,
                                           const ReadPhase& readPhase) {
-    ColumnReader::seekToRowGroup(positions, readPhase);
+    if (readPhase.contains(this->type.getReaderCategory())) {
+      ColumnReader::seekToRowGroup(positions, readPhase);
+    }
 
     for (auto& ptr : children) {
       if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) {
@@ -1579,6 +1585,9 @@ namespace orc {
   }
 
   uint64_t UnionColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
+    if (!readPhase.contains(this->type.getReaderCategory())) {
+      throw NotImplementedYet("Not implemented yet");
+    }
     numValues = ColumnReader::skip(numValues, readPhase);
     const uint64_t BUFFER_SIZE = 1024;
     char buffer[BUFFER_SIZE];
@@ -1618,6 +1627,9 @@ namespace orc {
   void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
                                        char* notNull, const ReadPhase& 
readPhase,
                                        uint16_t* sel_rowid_idx, size_t 
sel_size) {
+    if (!readPhase.contains(this->type.getReaderCategory())) {
+      throw NotImplementedYet("Not implemented yet");
+    }
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
     uint64_t* offsets = unionBatch.offsets.data();
@@ -1655,6 +1667,9 @@ namespace orc {
 
   void UnionColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions,
                                          const ReadPhase& readPhase) {
+    if (!readPhase.contains(this->type.getReaderCategory())) {
+      throw NotImplementedYet("Not implemented yet");
+    }
     ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
     for (size_t i = 0; i < numChildren; ++i) {
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 542eda5c9e2..bdff1a0d7ce 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -335,7 +335,8 @@ namespace orc {
         // Set FILTER_CHILD for leaf nodes and FILTER_PARENT for non-leaf nodes
         Type* current = type;
         while (current != nullptr) {
-          if (current->getSubtypeCount() == 0) {
+          if (current->getSubtypeCount() == 0 || current->getKind() == 
TypeKind::LIST
+              || current->getKind() == TypeKind::MAP) {
             current->setReaderCategory(ReaderCategory::FILTER_CHILD);
           } else {
             current->setReaderCategory(ReaderCategory::FILTER_PARENT);
@@ -345,26 +346,29 @@ namespace orc {
         }
 
         // Process all child nodes of the current node
-        // For child nodes: set FILTER_PARENT if it's a leaf, FILTER_CHILD if 
it has children
+        // For child nodes: set FILTER_CHILD if it's a leaf, FILTER_PARENT if 
it has children
         std::function<void(Type*)> processChildren = [&processChildren](Type* 
node) {
           if (node == nullptr) return;
 
           // Iterate through all child nodes
           for (int i = 0; i < node->getSubtypeCount(); ++i) {
             Type* child = node->getSubtype(i);
-            if (child->getSubtypeCount() == 0) {
+            if (child->getSubtypeCount() == 0 || child->getKind() == 
TypeKind::LIST
+                || child->getKind() == TypeKind::MAP) {
               // Leaf node (no children)
-              child->setReaderCategory(ReaderCategory::FILTER_PARENT);
+              child->setReaderCategory(ReaderCategory::FILTER_CHILD);
             } else {
               // Non-leaf node (has children)
-              child->setReaderCategory(ReaderCategory::FILTER_CHILD);
+              child->setReaderCategory(ReaderCategory::FILTER_PARENT);
               // Recursively process its children
               processChildren(child);
             }
           }
         };
 
-        processChildren(type);
+        if (type->getReaderCategory() == ReaderCategory::FILTER_PARENT) {
+          processChildren(type);
+        }
       }
 
       startReadPhase = ReadPhase::LEADERS;
@@ -1341,6 +1345,7 @@ namespace orc {
               prepareFollowReaders(footer->rowindexstride(), 
currentRowInStripe, followRowInStripe),
               sel_rowid_idx.get(), arg);
           followRowInStripe = currentRowInStripe + rowsToRead;
+          data.numElements = rowsToRead;
         }
       } else {
         nextBatch(data, rowsToRead, startReadPhase, nullptr, arg);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to