This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch orc in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push: new ff8c18e [Feature] Implements ORC lazy materialization. (#56) ff8c18e is described below commit ff8c18e96d55942f3767a2d41960cabfb0efd144 Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Thu Apr 27 13:56:21 2023 +0800 [Feature] Implements ORC lazy materialization. (#56) Refer to orc java implementation: LazyIO of non-filter columns in the presence of filters Note: Row-level filtering by selection vector is currently not implemented, will do it in future PR. --- .clang-format | 26 ++++ c++/include/orc/Reader.hh | 48 ++++++- c++/include/orc/Type.hh | 39 +++++ c++/src/ColumnReader.cc | 357 ++++++++++++++++++++++++---------------------- c++/src/ColumnReader.hh | 19 ++- c++/src/Options.hh | 28 ++++ c++/src/Reader.cc | 295 +++++++++++++++++++++++++++++++------- c++/src/Reader.hh | 55 ++++++- c++/src/TypeImpl.cc | 29 +++- c++/src/TypeImpl.hh | 9 ++ 10 files changed, 663 insertions(+), 242 deletions(-) diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..0779071 --- /dev/null +++ b/.clang-format @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+--- +Language: Cpp +BasedOnStyle: Google +ColumnLimit: 100 +IndentWidth: 2 +NamespaceIndentation: All +UseTab: Never +AllowShortFunctionsOnASingleLine: Empty +DerivePointerAlignment: false +IncludeBlocks: Preserve diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index d8f83f9..2666b4d 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -186,6 +186,26 @@ namespace orc { RowReaderOptions& includeTypes(const std::list<uint64_t>& types); /** + * For files that have structs as the top-level object, filter the fields + * by index. The first field is 0, the second 1, and so on. By default, + * all columns are read. This option clears any previous setting of + * the selected columns. + * @param filterColIndexes indexes of the fields to use as filter columns + * @return this + */ + RowReaderOptions& filter(const std::list<uint64_t>& filterColIndexes); + + /** + * For files that have structs as the top-level object, filter the fields + * by name. By default, all columns are read. This option clears + * any previous setting of the selected columns. + * @param filterColNames names of the fields to use as filter columns + * @return this + */ + RowReaderOptions& filter(const std::list<std::string>& filterColNames); + + + /** * A map type of <typeId, ReadIntent>. */ typedef std::map<uint64_t, ReadIntent> IdReadIntentMap; @@ -281,6 +301,12 @@ namespace orc { */ const std::list<std::string>& getIncludeNames() const; + /** + * Get the list of filter column names set by + * filter(const std::list<std::string>&). + */ + const std::list<std::string>& getFilterColNames() const; + /** * Get the start of the range for the data being processed.
* @return if not set, return 0 @@ -338,6 +364,12 @@ namespace orc { bool getUseTightNumericVector() const; }; + class ORCFilter { + public: + virtual ~ORCFilter() = default; + virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg = nullptr) const = 0; + }; + class RowReader; /** @@ -523,14 +555,14 @@ namespace orc { * Create a RowReader based on this reader with the default options. * @return a RowReader to read the rows */ - virtual std::unique_ptr<RowReader> createRowReader() const = 0; + virtual std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter = nullptr) const = 0; /** * Create a RowReader based on this reader. * @param options RowReader Options * @return a RowReader to read the rows */ - virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options) const = 0; + virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options, const ORCFilter* filter = nullptr) const = 0; /** * Get the name of the input stream. @@ -616,13 +648,23 @@ namespace orc { */ virtual std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) const = 0; + /** + * Read the next row batch from the current position. + * Caller must look at numElements in the row batch to determine how + * many rows were read. + * @param data the row batch to read into. + * @param arg argument. + * @return number of rows. + */ + virtual uint64_t nextBatch(ColumnVectorBatch& data, void* arg = nullptr) = 0; + /** * Read the next row batch from the current position. * Caller must look at numElements in the row batch to determine how * many rows were read. * @param data the row batch to read into. * @return true if a non-zero number of rows were read or false if the - * end of the file was reached. + * end of the file was reached. 
*/ virtual bool next(ColumnVectorBatch& data) = 0; diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh index c8ada75..73b813e 100644 --- a/c++/include/orc/Type.hh +++ b/c++/include/orc/Type.hh @@ -19,11 +19,46 @@ #ifndef ORC_TYPE_HH #define ORC_TYPE_HH +#include <bitset> +#include <unordered_set> #include "MemoryPool.hh" #include "orc/Vector.hh" #include "orc/orc-config.hh" namespace orc { + enum class ReaderCategory { + FILTER_CHILD, // Primitive type that is a filter column + FILTER_PARENT, // Compound type with filter children + NON_FILTER // Non-filter column + }; + + class ReadPhase { + public: + static const int NUM_CATEGORIES = 3; // Number of values in ReaderCategory + std::bitset<NUM_CATEGORIES> categories; + + static const ReadPhase ALL; + static const ReadPhase LEADERS; + static const ReadPhase FOLLOWERS; + static const ReadPhase LEADER_PARENTS; + static const ReadPhase FOLLOWERS_AND_PARENTS; + + static ReadPhase fromCategories(const std::unordered_set<ReaderCategory>& cats) { + ReadPhase phase; + for (ReaderCategory cat : cats) { + phase.categories.set(static_cast<size_t>(cat)); + } + return phase; + } + + bool contains(ReaderCategory cat) const { + return categories.test(static_cast<size_t>(cat)); + } + + bool operator==(const ReadPhase& other) const { + return categories == other.categories; + } + }; enum TypeKind { BOOLEAN = 0, @@ -54,7 +89,9 @@ namespace orc { virtual uint64_t getMaximumColumnId() const = 0; virtual TypeKind getKind() const = 0; virtual uint64_t getSubtypeCount() const = 0; + virtual Type* getParent() const = 0; virtual const Type* getSubtype(uint64_t childId) const = 0; + virtual Type* getSubtype(uint64_t childId) = 0; virtual const std::string& getFieldName(uint64_t childId) const = 0; virtual uint64_t getMaximumLength() const = 0; virtual uint64_t getPrecision() const = 0; @@ -64,6 +101,8 @@ namespace orc { virtual Type& removeAttribute(const std::string& key) = 0; virtual std::vector<std::string> getAttributeKeys() 
const = 0; virtual std::string getAttributeValue(const std::string& key) const = 0; + virtual ReaderCategory getReaderCategory() const = 0; + virtual void setReaderCategory(ReaderCategory readerCategory) = 0; virtual std::string toString() const = 0; /** * Get the Type with the given column ID diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc index 2a72c80..cabcdbe 100644 --- a/c++/src/ColumnReader.cc +++ b/c++/src/ColumnReader.cc @@ -46,8 +46,9 @@ namespace orc { } } - ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe) - : columnId(type.getColumnId()), + ColumnReader::ColumnReader(const Type& _type, StripeStreams& stripe) + : type(_type), + columnId(type.getColumnId()), memoryPool(stripe.getMemoryPool()), metrics(stripe.getReaderMetrics()) { std::unique_ptr<SeekableInputStream> stream = @@ -61,7 +62,7 @@ namespace orc { // PASS } - uint64_t ColumnReader::skip(uint64_t numValues) { + uint64_t ColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { ByteRleDecoder* decoder = notNullDecoder.get(); if (decoder) { // page through the values that we want to skip @@ -84,7 +85,7 @@ namespace orc { return numValues; } - void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* incomingMask) { + void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* incomingMask, const ReadPhase& readPhase) { if (numValues > rowBatch.capacity) { rowBatch.resize(numValues); } @@ -100,16 +101,19 @@ namespace orc { return; } } + rowBatch.hasNulls = false; } else if (incomingMask) { // If we don't have a notNull stream, copy the incomingMask rowBatch.hasNulls = true; memcpy(rowBatch.notNull.data(), incomingMask, numValues); return; + } else { + rowBatch.hasNulls = false; + memset(rowBatch.notNull.data(), 1, numValues); } - rowBatch.hasNulls = false; } - void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) { + void 
ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { if (notNullDecoder.get()) { notNullDecoder->seek(positions.at(columnId)); } @@ -142,11 +146,11 @@ namespace orc { BooleanColumnReader(const Type& type, StripeStreams& stipe); ~BooleanColumnReader() override; - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; }; template <typename BatchType> @@ -164,16 +168,16 @@ namespace orc { } template <typename BatchType> - uint64_t BooleanColumnReader<BatchType>::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t BooleanColumnReader<BatchType>::skip(uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); rle->skip(numValues); return numValues; } template <typename BatchType> void BooleanColumnReader<BatchType>::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); // Since the byte rle places the output in a char* and BatchType here may be // LongVectorBatch with long*. We cheat here in that case and use the long* // and then expand it in a second pass.. 
@@ -185,8 +189,8 @@ namespace orc { template <typename BatchType> void BooleanColumnReader<BatchType>::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); } @@ -205,14 +209,14 @@ namespace orc { ~ByteColumnReader() override = default; - uint64_t skip(uint64_t numValues) override { - numValues = ColumnReader::skip(numValues); + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override { + numValues = ColumnReader::skip(numValues, readPhase); rle->skip(numValues); return numValues; } - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override { - ColumnReader::next(rowBatch, numValues, notNull); + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); // Since the byte rle places the output in a char* instead of long*, // we cheat here and use the long* and then expand it in a second pass. 
auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data(); @@ -221,8 +225,8 @@ namespace orc { expandBytesToIntegers(ptr, numValues); } - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override { - ColumnReader::seekToRowGroup(positions); + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override { + ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); } }; @@ -245,20 +249,20 @@ namespace orc { // PASS } - uint64_t skip(uint64_t numValues) override { - numValues = ColumnReader::skip(numValues); + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override { + numValues = ColumnReader::skip(numValues, readPhase); rle->skip(numValues); return numValues; } - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override { - ColumnReader::next(rowBatch, numValues, notNull); + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues, rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr); } - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override { - ColumnReader::seekToRowGroup(positions); + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override { + ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); } }; @@ -276,11 +280,11 @@ namespace orc { TimestampColumnReader(const Type& type, StripeStreams& stripe, bool isInstantType); ~TimestampColumnReader() override; - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; }; TimestampColumnReader::TimestampColumnReader(const Type& type, StripeStreams& stripe, @@ -304,15 +308,15 @@ namespace orc { // PASS } - uint64_t TimestampColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t TimestampColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); secondsRle->skip(numValues); nanoRle->skip(numValues); return numValues; } - void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; TimestampVectorBatch& timestampBatch = dynamic_cast<TimestampVectorBatch&>(rowBatch); int64_t* secsBuffer = timestampBatch.data.data(); @@ -353,8 +357,8 @@ namespace orc { } void TimestampColumnReader::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); secondsRle->seek(positions.at(columnId)); nanoRle->seek(positions.at(columnId)); } @@ -365,11 +369,11 @@ namespace orc { DoubleColumnReader(const Type& type, StripeStreams& stripe); ~DoubleColumnReader() override {} - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; private: std::unique_ptr<SeekableInputStream> inputStream; @@ -450,8 +454,8 @@ namespace orc { template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::skip( - uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue * numValues) { bufferPointer += bytesPerValue * numValues; @@ -473,8 +477,8 @@ namespace orc { template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> void DoubleColumnReader<columnKind, 
isLittleEndian, ValueType, BatchType>::next( - ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); // update the notNull from the parent class notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; ValueType* outArray = @@ -519,8 +523,8 @@ namespace orc { template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType> void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); inputStream->seek(positions.at(columnId)); // clear buffer state after seek bufferEnd = nullptr; @@ -552,13 +556,13 @@ namespace orc { StringDictionaryColumnReader(const Type& type, StripeStreams& stipe); ~StringDictionaryColumnReader() override; - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; }; StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type, @@ 
-602,15 +606,15 @@ namespace orc { // PASS } - uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t StringDictionaryColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); rle->skip(numValues); return numValues; } void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); // update the notNull from the parent class notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch); @@ -644,8 +648,8 @@ namespace orc { } void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; rowBatch.isEncoded = true; @@ -657,8 +661,8 @@ namespace orc { } void StringDictionaryColumnReader::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); } @@ -682,11 +686,11 @@ namespace orc { StringDirectColumnReader(const Type& type, StripeStreams& stipe); ~StringDirectColumnReader() override; - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; }; StringDirectColumnReader::StringDirectColumnReader(const Type& type, StripeStreams& stripe) @@ -706,9 +710,9 @@ namespace orc { // PASS } - uint64_t StringDirectColumnReader::skip(uint64_t numValues) { + uint64_t StringDirectColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { const size_t BUFFER_SIZE = 1024; - numValues = ColumnReader::skip(numValues); + numValues = ColumnReader::skip(numValues, readPhase); int64_t buffer[BUFFER_SIZE]; uint64_t done = 0; size_t totalBytes = 0; @@ -756,8 +760,8 @@ namespace orc { } void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); // update the notNull from the parent class notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch); @@ -814,8 +818,8 @@ namespace orc { } void StringDirectColumnReader::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); blobStream->seek(positions.at(columnId)); lengthRle->seek(positions.at(columnId)); // clear buffer state after seek @@ -830,17 +834,17 @@ namespace orc { public: StructColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false); - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; private: template <bool encoded> - void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull); + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase); }; StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe, @@ -865,45 +869,51 @@ namespace orc { } } - uint64_t StructColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t StructColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { + 
numValues = ColumnReader::skip(numValues, readPhase); for (auto& ptr : children) { - ptr->skip(numValues); + if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) { + ptr->skip(numValues, readPhase); + } } return numValues; } - void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) { - nextInternal<false>(rowBatch, numValues, notNull); + void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + nextInternal<false>(rowBatch, numValues, notNull, readPhase); } void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - nextInternal<true>(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + nextInternal<true>(rowBatch, numValues, notNull, readPhase); } template <bool encoded> void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); uint64_t i = 0; notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) { - if (encoded) { - (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), numValues, - notNull); - } else { - (*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), numValues, notNull); + if (shouldProcessChild((*iter)->getType().getReaderCategory(), readPhase)) { + if (encoded) { + (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch &>(rowBatch).fields[i]), numValues, + notNull, readPhase); + } else { + (*iter)->next(*(dynamic_cast<StructVectorBatch &>(rowBatch).fields[i]), numValues, notNull, readPhase); + } } } } void StructColumnReader::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); for (auto& ptr : children) { - ptr->seekToRowGroup(positions); + if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) { + ptr->seekToRowGroup(positions, readPhase); + } } } @@ -916,17 +926,17 @@ namespace orc { ListColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false); ~ListColumnReader() override; - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase) override; private: template <bool encoded> - void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull); + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase); }; ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe, @@ -949,8 +959,8 @@ namespace orc { // PASS } - uint64_t ListColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t ListColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); ColumnReader* childReader = child.get(); if (childReader) { const uint64_t BUFFER_SIZE = 1024; @@ -965,26 +975,26 @@ namespace orc { } lengthsRead += chunk; } - childReader->skip(childrenElements); + childReader->skip(childrenElements, readPhase); } else { rle->skip(numValues); } return numValues; } - void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) { - nextInternal<false>(rowBatch, numValues, notNull); + void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + nextInternal<false>(rowBatch, numValues, notNull, readPhase); } void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - nextInternal<true>(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + nextInternal<true>(rowBatch, numValues, notNull, readPhase); } template <bool encoded> void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch); int64_t* offsets = listBatch.offsets.data(); notNull = 
listBatch.hasNulls ? listBatch.notNull.data() : nullptr; @@ -1011,18 +1021,18 @@ namespace orc { ColumnReader* childReader = child.get(); if (childReader) { if (encoded) { - childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, nullptr); + childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, nullptr, readPhase); } else { - childReader->next(*(listBatch.elements.get()), totalChildren, nullptr); + childReader->next(*(listBatch.elements.get()), totalChildren, nullptr, readPhase); } } } - void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); if (child.get()) { - child->seekToRowGroup(positions); + child->seekToRowGroup(positions, readPhase); } } @@ -1036,17 +1046,17 @@ namespace orc { MapColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false); ~MapColumnReader() override; - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; private: template <bool encoded> - void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull); + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase); }; MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe, @@ -1073,8 +1083,8 @@ namespace orc { // PASS } - uint64_t MapColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t MapColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); ColumnReader* rawKeyReader = keyReader.get(); ColumnReader* rawElementReader = elementReader.get(); if (rawKeyReader || rawElementReader) { @@ -1091,10 +1101,10 @@ namespace orc { lengthsRead += chunk; } if (rawKeyReader) { - rawKeyReader->skip(childrenElements); + rawKeyReader->skip(childrenElements, readPhase); } if (rawElementReader) { - rawElementReader->skip(childrenElements); + rawElementReader->skip(childrenElements, readPhase); } } else { rle->skip(numValues); @@ -1102,19 +1112,19 @@ namespace orc { return numValues; } - void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) { - nextInternal<false>(rowBatch, numValues, notNull); + void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + nextInternal<false>(rowBatch, numValues, notNull, readPhase); } void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - nextInternal<true>(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + nextInternal<true>(rowBatch, numValues, notNull, readPhase); } template <bool encoded> void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch); int64_t* 
offsets = mapBatch.offsets.data(); notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr; @@ -1141,29 +1151,29 @@ namespace orc { ColumnReader* rawKeyReader = keyReader.get(); if (rawKeyReader) { if (encoded) { - rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren, nullptr); + rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren, nullptr, readPhase); } else { - rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr); + rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr, readPhase); } } ColumnReader* rawElementReader = elementReader.get(); if (rawElementReader) { if (encoded) { - rawElementReader->nextEncoded(*(mapBatch.elements.get()), totalChildren, nullptr); + rawElementReader->nextEncoded(*(mapBatch.elements.get()), totalChildren, nullptr, readPhase); } else { - rawElementReader->next(*(mapBatch.elements.get()), totalChildren, nullptr); + rawElementReader->next(*(mapBatch.elements.get()), totalChildren, nullptr, readPhase); } } } - void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); if (keyReader.get()) { - keyReader->seekToRowGroup(positions); + keyReader->seekToRowGroup(positions, readPhase); } if (elementReader.get()) { - elementReader->seekToRowGroup(positions); + elementReader->seekToRowGroup(positions, readPhase); } } @@ -1177,17 +1187,17 @@ namespace orc { public: UnionColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false); - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& 
rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; private: template <bool encoded> - void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull); + void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase); }; UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe, @@ -1211,8 +1221,8 @@ namespace orc { } } - uint64_t UnionColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t UnionColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); const uint64_t BUFFER_SIZE = 1024; char buffer[BUFFER_SIZE]; uint64_t lengthsRead = 0; @@ -1227,26 +1237,27 @@ namespace orc { lengthsRead += chunk; } for (size_t i = 0; i < numChildren; ++i) { - if (counts[i] != 0 && childrenReader[i] != nullptr) { - childrenReader[i]->skip(static_cast<uint64_t>(counts[i])); + if (counts[i] != 0 && childrenReader[i] != nullptr + && shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { + childrenReader[i]->skip(static_cast<uint64_t>(counts[i]), readPhase); } } return numValues; } - void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) { - nextInternal<false>(rowBatch, numValues, notNull); + void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + nextInternal<false>(rowBatch, numValues, notNull, readPhase); } void 
UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - nextInternal<true>(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + nextInternal<true>(rowBatch, numValues, notNull, readPhase); } template <bool encoded> void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch); uint64_t* offsets = unionBatch.offsets.data(); int64_t* counts = childrenCounts.data(); @@ -1268,25 +1279,25 @@ namespace orc { } // read the right number of each child column for (size_t i = 0; i < numChildren; ++i) { - if (childrenReader[i] != nullptr) { + if (childrenReader[i] != nullptr && shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { if (encoded) { childrenReader[i]->nextEncoded(*(unionBatch.children[i]), - static_cast<uint64_t>(counts[i]), nullptr); + static_cast<uint64_t>(counts[i]), nullptr, readPhase); } else { childrenReader[i]->next(*(unionBatch.children[i]), static_cast<uint64_t>(counts[i]), - nullptr); + nullptr, readPhase); } } } } void UnionColumnReader::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); rle->seek(positions.at(columnId)); for (size_t i = 0; i < numChildren; ++i) { - if (childrenReader[i] != nullptr) { - childrenReader[i]->seekToRowGroup(positions); + if (childrenReader[i] != nullptr && shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), readPhase)) { + childrenReader[i]->seekToRowGroup(positions, readPhase); } } } @@ -1360,11 +1371,11 @@ namespace orc { 
Decimal64ColumnReader(const Type& type, StripeStreams& stipe); ~Decimal64ColumnReader() override; - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; - void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override; + void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) override; }; const uint32_t Decimal64ColumnReader::MAX_PRECISION_64; const uint32_t Decimal64ColumnReader::MAX_PRECISION_128; @@ -1407,8 +1418,8 @@ namespace orc { // PASS } - uint64_t Decimal64ColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t Decimal64ColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); uint64_t skipped = 0; while (skipped < numValues) { readBuffer(); @@ -1420,8 +1431,8 @@ namespace orc { return numValues; } - void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch); int64_t* values = batch.values.data(); @@ -1463,8 +1474,8 @@ namespace orc { } void Decimal64ColumnReader::seekToRowGroup( - std::unordered_map<uint64_t, PositionProvider>& positions) { - ColumnReader::seekToRowGroup(positions); + std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase) { + ColumnReader::seekToRowGroup(positions, readPhase); valueStream->seek(positions.at(columnId)); scaleDecoder->seek(positions.at(columnId)); // clear buffer state after seek @@ -1477,7 +1488,7 @@ namespace orc { Decimal128ColumnReader(const Type& type, StripeStreams& stipe); ~Decimal128ColumnReader() override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; private: void readInt128(Int128& value, int32_t currentScale) { @@ -1510,8 +1521,8 @@ namespace orc { } void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch); Int128* values = batch.values.data(); @@ -1543,9 +1554,9 @@ namespace orc { Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe); ~Decimal64ColumnReaderV2() override; - uint64_t skip(uint64_t numValues) override; + uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; }; Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe) @@ -1566,15 +1577,15 @@ namespace orc { // PASS } - uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues, const ReadPhase& readPhase) { + numValues = ColumnReader::skip(numValues, readPhase); valueDecoder->skip(numValues); return numValues; } void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? 
rowBatch.notNull.data() : nullptr; Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch); valueDecoder->next(batch.values.data(), numValues, notNull); @@ -1628,7 +1639,7 @@ namespace orc { DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe); ~DecimalHive11ColumnReader() override; - void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase) override; }; DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type, StripeStreams& stripe) @@ -1643,8 +1654,8 @@ namespace orc { } void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, - char* notNull) { - ColumnReader::next(rowBatch, numValues, notNull); + char* notNull, const ReadPhase& readPhase) { + ColumnReader::next(rowBatch, numValues, notNull, readPhase); notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch); Int128* values = batch.values.data(); diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh index 3b765cb..25363e2 100644 --- a/c++/src/ColumnReader.hh +++ b/c++/src/ColumnReader.hh @@ -109,21 +109,30 @@ namespace orc { class ColumnReader { protected: std::unique_ptr<ByteRleDecoder> notNullDecoder; + const Type& type; uint64_t columnId; MemoryPool& memoryPool; ReaderMetrics* metrics; + static bool shouldProcessChild(ReaderCategory readerCategory, const ReadPhase& readPhase) { + return readPhase.contains(readerCategory) || readerCategory == ReaderCategory::FILTER_PARENT; + } + public: ColumnReader(const Type& type, StripeStreams& stipe); virtual ~ColumnReader(); + const Type& getType() const { + return type; + } + /** * Skip number of specified rows. 
* @param numValues the number of values to skip * @return the number of non-null values skipped */ - virtual uint64_t skip(uint64_t numValues); + virtual uint64_t skip(uint64_t numValues, const ReadPhase& readPhase = ReadPhase::ALL); /** * Read the next group of values into this rowBatch. @@ -133,7 +142,7 @@ namespace orc { * a mask (with at least numValues bytes) for which values to * set. */ - virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull); + virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase = ReadPhase::ALL); /** * Read the next group of values without decoding @@ -143,16 +152,16 @@ namespace orc { * a mask (with at least numValues bytes) for which values to * set. */ - virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) { + virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase = ReadPhase::ALL) { rowBatch.isEncoded = false; - next(rowBatch, numValues, notNull); + next(rowBatch, numValues, notNull, readPhase); } /** * Seek to beginning of a row group in the current stripe * @param positions a list of PositionProviders storing the positions */ - virtual void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions); + virtual void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions, const ReadPhase& readPhase = ReadPhase::ALL); }; /** diff --git a/c++/src/Options.hh b/c++/src/Options.hh index 151434e..40a583e 100644 --- a/c++/src/Options.hh +++ b/c++/src/Options.hh @@ -34,6 +34,13 @@ namespace orc { ColumnSelection_TYPE_IDS = 3, }; + enum ColumnFilter { + ColumnFilter_NONE = 0, + ColumnFilter_NAMES = 1, + ColumnFilter_FIELD_IDS = 2, + ColumnFilter_TYPE_IDS = 3, + }; + /** * ReaderOptions Implementation */ @@ -130,6 +137,9 @@ namespace orc { ColumnSelection selection; std::list<uint64_t> includedColumnIndexes; std::list<std::string> 
includedColumnNames; + ColumnFilter filter; + std::list<uint64_t> filterColumnIndexes; + std::list<std::string> filterColumnNames; uint64_t dataStart; uint64_t dataLength; bool throwOnHive11DecimalOverflow; @@ -214,6 +224,20 @@ namespace orc { return *this; } + RowReaderOptions& RowReaderOptions::filter(const std::list<uint64_t>& filterColIndexes) { + privateBits->filter = ColumnFilter_FIELD_IDS; + privateBits->filterColumnIndexes.assign(filterColIndexes.begin(), filterColIndexes.end()); + privateBits->filterColumnNames.clear(); + return *this; + } + + RowReaderOptions& RowReaderOptions::filter(const std::list<std::string>& filterColNames) { + privateBits->filter = ColumnFilter_NAMES; + privateBits->filterColumnNames.assign(filterColNames.begin(), filterColNames.end()); + privateBits->filterColumnIndexes.clear(); + return *this; + } + RowReaderOptions& RowReaderOptions::range(uint64_t offset, uint64_t length) { privateBits->dataStart = offset; privateBits->dataLength = length; @@ -240,6 +264,10 @@ namespace orc { return privateBits->includedColumnNames; } + const std::list<std::string>& RowReaderOptions::getFilterColNames() const { + return privateBits->filterColumnNames; + } + uint64_t RowReaderOptions::getOffset() const { return privateBits->dataStart; } diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 8ccb0ec..52052fb 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -247,7 +247,8 @@ namespace orc { } RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents, - const RowReaderOptions& opts) + const RowReaderOptions& opts, + const ORCFilter* _filter) : localTimezone(getLocalTimezone()), contents(_contents), throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()), @@ -255,7 +256,8 @@ namespace orc { footer(contents->footer.get()), firstRowOfStripe(*contents->pool, 0), enableEncodedBlock(opts.getEnableLazyDecoding()), - readerTimezone(getTimezoneByName(opts.getTimezoneName())) { + 
readerTimezone(getTimezoneByName(opts.getTimezoneName())), + filter(_filter) { uint64_t numberOfStripes; numberOfStripes = static_cast<uint64_t>(footer->stripes_size()); currentStripe = numberOfStripes; @@ -309,6 +311,37 @@ namespace orc { } skipBloomFilters = hasBadBloomFilters(); + + const std::list<std::string>& filterCols = opts.getFilterColNames(); + + // Map columnNames to ColumnIds + buildTypeNameIdMap(contents->schema.get()); + + std::unordered_set<int> filterColIds; + if (!filterCols.empty()) { + for (auto& colName: filterCols) { + auto iter = nameTypeMap.find(colName); + if (iter != nameTypeMap.end()) { + Type* type = iter->second; + while (type != nullptr) { + if (type->getSubtypeCount() == 0) { + type->setReaderCategory(ReaderCategory::FILTER_CHILD); + } else { + type->setReaderCategory(ReaderCategory::FILTER_PARENT); + } + filterColIds.emplace(type->getColumnId()); + type = type->getParent(); + } + } else { + throw ParseError("Invalid column selected " + colName); + } + } + startReadPhase = ReadPhase::LEADERS; + readerContext = std::unique_ptr<ReaderContext>(new ReaderContext()); + readerContext->setFilterCallback(std::move(filterColIds), filter); + } else { + startReadPhase = ReadPhase::ALL; + } } // Check if the file has inconsistent bloom filters. 
@@ -337,7 +370,43 @@ namespace orc { return false; } - CompressionKind RowReaderImpl::getCompression() const { + /** + * Recurses over a type tree and build two maps + * map<TypeName, TypeId>, map<TypeId, Type> + */ + void RowReaderImpl::buildTypeNameIdMap(Type* type) { + // map<type_id, Type*> + idTypeMap[type->getColumnId()] = type; + + if (STRUCT == type->getKind()) { + for (size_t i = 0; i < type->getSubtypeCount(); ++i) { + const std::string& fieldName = type->getFieldName(i); + columns.push_back(fieldName); + nameTypeMap[toDotColumnPath()] = type->getSubtype(i); + buildTypeNameIdMap(type->getSubtype(i)); + columns.pop_back(); + } + } else { + // other non-primitive type + for (size_t j = 0; j < type->getSubtypeCount(); ++j) { + buildTypeNameIdMap(type->getSubtype(j)); + } + } + } + + std::string RowReaderImpl::toDotColumnPath() { + if (columns.empty()) { + return std::string(); + } + std::ostringstream columnStream; + std::copy(columns.begin(), columns.end(), + std::ostream_iterator<std::string>(columnStream, ".")); + std::string columnPath = columnStream.str(); + return columnPath.substr(0, columnPath.length() - 1); + } + + + CompressionKind RowReaderImpl::getCompression() const { return contents->compression; } @@ -401,7 +470,7 @@ namespace orc { // current stripe doesn't have row indexes currentStripe = seekToStripe; currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe]; - startNextStripe(); + startNextStripe(startReadPhase); if (currentStripe >= lastStripe) { return; } @@ -422,14 +491,14 @@ namespace orc { loadStripeIndex(); } // TODO(ORC-1175): process the failures of loadStripeIndex() call - seekToRowGroup(static_cast<uint32_t>(rowsToSkip / rowIndexStride)); + seekToRowGroup(static_cast<uint32_t>(rowsToSkip / rowIndexStride), startReadPhase); // skip leading rows in the target row group rowsToSkip %= rowIndexStride; } // 'reader' is reset in startNextStripe(). It could be nullptr if 'rowsToSkip' is 0, // e.g. 
when startNextStripe() skips all remaining rows of the file. if (rowsToSkip > 0) { - reader->skip(rowsToSkip); + reader->skip(rowsToSkip, startReadPhase); } } @@ -477,7 +546,7 @@ namespace orc { } } - void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) { + void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId, const ReadPhase& readPhase) { // store positions for selected columns std::list<std::list<uint64_t>> positions; // store position providers for selected colimns @@ -497,7 +566,7 @@ namespace orc { positionProviders.insert(std::make_pair(colId, PositionProvider(position))); } - reader->seekToRowGroup(positionProviders); + reader->seekToRowGroup(positionProviders, readPhase); } const FileContents& RowReaderImpl::getFileContents() const { @@ -826,17 +895,17 @@ namespace orc { } } - std::unique_ptr<RowReader> ReaderImpl::createRowReader() const { + std::unique_ptr<RowReader> ReaderImpl::createRowReader(const ORCFilter* filter) const { RowReaderOptions defaultOpts; - return createRowReader(defaultOpts); + return createRowReader(defaultOpts, filter); } - std::unique_ptr<RowReader> ReaderImpl::createRowReader(const RowReaderOptions& opts) const { + std::unique_ptr<RowReader> ReaderImpl::createRowReader(const RowReaderOptions& opts, const ORCFilter* filter) const { if (opts.getSearchArgument() && !isMetadataLoaded) { // load stripe statistics for PPD readMetadata(); } - return std::make_unique<RowReaderImpl>(contents, opts); + return std::make_unique<RowReaderImpl>(contents, opts, filter); } uint64_t maxStreamsForType(const proto::Type& type) { @@ -1020,10 +1089,11 @@ namespace orc { } } - void RowReaderImpl::startNextStripe() { + void RowReaderImpl::startNextStripe(const ReadPhase& readPhase) { reader.reset(); // ColumnReaders use lots of memory; free old memory first rowIndexes.clear(); bloomFilterIndex.clear(); + followRowInStripe = 0; // evaluate file statistics if it exists if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer, 
numRowGroupsInStripeRange)) { @@ -1051,6 +1121,9 @@ namespace orc { rowsInCurrentStripe = currentStripeInfo.numberofrows(); processingStripe = currentStripe; + // read row group statistics and bloom filters of current stripe + loadStripeIndex(); + if (sargsApplier) { bool isStripeNeeded = true; if (contents->metadata) { @@ -1064,9 +1137,6 @@ namespace orc { } if (isStripeNeeded) { - // read row group statistics and bloom filters of current stripe - loadStripeIndex(); - // select row groups to read in the current stripe sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, bloomFilterIndex); if (sargsApplier->hasSelectedFrom(currentRowInStripe)) { @@ -1100,7 +1170,7 @@ namespace orc { sargsApplier->getNextSkippedRows()); previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1; if (currentRowInStripe > 0) { - seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride())); + seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()), readPhase); } } } else { @@ -1110,56 +1180,175 @@ namespace orc { } bool RowReaderImpl::next(ColumnVectorBatch& data) { + return nextBatch(data, nullptr) != 0; + } + + uint64_t RowReaderImpl::nextBatch(ColumnVectorBatch& data, void* arg) { SCOPED_STOPWATCH(contents->readerMetrics, ReaderInclusiveLatencyUs, ReaderCall); - if (currentStripe >= lastStripe) { - data.numElements = 0; - markEndOfFile(); - return false; - } - if (currentRowInStripe == 0) { - startNextStripe(); - } - uint64_t rowsToRead = - std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe - currentRowInStripe); - if (sargsApplier && rowsToRead > 0) { - rowsToRead = computeBatchSize(rowsToRead, currentRowInStripe, rowsInCurrentStripe, - footer->rowindexstride(), sargsApplier->getNextSkippedRows()); - } - data.numElements = rowsToRead; - if (rowsToRead == 0) { - markEndOfFile(); - return false; - } - if (enableEncodedBlock) { - reader->nextEncoded(data, rowsToRead, nullptr); - } else { - 
reader->next(data, rowsToRead, nullptr); - } - // update row number - previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe; - currentRowInStripe += rowsToRead; + uint64_t readRows = 0; + uint64_t rowsToRead = 0; + // do...while is required to handle the case where the filter eliminates all rows in the + // batch, we never return an empty batch unless the file is exhausted + do { + if (currentStripe >= lastStripe) { + data.numElements = 0; + markEndOfFile(); + return readRows; + } + if (currentRowInStripe == 0) { + startNextStripe(startReadPhase); + followRowInStripe = currentRowInStripe; + } + rowsToRead = + std::min(static_cast<uint64_t>(data.capacity), + rowsInCurrentStripe - currentRowInStripe); + if (sargsApplier) { + rowsToRead = computeBatchSize(rowsToRead, + currentRowInStripe, + rowsInCurrentStripe, + footer->rowindexstride(), + sargsApplier->getNextSkippedRows()); + } + if (rowsToRead == 0) { + markEndOfFile(); + return readRows; + } + uint16_t sel_rowid_idx[rowsToRead]; + nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx, arg); + + if (startReadPhase == ReadPhase::LEADERS && data.numElements > 0) { + // At least 1 row has been selected and as a result we read the follow columns into the + // row batch + nextBatch(data, rowsToRead, + prepareFollowReaders(footer->rowindexstride(), + currentRowInStripe, followRowInStripe), sel_rowid_idx, arg); + followRowInStripe = currentRowInStripe + rowsToRead; + } + + // update row number + previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe; + currentRowInStripe += rowsToRead; + readRows += rowsToRead; // check if we need to advance to next selected row group if (sargsApplier) { uint64_t nextRowToRead = advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe, footer->rowindexstride(), sargsApplier->getNextSkippedRows()); - if (currentRowInStripe != nextRowToRead) { - // it is guaranteed to be at start of a row group - currentRowInStripe = nextRowToRead; - if (currentRowInStripe 
< rowsInCurrentStripe) { - seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride())); + if (currentRowInStripe != nextRowToRead) { + // it is guaranteed to be at start of a row group + currentRowInStripe = nextRowToRead; + if (currentRowInStripe < rowsInCurrentStripe) { + seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()), + startReadPhase); + } } } + + if (currentRowInStripe >= rowsInCurrentStripe) { + currentStripe += 1; + currentRowInStripe = 0; + } + } while (rowsToRead != 0 && data.numElements == 0); + return readRows; + } + + void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& readPhase, uint16_t* sel_rowid_idx, void* arg) { + if (enableEncodedBlock) { + reader->nextEncoded(data, batchSize, nullptr, readPhase); + } + else { + reader->next(data, batchSize, nullptr, readPhase); + } + if (readPhase == ReadPhase::ALL || readPhase == ReadPhase::LEADERS) { + // Set the batch size when reading everything or when reading FILTER columns + data.numElements = batchSize; } - if (currentRowInStripe >= rowsInCurrentStripe) { - currentStripe += 1; - currentRowInStripe = 0; + if (readPhase == ReadPhase::LEADERS) { + // Apply filter callback to reduce number of # rows selected for decoding in the next + // TreeReaders + if (readerContext->getFilterCallback()) { + readerContext->getFilterCallback()->filter(data, sel_rowid_idx, batchSize, arg); + } } - return rowsToRead != 0; } + /** + * Determine the RowGroup based on the supplied row id. + * @param rowIdx Row for which the row group is being determined + * @return Id of the RowGroup that the row belongs to + */ + int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) { + return rowIndexStride == 0 ? 0 : (int) (rowIdx / rowIndexStride); + } + + /** + * This method prepares the non-filter column readers for next batch. This involves the following + * 1. Determine position + * 2. Perform IO if required + * 3. 
Position the non-filter readers + * + * This method is repositioning the non-filter columns and as such this method shall never have to + * deal with navigating the stripe forward or skipping row groups, all of this should have already + * taken place based on the filter columns. + * @param toFollowRow The rowIdx identifies the required row position within the stripe for + * follow read + * @param fromFollowRow Indicates the current position of the follow read, exclusive + * @return the read phase for reading non-filter columns, this shall be FOLLOWERS_AND_PARENTS in + * case of a seek otherwise will be FOLLOWERS + */ + ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow, long fromFollowRow) { + // 1. Determine the required row group and skip rows needed from the RG start + int needRG = computeRGIdx(rowIndexStride, toFollowRow); + // The current row is not yet read so we -1 to compute the previously read row group + int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1); + long skipRows; + if (needRG == readRG && toFollowRow >= fromFollowRow) { + // In case we are skipping forward within the same row group, we compute skip rows from the + // current position + skipRows = toFollowRow - fromFollowRow; + } else { + // In all other cases including seeking backwards, we compute the skip rows from the start of + // the required row group + skipRows = toFollowRow - (needRG * rowIndexStride); + } + + // 2. Plan the row group idx for the non-filter columns if this has not already taken place + if (needsFollowColumnsRead) { + needsFollowColumnsRead = false; + } + + // 3. Position the non-filter readers to the required RG and skipRows + ReadPhase result = ReadPhase::FOLLOWERS; + if (needRG != readRG || toFollowRow < fromFollowRow) { + // When having to change a row group or in case of back navigation, seek both the filter + // parents and non-filter. This will re-position the parents present vector. 
This is needed + // to determine the number of non-null values to skip on the non-filter columns. + seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS); + // skip rows on both the filter parents and non-filter as both have been positioned in the + // previous step + reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS); + result = ReadPhase::FOLLOWERS_AND_PARENTS; + } else if (skipRows > 0) { + // in case we are only skipping within the row group, position the filter parents back to the + // position of the follow. This is required to determine the non-null values to skip on the + // non-filter columns. + seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS); + reader->skip(fromFollowRow - (readRG * rowIndexStride), ReadPhase::LEADER_PARENTS); + // Move both the filter parents and non-filter forward, this will compute the correct + // non-null skips on follow children + reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS); + result = ReadPhase::FOLLOWERS_AND_PARENTS; + } + // Identifies the read level that should be performed for the read + // FOLLOWERS_WITH_PARENTS indicates repositioning identifying both non-filter and filter parents + // FOLLOWERS indicates read only of the non-filter level without the parents, which is used during + // contiguous read. 
During a contiguous read no skips are needed and the non-null information of + // the parent is available in the column vector for use during non-filter read + return result; + } + uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t currentRowInStripe, uint64_t rowsInCurrentStripe, uint64_t rowIndexStride, const std::vector<uint64_t>& nextSkippedRows) { diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index ea6db3a..0c52387 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -34,6 +34,25 @@ namespace orc { static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024; + class ReaderContext { + public: + ReaderContext() = default; + + const ORCFilter* getFilterCallback() const { + return filter; + } + + ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds, const ORCFilter* _filter) { + this->filterColumnIds = std::move(_filterColumnIds); + this->filter = _filter; + return *this; + } + + private: + std::unordered_set<int> filterColumnIds; + const ORCFilter* filter; + }; + /** * WriterVersion Implementation */ @@ -150,6 +169,7 @@ namespace orc { uint64_t lastStripe; // the stripe AFTER the last one uint64_t processingStripe; uint64_t currentRowInStripe; + uint64_t followRowInStripe; uint64_t rowsInCurrentStripe; // number of row groups between first stripe and last stripe uint64_t numRowGroupsInStripeRange; @@ -160,7 +180,7 @@ namespace orc { bool enableEncodedBlock; bool useTightNumericVector; // internal methods - void startNextStripe(); + void startNextStripe(const ReadPhase& readPhase); inline void markEndOfFile(); // row index of current stripe with column id as the key @@ -172,6 +192,15 @@ namespace orc { // desired timezone to return data of timestamp types. 
const Timezone& readerTimezone; + std::unique_ptr<ReaderContext> readerContext; + const ORCFilter* filter; + ReadPhase startReadPhase; + bool needsFollowColumnsRead; + + std::map<uint64_t, const Type*> idTypeMap; + std::map<std::string, Type*> nameTypeMap; + std::vector<std::string> columns; + // load stripe index if not done so void loadStripeIndex(); @@ -199,7 +228,7 @@ namespace orc { * Seek to the start of a row group in the current stripe * @param rowGroupEntryId the row group id to seek to */ - void seekToRowGroup(uint32_t rowGroupEntryId); + void seekToRowGroup(uint32_t rowGroupEntryId, const ReadPhase& readPhase); /** * Check if the file has bad bloom filters. We will skip using them in the @@ -208,13 +237,25 @@ namespace orc { */ bool hasBadBloomFilters(); - public: + + // build map from type name and id, id to Type + void buildTypeNameIdMap(Type* type); + + std::string toDotColumnPath(); + + void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& readPhase, uint16_t* sel_rowid_idx, void* arg); + + int computeRGIdx(uint64_t rowIndexStride, long rowIdx); + + ReadPhase prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow, long fromFollowRow); + + public: /** * Constructor that lets the user specify additional options. 
* @param contents of the file * @param options options for reading */ - RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& options); + RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& options, const ORCFilter* filter = nullptr); // Select the columns from the options object const std::vector<bool> getSelectedColumns() const override; @@ -223,6 +264,8 @@ namespace orc { std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) const override; + uint64_t nextBatch(ColumnVectorBatch& data, void* arg = nullptr) override; + bool next(ColumnVectorBatch& data) override; CompressionKind getCompression() const; @@ -312,9 +355,9 @@ namespace orc { std::unique_ptr<StripeStatistics> getStripeStatistics(uint64_t stripeIndex) const override; - std::unique_ptr<RowReader> createRowReader() const override; + std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter = nullptr) const override; - std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options) const override; + std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options, const ORCFilter* filter = nullptr) const override; uint64_t getContentLength() const override; uint64_t getStripeStatisticsLength() const override; diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc index 0075d04..47095b3 100644 --- a/c++/src/TypeImpl.cc +++ b/c++/src/TypeImpl.cc @@ -25,6 +25,12 @@ namespace orc { + const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories({ ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT, ReaderCategory::NON_FILTER }); + const ReadPhase ReadPhase::LEADERS = ReadPhase::fromCategories({ ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT }); + const ReadPhase ReadPhase::FOLLOWERS = ReadPhase::fromCategories({ ReaderCategory::NON_FILTER }); + const ReadPhase ReadPhase::LEADER_PARENTS = ReadPhase::fromCategories({ ReaderCategory::FILTER_PARENT }); + const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS = 
ReadPhase::fromCategories({ ReaderCategory::FILTER_PARENT, ReaderCategory::NON_FILTER }); + Type::~Type() { // PASS } @@ -38,6 +44,7 @@ namespace orc { precision = 0; scale = 0; subtypeCount = 0; + readerCategory = ReaderCategory::NON_FILTER; } TypeImpl::TypeImpl(TypeKind _kind, uint64_t _maxLength) { @@ -49,6 +56,7 @@ namespace orc { precision = 0; scale = 0; subtypeCount = 0; + readerCategory = ReaderCategory::NON_FILTER; } TypeImpl::TypeImpl(TypeKind _kind, uint64_t _precision, uint64_t _scale) { @@ -60,6 +68,7 @@ namespace orc { precision = _precision; scale = _scale; subtypeCount = 0; + readerCategory = ReaderCategory::NON_FILTER; } uint64_t TypeImpl::assignIds(uint64_t root) const { @@ -75,8 +84,8 @@ namespace orc { void TypeImpl::ensureIdAssigned() const { if (columnId == -1) { const TypeImpl* root = this; - while (root->parent != nullptr) { - root = root->parent; + while (root->getParent() != nullptr) { + root = dynamic_cast<const TypeImpl*>(root->getParent()); } root->assignIds(0); } @@ -100,10 +109,18 @@ namespace orc { return subtypeCount; } + Type* TypeImpl::getParent() const { + return parent; + } + const Type* TypeImpl::getSubtype(uint64_t i) const { return subTypes[i].get(); } + Type* TypeImpl::getSubtype(uint64_t i) { + return subTypes[i].get(); + } + const std::string& TypeImpl::getFieldName(uint64_t i) const { return fieldNames[i]; } @@ -155,6 +172,14 @@ namespace orc { return it->second; } + ReaderCategory TypeImpl::getReaderCategory() const { + return readerCategory; + } + + void TypeImpl::setReaderCategory(ReaderCategory _readerCategory) { + readerCategory = _readerCategory; + } + void TypeImpl::setIds(uint64_t _columnId, uint64_t _maxColumnId) { columnId = static_cast<int64_t>(_columnId); maximumColumnId = static_cast<int64_t>(_maxColumnId); diff --git a/c++/src/TypeImpl.hh b/c++/src/TypeImpl.hh index 6d07437..6bc4a84 100644 --- a/c++/src/TypeImpl.hh +++ b/c++/src/TypeImpl.hh @@ -41,6 +41,7 @@ namespace orc { uint64_t precision; uint64_t 
scale; std::map<std::string, std::string> attributes; + ReaderCategory readerCategory; public: /** @@ -66,8 +67,12 @@ namespace orc { uint64_t getSubtypeCount() const override; + Type* getParent() const override; + const Type* getSubtype(uint64_t i) const override; + Type* getSubtype(uint64_t i) override; + const std::string& getFieldName(uint64_t i) const override; uint64_t getMaximumLength() const override; @@ -86,6 +91,10 @@ namespace orc { std::string getAttributeValue(const std::string& key) const override; + ReaderCategory getReaderCategory() const override; + + void setReaderCategory(ReaderCategory _readerCategory) override; + std::string toString() const override; const Type* getTypeByColumnId(uint64_t colIdx) const override; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org