This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch orc in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push: new 0e53506 [Feature] Implements selection vector for ORC lazy materialization. (#62) 0e53506 is described below commit 0e53506146c965a5a71f0582691ab2ea148dae7c Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Sat May 6 15:46:41 2023 +0800 [Feature] Implements selection vector for ORC lazy materialization. (#62) 1. Implements selection vector for ORC lazy materialization. Based on testing, it is currently implemented only for the float/double, date/timestamp, decimal, and string dictionary types, where it improves performance; for other types it incurs a performance penalty. 2. Reduce the number of `loadStripeIndex()` calls. 3. Adjust code formatting. --- c++/include/orc/Reader.hh | 35 +-- c++/include/orc/Type.hh | 48 ++-- c++/src/ColumnReader.cc | 672 +++++++++++++++++++++++++++++++++++++++------- c++/src/ColumnReader.hh | 23 +- c++/src/Reader.cc | 222 +++++++-------- c++/src/Reader.hh | 45 ++-- c++/src/TypeImpl.cc | 14 +- 7 files changed, 784 insertions(+), 275 deletions(-) diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index 2666b4d..429c1ed 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -186,26 +186,25 @@ namespace orc { RowReaderOptions& includeTypes(const std::list<uint64_t>& types); /** - * For files that have structs as the top-level object, filter the fields. - * by index. The first field is 0, the second 1, and so on. By default, - * all columns are read. This option clears any previous setting of - * the selected columns. - * @param filterColIndexes a list of fields to read - * @return this - */ + * For files that have structs as the top-level object, filter the fields. + * by index. The first field is 0, the second 1, and so on. By default, + * all columns are read. This option clears any previous setting of + * the selected columns. 
+ * @param filterColIndexes a list of fields to read + * @return this + */ RowReaderOptions& filter(const std::list<uint64_t>& filterColIndexes); /** - * For files that have structs as the top-level object, filter the fields - * by name. By default, all columns are read. This option clears - * any previous setting of the selected columns. - * @param filterColNames a list of fields to read - * @return this - */ + * For files that have structs as the top-level object, filter the fields + * by name. By default, all columns are read. This option clears + * any previous setting of the selected columns. + * @param filterColNames a list of fields to read + * @return this + */ RowReaderOptions& filter(const std::list<std::string>& filterColNames); - - /** + /** * A map type of <typeId, ReadIntent>. */ typedef std::map<uint64_t, ReadIntent> IdReadIntentMap; @@ -367,7 +366,8 @@ namespace orc { class ORCFilter { public: virtual ~ORCFilter() = default; - virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg = nullptr) const = 0; + virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size, + void* arg = nullptr) const = 0; }; class RowReader; @@ -562,7 +562,8 @@ namespace orc { * @param options RowReader Options * @return a RowReader to read the rows */ - virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options, const ORCFilter* filter = nullptr) const = 0; + virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options, + const ORCFilter* filter = nullptr) const = 0; /** * Get the name of the input stream. 
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh index 73b813e..1f0901e 100644 --- a/c++/include/orc/Type.hh +++ b/c++/include/orc/Type.hh @@ -27,37 +27,37 @@ namespace orc { enum class ReaderCategory { - FILTER_CHILD, // Primitive type that is a filter column - FILTER_PARENT, // Compound type with filter children - NON_FILTER // Non-filter column + FILTER_CHILD, // Primitive type that is a filter column + FILTER_PARENT, // Compound type with filter children + NON_FILTER // Non-filter column }; class ReadPhase { public: - static const int NUM_CATEGORIES = 3; // Number of values in ReaderCategory - std::bitset<NUM_CATEGORIES> categories; - - static const ReadPhase ALL; - static const ReadPhase LEADERS; - static const ReadPhase FOLLOWERS; - static const ReadPhase LEADER_PARENTS; - static const ReadPhase FOLLOWERS_AND_PARENTS; - - static ReadPhase fromCategories(const std::unordered_set<ReaderCategory>& cats) { - ReadPhase phase; - for (ReaderCategory cat : cats) { - phase.categories.set(static_cast<size_t>(cat)); - } - return phase; + static const int NUM_CATEGORIES = 3; // Number of values in ReaderCategory + std::bitset<NUM_CATEGORIES> categories; + + static const ReadPhase ALL; + static const ReadPhase LEADERS; + static const ReadPhase FOLLOWERS; + static const ReadPhase LEADER_PARENTS; + static const ReadPhase FOLLOWERS_AND_PARENTS; + + static ReadPhase fromCategories(const std::unordered_set<ReaderCategory>& cats) { + ReadPhase phase; + for (ReaderCategory cat : cats) { + phase.categories.set(static_cast<size_t>(cat)); } + return phase; + } - bool contains(ReaderCategory cat) const { - return categories.test(static_cast<size_t>(cat)); - } + bool contains(ReaderCategory cat) const { + return categories.test(static_cast<size_t>(cat)); + } - bool operator==(const ReadPhase& other) const { - return categories == other.categories; - } + bool operator==(const ReadPhase& other) const { + return categories == other.categories; + } }; enum TypeKind { diff 
--git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc index cabcdbe..8c6e166 100644 --- a/c++/src/ColumnReader.cc +++ b/c++/src/ColumnReader.cc @@ -85,7 +85,8 @@ namespace orc { return numValues; } - void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* incomingMask, const ReadPhase& readPhase) { + void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* incomingMask, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) { if (numValues > rowBatch.capacity) { rowBatch.resize(numValues); } @@ -101,19 +102,17 @@ namespace orc { return; } } - rowBatch.hasNulls = false; } else if (incomingMask) { // If we don't have a notNull stream, copy the incomingMask rowBatch.hasNulls = true; memcpy(rowBatch.notNull.data(), incomingMask, numValues); return; - } else { - rowBatch.hasNulls = false; - memset(rowBatch.notNull.data(), 1, numValues); } + rowBatch.hasNulls = false; } - void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) { if (notNullDecoder.get()) { notNullDecoder->seek(positions.at(columnId)); } @@ -148,9 +147,11 @@ namespace orc { uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; }; template <typename BatchType> @@ -176,7 +177,8 @@ namespace orc { template <typename BatchType> void 
BooleanColumnReader<BatchType>::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); // Since the byte rle places the output in a char* and BatchType here may be // LongVectorBatch with long*. We cheat here in that case and use the long* @@ -215,7 +217,8 @@ namespace orc { return numValues; } - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override { + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override { ColumnReader::next(rowBatch, numValues, notNull, readPhase); // Since the byte rle places the output in a char* instead of long*, // we cheat here and use the long* and then expand it in a second pass. @@ -225,7 +228,8 @@ namespace orc { expandBytesToIntegers(ptr, numValues); } - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override { + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override { ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); } @@ -255,13 +259,15 @@ namespace orc { return numValues; } - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override { + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override { ColumnReader::next(rowBatch, numValues, notNull, readPhase); rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues, rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr); } - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override { + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override { ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); } @@ -276,15 +282,24 @@ namespace orc { const int64_t epochOffset; const bool sameTimezone; + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase); + void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size); + uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase); + public: TimestampColumnReader(const Type& type, StripeStreams& stripe, bool isInstantType); ~TimestampColumnReader() override; uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; }; TimestampColumnReader::TimestampColumnReader(const Type& type, StripeStreams& stripe, @@ -310,12 +325,28 @@ namespace orc { uint64_t TimestampColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { numValues = ColumnReader::skip(numValues, readPhase); + numValues = skipInternal(numValues, readPhase); + return numValues; + } + + uint64_t TimestampColumnReader::skipInternal(uint64_t numValues, const ReadPhase& readPhase) { secondsRle->skip(numValues); 
nanoRle->skip(numValues); return numValues; } - void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + if (sel_rowid_idx == nullptr) { + nextInternal(rowBatch, numValues, notNull, readPhase); + } else { + nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + } + } + + void TimestampColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, + char* notNull, const ReadPhase& readPhase) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; TimestampVectorBatch& timestampBatch = dynamic_cast<TimestampVectorBatch&>(rowBatch); @@ -356,6 +387,51 @@ namespace orc { } } + void TimestampColumnReader::nextInternalWithFilter(ColumnVectorBatch& rowBatch, + uint64_t numValues, char* notNull, + const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); + notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; + TimestampVectorBatch& timestampBatch = dynamic_cast<TimestampVectorBatch&>(rowBatch); + int64_t* secsBuffer = timestampBatch.data.data(); + secondsRle->next(secsBuffer, numValues, notNull); + int64_t* nanoBuffer = timestampBatch.nanoseconds.data(); + nanoRle->next(nanoBuffer, numValues, notNull); + + // Construct the values + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (notNull == nullptr || notNull[idx]) { + uint64_t zeros = nanoBuffer[idx] & 0x7; + nanoBuffer[idx] >>= 3; + if (zeros != 0) { + for (uint64_t j = 0; j <= zeros; ++j) { + nanoBuffer[idx] *= 10; + } + } + int64_t writerTime = secsBuffer[idx] + epochOffset; + if (!sameTimezone) { + // adjust timestamp value to same wall clock time if writer and reader + // time zones have different rules, which is required for Apache Orc. + const auto& wv = writerTimezone.getVariant(writerTime); + const auto& rv = readerTimezone.getVariant(writerTime); + if (!wv.hasSameTzRule(rv)) { + // If the timezone adjustment moves the millis across a DST boundary, + // we need to reevaluate the offsets. 
+ int64_t adjustedTime = writerTime + wv.gmtOffset - rv.gmtOffset; + const auto& adjustedReader = readerTimezone.getVariant(adjustedTime); + writerTime = writerTime + wv.gmtOffset - adjustedReader.gmtOffset; + } + } + secsBuffer[idx] = writerTime; + if (secsBuffer[idx] < 0 && nanoBuffer[i] > 999999) { + secsBuffer[idx] -= 1; + } + } + } + } + void TimestampColumnReader::seekToRowGroup( std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { ColumnReader::seekToRowGroup(positions, readPhase); @@ -371,9 +447,11 @@ namespace orc { uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; private: std::unique_ptr<SeekableInputStream> inputStream; @@ -381,6 +459,13 @@ namespace orc { const char* bufferPointer; const char* bufferEnd; + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase); + + void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size); + unsigned char readByte() { if (bufferPointer == bufferEnd) { int length; @@ -442,6 +527,8 @@ namespace orc { } return static_cast<FloatType>(*result); } + + uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase); }; template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> @@ -456,7 +543,12 @@ namespace orc { uint64_t DoubleColumnReader<columnKind, 
isLittleEndian, ValueType, BatchType>::skip( uint64_t numValues, const ReadPhase& readPhase) { numValues = ColumnReader::skip(numValues, readPhase); + return skipInternal(numValues, readPhase); + } + template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> + uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::skipInternal( + uint64_t numValues, const ReadPhase& readPhase) { if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue * numValues) { bufferPointer += bytesPerValue * numValues; } else { @@ -477,6 +569,17 @@ namespace orc { template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::next( + ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + if (sel_rowid_idx == nullptr) { + nextInternal(rowBatch, numValues, notNull, readPhase); + } else { + nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + } + } + + template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> + void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::nextInternal( ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); // update the notNull from the parent class @@ -521,6 +624,68 @@ namespace orc { } } + template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> + void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::nextInternalWithFilter( + ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + // update the 
notNull from the parent class + notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; + ValueType* outArray = + reinterpret_cast<ValueType*>(dynamic_cast<BatchType&>(rowBatch).data.data()); + uint16_t previousIdx = 0; + + if constexpr (columnKind == FLOAT) { + if (notNull) { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), readPhase); + } + if (notNull[idx]) { + outArray[idx] = readFloat<ValueType>(); + } + previousIdx = idx + 1; + } + skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), readPhase); + } else { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(idx - previousIdx, readPhase); + } + outArray[idx] = readFloat<ValueType>(); + previousIdx = idx + 1; + } + skipInternal(numValues - previousIdx, readPhase); + } + } else { + if (notNull) { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), readPhase); + } + if (notNull[idx]) { + outArray[idx] = readDouble<ValueType>(); + } + previousIdx = idx + 1; + } + skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), readPhase); + } else { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(idx - previousIdx, readPhase); + } + outArray[idx] = readDouble<ValueType>(); + previousIdx = idx + 1; + } + skipInternal(numValues - previousIdx, readPhase); + } + } + } + template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::seekToRowGroup( std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { @@ -552,17 +717,26 @@ namespace orc { 
std::shared_ptr<StringDictionary> dictionary; std::unique_ptr<RleDecoder> rle; + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase); + void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size); + public: StringDictionaryColumnReader(const Type& type, StripeStreams& stipe); ~StringDictionaryColumnReader() override; uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; }; StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type, @@ -613,7 +787,17 @@ namespace orc { } void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + if (sel_rowid_idx == nullptr) { + nextInternal(rowBatch, numValues, notNull, readPhase); + } else { + nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + } + } + + void StringDictionaryColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues, + char* notNull, const ReadPhase& readPhase) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); // update the notNull from the parent class notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; @@ -647,8 +831,58 @@ namespace orc { } } + void StringDictionaryColumnReader::nextInternalWithFilter(ColumnVectorBatch& rowBatch, + uint64_t numValues, char* notNull, + const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, + size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); + // update the notNull from the parent class + notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; + StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch); + char* blob = dictionary->dictionaryBlob.data(); + int64_t* dictionaryOffsets = dictionary->dictionaryOffset.data(); + char** outputStarts = byteBatch.data.data(); + int64_t* outputLengths = byteBatch.length.data(); + std::unique_ptr<int64_t[]> tmpOutputLengths(new int64_t[byteBatch.length.size()]); + rle->next(tmpOutputLengths.get(), numValues, notNull); + uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1; + if (notNull) { + for (size_t i = 0; i < numValues; i++) { + outputStarts[i] = nullptr; + outputLengths[i] = 0; + } + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (notNull[idx]) { + int64_t entry = tmpOutputLengths[idx]; + if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) { + throw ParseError("Entry index out of range in StringDictionaryColumn"); + } + outputStarts[idx] = blob + dictionaryOffsets[entry]; + outputLengths[idx] = dictionaryOffsets[entry + 1] - dictionaryOffsets[entry]; + } + } + } else { + for (size_t i = 0; i < numValues; i++) { + outputStarts[i] = nullptr; + outputLengths[i] = 0; + } + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + int64_t entry = tmpOutputLengths[idx]; + if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) 
{ + throw ParseError("Entry index out of range in StringDictionaryColumn"); + } + outputStarts[idx] = blob + dictionaryOffsets[entry]; + outputLengths[idx] = dictionaryOffsets[entry + 1] - dictionaryOffsets[entry]; + } + } + } + void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; rowBatch.isEncoded = true; @@ -688,9 +922,11 @@ namespace orc { uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; }; StringDirectColumnReader::StringDirectColumnReader(const Type& type, StripeStreams& stripe) @@ -760,7 +996,8 @@ namespace orc { } void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); // update the notNull from the parent class notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; @@ -836,15 +1073,19 @@ namespace orc { uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; private: template <bool encoded> - void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase); + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size); }; StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe, @@ -879,35 +1120,40 @@ namespace orc { return numValues; } - void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { - nextInternal<false>(rowBatch, numValues, notNull, readPhase); + void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + nextInternal<false>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); } void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { - 
nextInternal<true>(rowBatch, numValues, notNull, readPhase); + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); } template <bool encoded> void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { - ColumnReader::next(rowBatch, numValues, notNull, readPhase); + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); uint64_t i = 0; notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) { if (shouldProcessChild((*iter)->getType().getReaderCategory(), readPhase)) { if (encoded) { - (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch &>(rowBatch).fields[i]), numValues, - notNull, readPhase); + (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), numValues, + notNull, readPhase, sel_rowid_idx, sel_size); } else { - (*iter)->next(*(dynamic_cast<StructVectorBatch &>(rowBatch).fields[i]), numValues, notNull, readPhase); + (*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), numValues, notNull, + readPhase, sel_rowid_idx, sel_size); } } } } - void StructColumnReader::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + void StructColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) { ColumnReader::seekToRowGroup(positions, readPhase); for (auto& ptr : children) { @@ -928,15 +1174,19 @@ namespace orc { uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void 
next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; private: template <bool encoded> - void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase); + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size); }; ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe, @@ -982,19 +1232,23 @@ namespace orc { return numValues; } - void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { - nextInternal<false>(rowBatch, numValues, notNull, readPhase); + void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + nextInternal<false>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); } - void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { - nextInternal<true>(rowBatch, numValues, notNull, readPhase); + void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + nextInternal<true>(rowBatch, numValues, notNull, 
readPhase, sel_rowid_idx, sel_size); } template <bool encoded> void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { - ColumnReader::next(rowBatch, numValues, notNull, readPhase); + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch); int64_t* offsets = listBatch.offsets.data(); notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr; @@ -1021,14 +1275,17 @@ namespace orc { ColumnReader* childReader = child.get(); if (childReader) { if (encoded) { - childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, nullptr, readPhase); + childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, nullptr, readPhase, + sel_rowid_idx, sel_size); } else { - childReader->next(*(listBatch.elements.get()), totalChildren, nullptr, readPhase); + childReader->next(*(listBatch.elements.get()), totalChildren, nullptr, readPhase, + sel_rowid_idx, sel_size); } } } - void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) { ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); if (child.get()) { @@ -1048,15 +1305,19 @@ namespace orc { uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const 
ReadPhase& readPhase) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; private: template <bool encoded> - void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase); + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size); }; MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe, @@ -1112,19 +1373,22 @@ namespace orc { return numValues; } - void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { - nextInternal<false>(rowBatch, numValues, notNull, readPhase); + void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) { + nextInternal<false>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); } - void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { - nextInternal<true>(rowBatch, numValues, notNull, readPhase); + void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); } template <bool encoded> - void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { - ColumnReader::next(rowBatch, numValues, notNull, 
readPhase); + void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch); int64_t* offsets = mapBatch.offsets.data(); notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr; @@ -1159,14 +1423,16 @@ namespace orc { ColumnReader* rawElementReader = elementReader.get(); if (rawElementReader) { if (encoded) { - rawElementReader->nextEncoded(*(mapBatch.elements.get()), totalChildren, nullptr, readPhase); + rawElementReader->nextEncoded(*(mapBatch.elements.get()), totalChildren, nullptr, + readPhase); } else { rawElementReader->next(*(mapBatch.elements.get()), totalChildren, nullptr, readPhase); } } } - void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) { ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); if (keyReader.get()) { @@ -1189,15 +1455,19 @@ namespace orc { uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; private: template <bool encoded> - void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase); + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size); }; UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe, @@ -1237,26 +1507,30 @@ namespace orc { lengthsRead += chunk; } for (size_t i = 0; i < numChildren; ++i) { - if (counts[i] != 0 && childrenReader[i] != nullptr - && shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { + if (counts[i] != 0 && childrenReader[i] != nullptr && + shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { childrenReader[i]->skip(static_cast<uint64_t>(counts[i]), readPhase); } } return numValues; } - void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { - nextInternal<false>(rowBatch, numValues, notNull, readPhase); + void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + nextInternal<false>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); } void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { - nextInternal<true>(rowBatch, numValues, notNull, readPhase); + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); } template <bool encoded> void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const 
ReadPhase& readPhase) { + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch); uint64_t* offsets = unionBatch.offsets.data(); @@ -1279,7 +1553,8 @@ namespace orc { } // read the right number of each child column for (size_t i = 0; i < numChildren; ++i) { - if (childrenReader[i] != nullptr && shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { + if (childrenReader[i] != nullptr && + shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { if (encoded) { childrenReader[i]->nextEncoded(*(unionBatch.children[i]), static_cast<uint64_t>(counts[i]), nullptr, readPhase); @@ -1291,12 +1566,13 @@ namespace orc { } } - void UnionColumnReader::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + void UnionColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) { ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); for (size_t i = 0; i < numChildren; ++i) { - if (childrenReader[i] != nullptr && shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { + if (childrenReader[i] != nullptr && + shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { childrenReader[i]->seekToRowGroup(positions, readPhase); } } @@ -1367,15 +1643,24 @@ namespace orc { } } + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase); + void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size); + uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase); + public: Decimal64ColumnReader(const Type& type, 
StripeStreams& stipe); ~Decimal64ColumnReader() override; uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase) override; }; const uint32_t Decimal64ColumnReader::MAX_PRECISION_64; const uint32_t Decimal64ColumnReader::MAX_PRECISION_128; @@ -1420,6 +1705,12 @@ namespace orc { uint64_t Decimal64ColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { numValues = ColumnReader::skip(numValues, readPhase); + numValues = skipInternal(numValues, readPhase); + scaleDecoder->skip(numValues); + return numValues; + } + + uint64_t Decimal64ColumnReader::skipInternal(uint64_t numValues, const ReadPhase& readPhase) { uint64_t skipped = 0; while (skipped < numValues) { readBuffer(); @@ -1427,11 +1718,21 @@ namespace orc { skipped += 1; } } - scaleDecoder->skip(numValues); return numValues; } - void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + if (sel_rowid_idx == nullptr) { + nextInternal(rowBatch, numValues, notNull, readPhase); + } else { + nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + } + } + + void Decimal64ColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, + char* notNull, const ReadPhase& readPhase) { ColumnReader::next(rowBatch, 
numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch); @@ -1454,6 +1755,46 @@ namespace orc { } } + void Decimal64ColumnReader::nextInternalWithFilter(ColumnVectorBatch& rowBatch, + uint64_t numValues, char* notNull, + const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); + notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; + Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch); + int64_t* values = batch.values.data(); + int64_t* scaleBuffer = batch.readScales.data(); + scaleDecoder->next(scaleBuffer, numValues, notNull); + batch.precision = precision; + batch.scale = scale; + + uint16_t previousIdx = 0; + if (notNull) { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), readPhase); + } + if (notNull[idx]) { + readInt64(values[idx], static_cast<int32_t>(scaleBuffer[idx])); + ; + } + previousIdx = idx + 1; + } + skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), readPhase); + } else { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(idx - previousIdx, readPhase); + } + readInt64(values[idx], static_cast<int32_t>(scaleBuffer[idx])); + previousIdx = idx + 1; + } + skipInternal(numValues - previousIdx, readPhase); + } + } + void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) { if (scale > currentScale) { while (scale > currentScale) { @@ -1488,7 +1829,8 @@ namespace orc { Decimal128ColumnReader(const Type& type, StripeStreams& stipe); ~Decimal128ColumnReader() override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void 
next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; private: void readInt128(Int128& value, int32_t currentScale) { @@ -1509,6 +1851,13 @@ namespace orc { unZigZagInt128(value); scaleInt128(value, static_cast<uint32_t>(scale), static_cast<uint32_t>(currentScale)); } + + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase); + + void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size); }; Decimal128ColumnReader::Decimal128ColumnReader(const Type& type, StripeStreams& stripe) @@ -1520,8 +1869,18 @@ namespace orc { // PASS } - void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { + void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { + if (sel_rowid_idx == nullptr) { + nextInternal(rowBatch, numValues, notNull, readPhase); + } else { + nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + } + } + + void Decimal128ColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, + char* notNull, const ReadPhase& readPhase) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch); @@ -1544,6 +1903,46 @@ namespace orc { } } + void Decimal128ColumnReader::nextInternalWithFilter(ColumnVectorBatch& rowBatch, + uint64_t numValues, char* notNull, + const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); + notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; + Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch); + Int128* values = batch.values.data(); + // read the next group of scales + int64_t* scaleBuffer = batch.readScales.data(); + scaleDecoder->next(scaleBuffer, numValues, notNull); + batch.precision = precision; + batch.scale = scale; + + uint16_t previousIdx = 0; + if (notNull) { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), readPhase); + } + if (notNull[idx]) { + readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx])); + } + previousIdx = idx + 1; + } + skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), readPhase); + } else { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(idx - previousIdx, readPhase); + } + readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx])); + previousIdx = idx + 1; + } + skipInternal(numValues - previousIdx, readPhase); + } + } + class Decimal64ColumnReaderV2 : public ColumnReader { protected: std::unique_ptr<RleDecoder> valueDecoder; @@ -1556,7 +1955,8 @@ namespace orc { uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; }; Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe) @@ -1583,8 +1983,9 @@ namespace orc { return numValues; } - void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { + void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + 
const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch); @@ -1635,11 +2036,19 @@ namespace orc { return value >= MIN_VALUE && value <= MAX_VALUE; } + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase); + + void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, + size_t sel_size); + public: DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe); ~DecimalHive11ColumnReader() override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t sel_size) override; }; DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type, StripeStreams& stripe) @@ -1654,7 +2063,17 @@ namespace orc { } void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull, const ReadPhase& readPhase) { + char* notNull, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + if (sel_rowid_idx == nullptr) { + nextInternal(rowBatch, numValues, notNull, readPhase); + } else { + nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size); + } + } + + void DecimalHive11ColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, + char* notNull, const ReadPhase& readPhase) { ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch); @@ -1698,6 +2117,67 @@ namespace orc { } } + void DecimalHive11ColumnReader::nextInternalWithFilter(ColumnVectorBatch& rowBatch, + uint64_t numValues, char* notNull, + const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, size_t sel_size) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); + notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; + Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch); + Int128* values = batch.values.data(); + // read the next group of scales + int64_t* scaleBuffer = batch.readScales.data(); + + scaleDecoder->next(scaleBuffer, numValues, notNull); + + batch.precision = precision; + batch.scale = scale; + + uint16_t previousIdx = 0; + if (notNull) { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), readPhase); + } + if (notNull[idx]) { + if (!readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx]))) { + if (throwOnOverflow) { + throw ParseError("Hive 0.11 decimal was more than 38 digits."); + } else { + *errorStream << "Warning: " + << "Hive 0.11 decimal with more than 38 digits " + << "replaced by NULL.\n"; + notNull[idx] = false; + } + } + } + previousIdx = idx + 1; + } + skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), readPhase); + } else { + for (size_t i = 0; i < sel_size; i++) { + uint16_t idx = sel_rowid_idx[i]; + if (idx - previousIdx > 0) { + skipInternal(idx - previousIdx, readPhase); + } + if (!readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx]))) { + if (throwOnOverflow) { + throw ParseError("Hive 0.11 decimal was more than 38 digits."); + } else { + *errorStream << "Warning: " + << "Hive 0.11 decimal with more than 38 digits " + << "replaced by NULL.\n"; + batch.hasNulls = true; + batch.notNull[idx] 
= false; + } + } + previousIdx = idx + 1; + } + skipInternal(numValues - previousIdx, readPhase); + } + } + static bool isLittleEndian() { static union { uint32_t i; diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh index 25363e2..fd89489 100644 --- a/c++/src/ColumnReader.hh +++ b/c++/src/ColumnReader.hh @@ -118,6 +118,16 @@ namespace orc { return readPhase.contains(readerCategory) || readerCategory == ReaderCategory::FILTER_PARENT; } + static int countNonNullRowsInRange(char* notNull, uint16_t start, uint16_t end) { + int result = 0; + while (start < end) { + if (notNull[start++]) { + result++; + } + } + return result; + } + public: ColumnReader(const Type& type, StripeStreams& stipe); @@ -142,7 +152,9 @@ namespace orc { * a mask (with at least numValues bytes) for which values to * set. */ - virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase = ReadPhase::ALL); + virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase = ReadPhase::ALL, + uint16_t* sel_rowid_idx = nullptr, size_t sel_size = 0); /** * Read the next group of values without decoding @@ -152,16 +164,19 @@ namespace orc { * a mask (with at least numValues bytes) for which values to * set. 
*/ - virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase = ReadPhase::ALL) { + virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, + const ReadPhase& readPhase = ReadPhase::ALL, + uint16_t* sel_rowid_idx = nullptr, size_t sel_size = 0) { rowBatch.isEncoded = false; - next(rowBatch, numValues, notNull, readPhase); + next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx); } /** * Seek to beginning of a row group in the current stripe * @param positions a list of PositionProviders storing the positions */ - virtual void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase = ReadPhase::ALL); + virtual void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, + const ReadPhase& readPhase = ReadPhase::ALL); }; /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 52052fb..c132e46 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -247,8 +247,7 @@ namespace orc { } RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents, - const RowReaderOptions& opts, - const ORCFilter* _filter) + const RowReaderOptions& opts, const ORCFilter* _filter) : localTimezone(getLocalTimezone()), contents(_contents), throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()), @@ -319,7 +318,7 @@ namespace orc { std::unordered_set<int> filterColIds; if (!filterCols.empty()) { - for (auto& colName: filterCols) { + for (auto& colName : filterCols) { auto iter = nameTypeMap.find(colName); if (iter != nameTypeMap.end()) { Type* type = iter->second; @@ -405,8 +404,7 @@ namespace orc { return columnPath.substr(0, columnPath.length() - 1); } - - CompressionKind RowReaderImpl::getCompression() const { + CompressionKind RowReaderImpl::getCompression() const { return contents->compression; } @@ -900,7 +898,8 @@ namespace orc { return createRowReader(defaultOpts, filter); } - 
std::unique_ptr<RowReader> ReaderImpl::createRowReader(const RowReaderOptions& opts, const ORCFilter* filter) const { + std::unique_ptr<RowReader> ReaderImpl::createRowReader(const RowReaderOptions& opts, + const ORCFilter* filter) const { if (opts.getSearchArgument() && !isMetadataLoaded) { // load stripe statistics for PPD readMetadata(); @@ -1121,9 +1120,6 @@ namespace orc { rowsInCurrentStripe = currentStripeInfo.numberofrows(); processingStripe = currentStripe; - // read row group statistics and bloom filters of current stripe - loadStripeIndex(); - if (sargsApplier) { bool isStripeNeeded = true; if (contents->metadata) { @@ -1137,6 +1133,8 @@ namespace orc { } if (isStripeNeeded) { + // read row group statistics and bloom filters of current stripe + loadStripeIndex(); // select row groups to read in the current stripe sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, bloomFilterIndex); if (sargsApplier->hasSelectedFrom(currentRowInStripe)) { @@ -1170,7 +1168,8 @@ namespace orc { sargsApplier->getNextSkippedRows()); previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1; if (currentRowInStripe > 0) { - seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()), readPhase); + seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()), + readPhase); } } } else { @@ -1200,29 +1199,29 @@ namespace orc { followRowInStripe = currentRowInStripe; } rowsToRead = - std::min(static_cast<uint64_t>(data.capacity), - rowsInCurrentStripe - currentRowInStripe); + std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe - currentRowInStripe); if (sargsApplier) { - rowsToRead = computeBatchSize(rowsToRead, - currentRowInStripe, - rowsInCurrentStripe, - footer->rowindexstride(), - sargsApplier->getNextSkippedRows()); + rowsToRead = computeBatchSize(rowsToRead, currentRowInStripe, rowsInCurrentStripe, + footer->rowindexstride(), sargsApplier->getNextSkippedRows()); } if (rowsToRead == 0) { 
markEndOfFile(); return readRows; } - uint16_t sel_rowid_idx[rowsToRead]; - nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx, arg); - - if (startReadPhase == ReadPhase::LEADERS && data.numElements > 0) { - // At least 1 row has been selected and as a result we read the follow columns into the - // row batch - nextBatch(data, rowsToRead, - prepareFollowReaders(footer->rowindexstride(), - currentRowInStripe, followRowInStripe), sel_rowid_idx, arg); - followRowInStripe = currentRowInStripe + rowsToRead; + if (startReadPhase == ReadPhase::LEADERS) { + auto sel_rowid_idx = std::make_unique<uint16_t[]>(rowsToRead); + nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx.get(), arg); + if (data.numElements > 0) { + // At least 1 row has been selected and as a result we read the follow columns into the + // row batch + nextBatch( + data, rowsToRead, + prepareFollowReaders(footer->rowindexstride(), currentRowInStripe, followRowInStripe), + sel_rowid_idx.get(), arg); + followRowInStripe = currentRowInStripe + rowsToRead; + } + } else { + nextBatch(data, rowsToRead, startReadPhase, nullptr, arg); } // update row number @@ -1230,11 +1229,11 @@ namespace orc { currentRowInStripe += rowsToRead; readRows += rowsToRead; - // check if we need to advance to next selected row group - if (sargsApplier) { - uint64_t nextRowToRead = - advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe, footer->rowindexstride(), - sargsApplier->getNextSkippedRows()); + // check if we need to advance to next selected row group + if (sargsApplier) { + uint64_t nextRowToRead = + advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe, footer->rowindexstride(), + sargsApplier->getNextSkippedRows()); if (currentRowInStripe != nextRowToRead) { // it is guaranteed to be at start of a row group currentRowInStripe = nextRowToRead; @@ -1253,16 +1252,22 @@ namespace orc { return readRows; } - void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& 
readPhase, uint16_t* sel_rowid_idx, void* arg) { - if (enableEncodedBlock) { - reader->nextEncoded(data, batchSize, nullptr, readPhase); - } - else { - reader->next(data, batchSize, nullptr, readPhase); - } + void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, void* arg) { if (readPhase == ReadPhase::ALL || readPhase == ReadPhase::LEADERS) { + if (enableEncodedBlock) { + reader->nextEncoded(data, batchSize, nullptr, readPhase); + } else { + reader->next(data, batchSize, nullptr, readPhase); + } // Set the batch size when reading everything or when reading FILTER columns data.numElements = batchSize; + } else { + if (enableEncodedBlock) { + reader->nextEncoded(data, batchSize, nullptr, readPhase, sel_rowid_idx, data.numElements); + } else { + reader->next(data, batchSize, nullptr, readPhase, sel_rowid_idx, data.numElements); + } } if (readPhase == ReadPhase::LEADERS) { @@ -1274,80 +1279,81 @@ namespace orc { } } - /** + /** * Determine the RowGroup based on the supplied row id. * @param rowIdx Row for which the row group is being determined * @return Id of the RowGroup that the row belongs to */ - int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) { - return rowIndexStride == 0 ? 0 : (int) (rowIdx / rowIndexStride); - } - - /** - * This method prepares the non-filter column readers for next batch. This involves the following - * 1. Determine position - * 2. Perform IO if required - * 3. Position the non-filter readers - * - * This method is repositioning the non-filter columns and as such this method shall never have to - * deal with navigating the stripe forward or skipping row groups, all of this should have already - * taken place based on the filter columns. 
- * @param toFollowRow The rowIdx identifies the required row position within the stripe for - * follow read - * @param fromFollowRow Indicates the current position of the follow read, exclusive - * @return the read phase for reading non-filter columns, this shall be FOLLOWERS_AND_PARENTS in - * case of a seek otherwise will be FOLLOWERS - */ - ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow, long fromFollowRow) { - // 1. Determine the required row group and skip rows needed from the RG start - int needRG = computeRGIdx(rowIndexStride, toFollowRow); - // The current row is not yet read so we -1 to compute the previously read row group - int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1); - long skipRows; - if (needRG == readRG && toFollowRow >= fromFollowRow) { - // In case we are skipping forward within the same row group, we compute skip rows from the - // current position - skipRows = toFollowRow - fromFollowRow; - } else { - // In all other cases including seeking backwards, we compute the skip rows from the start of - // the required row group - skipRows = toFollowRow - (needRG * rowIndexStride); - } - - // 2. Plan the row group idx for the non-filter columns if this has not already taken place - if (needsFollowColumnsRead) { - needsFollowColumnsRead = false; - } + int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) { + return rowIndexStride == 0 ? 0 : (int)(rowIdx / rowIndexStride); + } - // 3. Position the non-filter readers to the required RG and skipRows - ReadPhase result = ReadPhase::FOLLOWERS; - if (needRG != readRG || toFollowRow < fromFollowRow) { - // When having to change a row group or in case of back navigation, seek both the filter - // parents and non-filter. This will re-position the parents present vector. This is needed - // to determine the number of non-null values to skip on the non-filter columns. 
- seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS); - // skip rows on both the filter parents and non-filter as both have been positioned in the - // previous step - reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS); - result = ReadPhase::FOLLOWERS_AND_PARENTS; - } else if (skipRows > 0) { - // in case we are only skipping within the row group, position the filter parents back to the - // position of the follow. This is required to determine the non-null values to skip on the - // non-filter columns. - seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS); - reader->skip(fromFollowRow - (readRG * rowIndexStride), ReadPhase::LEADER_PARENTS); - // Move both the filter parents and non-filter forward, this will compute the correct - // non-null skips on follow children - reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS); - result = ReadPhase::FOLLOWERS_AND_PARENTS; - } - // Identifies the read level that should be performed for the read - // FOLLOWERS_WITH_PARENTS indicates repositioning identifying both non-filter and filter parents - // FOLLOWERS indicates read only of the non-filter level without the parents, which is used during - // contiguous read. During a contiguous read no skips are needed and the non-null information of - // the parent is available in the column vector for use during non-filter read - return result; - } + /** + * This method prepares the non-filter column readers for next batch. This involves the following + * 1. Determine position + * 2. Perform IO if required + * 3. Position the non-filter readers + * + * This method is repositioning the non-filter columns and as such this method shall never have to + * deal with navigating the stripe forward or skipping row groups, all of this should have already + * taken place based on the filter columns. 
+ * @param toFollowRow The rowIdx identifies the required row position within the stripe for + * follow read + * @param fromFollowRow Indicates the current position of the follow read, exclusive + * @return the read phase for reading non-filter columns, this shall be FOLLOWERS_AND_PARENTS in + * case of a seek otherwise will be FOLLOWERS + */ + ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow, + long fromFollowRow) { + // 1. Determine the required row group and skip rows needed from the RG start + int needRG = computeRGIdx(rowIndexStride, toFollowRow); + // The current row is not yet read so we -1 to compute the previously read row group + int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1); + long skipRows; + if (needRG == readRG && toFollowRow >= fromFollowRow) { + // In case we are skipping forward within the same row group, we compute skip rows from the + // current position + skipRows = toFollowRow - fromFollowRow; + } else { + // In all other cases including seeking backwards, we compute the skip rows from the start of + // the required row group + skipRows = toFollowRow - (needRG * rowIndexStride); + } + + // 2. Plan the row group idx for the non-filter columns if this has not already taken place + if (needsFollowColumnsRead) { + needsFollowColumnsRead = false; + } + + // 3. Position the non-filter readers to the required RG and skipRows + ReadPhase result = ReadPhase::FOLLOWERS; + if (needRG != readRG || toFollowRow < fromFollowRow) { + // When having to change a row group or in case of back navigation, seek both the filter + // parents and non-filter. This will re-position the parents present vector. This is needed + // to determine the number of non-null values to skip on the non-filter columns. 
+ seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS); + // skip rows on both the filter parents and non-filter as both have been positioned in the + // previous step + reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS); + result = ReadPhase::FOLLOWERS_AND_PARENTS; + } else if (skipRows > 0) { + // in case we are only skipping within the row group, position the filter parents back to the + // position of the follow. This is required to determine the non-null values to skip on the + // non-filter columns. + seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS); + reader->skip(fromFollowRow - (readRG * rowIndexStride), ReadPhase::LEADER_PARENTS); + // Move both the filter parents and non-filter forward, this will compute the correct + // non-null skips on follow children + reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS); + result = ReadPhase::FOLLOWERS_AND_PARENTS; + } + // Identifies the read level that should be performed for the read + // FOLLOWERS_AND_PARENTS indicates repositioning identifying both non-filter and filter parents + // FOLLOWERS indicates read only of the non-filter level without the parents, which is used + // during contiguous read. 
During a contiguous read no skips are needed and the non-null + // information of the parent is available in the column vector for use during non-filter read + return result; + } uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t currentRowInStripe, uint64_t rowsInCurrentStripe, uint64_t rowIndexStride, diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index 0c52387..d84532c 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -35,22 +35,23 @@ namespace orc { static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024; class ReaderContext { - public: - ReaderContext() = default; - - const ORCFilter* getFilterCallback() const { - return filter; - } - - ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds, const ORCFilter* _filter) { - this->filterColumnIds = std::move(_filterColumnIds); - this->filter = _filter; - return *this; - } - - private: - std::unordered_set<int> filterColumnIds; - const ORCFilter* filter; + public: + ReaderContext() = default; + + const ORCFilter* getFilterCallback() const { + return filter; + } + + ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds, + const ORCFilter* _filter) { + this->filterColumnIds = std::move(_filterColumnIds); + this->filter = _filter; + return *this; + } + + private: + std::unordered_set<int> filterColumnIds; + const ORCFilter* filter; }; /** @@ -237,25 +238,26 @@ namespace orc { */ bool hasBadBloomFilters(); - // build map from type name and id, id to Type void buildTypeNameIdMap(Type* type); std::string toDotColumnPath(); - void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& readPhase, uint16_t* sel_rowid_idx, void* arg); + void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& readPhase, + uint16_t* sel_rowid_idx, void* arg); int computeRGIdx(uint64_t rowIndexStride, long rowIdx); ReadPhase prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow, long fromFollowRow); - public: + public: /** * Constructor 
that lets the user specify additional options. * @param contents of the file * @param options options for reading */ - RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& options, const ORCFilter* filter = nullptr); + RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& options, + const ORCFilter* filter = nullptr); // Select the columns from the options object const std::vector<bool> getSelectedColumns() const override; @@ -357,7 +359,8 @@ namespace orc { std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter = nullptr) const override; - std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options, const ORCFilter* filter = nullptr) const override; + std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options, + const ORCFilter* filter = nullptr) const override; uint64_t getContentLength() const override; uint64_t getStripeStatisticsLength() const override; diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc index 47095b3..3f28dce 100644 --- a/c++/src/TypeImpl.cc +++ b/c++/src/TypeImpl.cc @@ -25,11 +25,15 @@ namespace orc { - const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories({ ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT, ReaderCategory::NON_FILTER }); - const ReadPhase ReadPhase::LEADERS = ReadPhase::fromCategories({ ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT }); - const ReadPhase ReadPhase::FOLLOWERS = ReadPhase::fromCategories({ ReaderCategory::NON_FILTER }); - const ReadPhase ReadPhase::LEADER_PARENTS = ReadPhase::fromCategories({ ReaderCategory::FILTER_PARENT }); - const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS = ReadPhase::fromCategories({ ReaderCategory::FILTER_PARENT, ReaderCategory::NON_FILTER }); + const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories( + {ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT, ReaderCategory::NON_FILTER}); + const ReadPhase ReadPhase::LEADERS = + 
ReadPhase::fromCategories({ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT}); + const ReadPhase ReadPhase::FOLLOWERS = ReadPhase::fromCategories({ReaderCategory::NON_FILTER}); + const ReadPhase ReadPhase::LEADER_PARENTS = + ReadPhase::fromCategories({ReaderCategory::FILTER_PARENT}); + const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS = + ReadPhase::fromCategories({ReaderCategory::FILTER_PARENT, ReaderCategory::NON_FILTER}); Type::~Type() { // PASS --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org