This is an automated email from the ASF dual-hosted git repository. kakachen pushed a commit to branch fix_complex_type_late_materializtion in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
commit 3c10aff4b22cdb9700a7fcc5637a95aa293d6f9e Author: kakachen <che...@selectdb.com> AuthorDate: Fri Apr 25 01:40:15 2025 +0800 [fix] Fix complex type late materializtion. --- c++/src/ColumnReader.cc | 21 ++++++++++++++++++--- c++/src/Reader.cc | 17 +++++++++++------ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc index 875ce81a9de..4dde99917c0 100644 --- a/c++/src/ColumnReader.cc +++ b/c++/src/ColumnReader.cc @@ -1158,7 +1158,9 @@ namespace orc { } uint64_t StructColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { - numValues = ColumnReader::skip(numValues, readPhase); + if (readPhase.contains(this->type.getReaderCategory())) { + numValues = ColumnReader::skip(numValues, readPhase); + } for (auto& ptr : children) { if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) { ptr->skip(numValues, readPhase); @@ -1183,7 +1185,9 @@ namespace orc { void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) { - ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + if (readPhase.contains(this->type.getReaderCategory())) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + } uint64_t i = 0; notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) { @@ -1201,7 +1205,9 @@ namespace orc { void StructColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { - ColumnReader::seekToRowGroup(positions, readPhase); + if (readPhase.contains(this->type.getReaderCategory())) { + ColumnReader::seekToRowGroup(positions, readPhase); + } for (auto& ptr : children) { if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) { @@ -1579,6 +1585,9 @@ namespace orc { } uint64_t UnionColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { + if (!readPhase.contains(this->type.getReaderCategory())) { + throw NotImplementedYet("Not implemented yet"); + } numValues = ColumnReader::skip(numValues, readPhase); const uint64_t BUFFER_SIZE = 1024; char buffer[BUFFER_SIZE]; @@ -1618,6 +1627,9 @@ namespace orc { void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) { + if (!readPhase.contains(this->type.getReaderCategory())) { + throw NotImplementedYet("Not implemented yet"); + } ColumnReader::next(rowBatch, numValues, notNull, readPhase); UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch); uint64_t* offsets = unionBatch.offsets.data(); @@ -1655,6 +1667,9 @@ namespace orc { void UnionColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + if (!readPhase.contains(this->type.getReaderCategory())) { + throw NotImplementedYet("Not implemented yet"); + } ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); for (size_t i = 0; i < numChildren; ++i) { diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 542eda5c9e2..bdff1a0d7ce 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -335,7 +335,8 @@ namespace orc { // Set FILTER_CHILD for leaf nodes and FILTER_PARENT for non-leaf nodes Type* current = type; while (current != nullptr) { - if (current->getSubtypeCount() == 0) { + if (current->getSubtypeCount() == 0 || current->getKind() == TypeKind::LIST + || current->getKind() == TypeKind::MAP) { current->setReaderCategory(ReaderCategory::FILTER_CHILD); } else { current->setReaderCategory(ReaderCategory::FILTER_PARENT); @@ -345,26 +346,29 @@ namespace orc { } // Process all child nodes of the current node - // For child nodes: set FILTER_PARENT if it's a leaf, FILTER_CHILD if it has children + // For child nodes: set FILTER_CHILD if it's a leaf, FILTER_PARENT if it has children std::function<void(Type*)> processChildren = [&processChildren](Type* node) { if (node == nullptr) return; // Iterate through all child nodes for (int i = 0; i < node->getSubtypeCount(); ++i) { Type* child = node->getSubtype(i); - if (child->getSubtypeCount() == 0) { + if (child->getSubtypeCount() == 0 || child->getKind() == TypeKind::LIST + || child->getKind() == TypeKind::MAP) { // Leaf node (no children) - child->setReaderCategory(ReaderCategory::FILTER_PARENT); + child->setReaderCategory(ReaderCategory::FILTER_CHILD); } else { // Non-leaf node (has children) - child->setReaderCategory(ReaderCategory::FILTER_CHILD); + child->setReaderCategory(ReaderCategory::FILTER_PARENT); // Recursively process its children processChildren(child); } } }; - processChildren(type); + if (type->getReaderCategory() == ReaderCategory::FILTER_PARENT) { + processChildren(type); + } } startReadPhase = ReadPhase::LEADERS; @@ -1341,6 +1345,7 @@ namespace orc { prepareFollowReaders(footer->rowindexstride(), currentRowInStripe, followRowInStripe), sel_rowid_idx.get(), arg); followRowInStripe = currentRowInStripe + rowsToRead; + data.numElements = rowsToRead; } } else { nextBatch(data, rowsToRead, startReadPhase, nullptr, arg); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org