This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/orc by this push:
     new 0e53506  [Feature] Implements selection vector for ORC lazy 
materialization. (#62)
0e53506 is described below

commit 0e53506146c965a5a71f0582691ab2ea148dae7c
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Sat May 6 15:46:41 2023 +0800

    [Feature] Implements selection vector for ORC lazy materialization. (#62)
    
    1. Implements selection vector for ORC lazy materialization.
    From the test, currently implements float/double, date/timestamp, decimal, 
string dict types for better performance,
    and other types have performance penalty.
    2. Decrease `loadStripIndex()` call count.
    3. Adjust code format.
---
 c++/include/orc/Reader.hh |  35 +--
 c++/include/orc/Type.hh   |  48 ++--
 c++/src/ColumnReader.cc   | 672 +++++++++++++++++++++++++++++++++++++++-------
 c++/src/ColumnReader.hh   |  23 +-
 c++/src/Reader.cc         | 222 +++++++--------
 c++/src/Reader.hh         |  45 ++--
 c++/src/TypeImpl.cc       |  14 +-
 7 files changed, 784 insertions(+), 275 deletions(-)

diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 2666b4d..429c1ed 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -186,26 +186,25 @@ namespace orc {
     RowReaderOptions& includeTypes(const std::list<uint64_t>& types);
 
     /**
-      * For files that have structs as the top-level object, filter the fields.
-      * by index. The first field is 0, the second 1, and so on. By default,
-      * all columns are read. This option clears any previous setting of
-      * the selected columns.
-      * @param filterColIndexes a list of fields to read
-      * @return this
-      */
+     * For files that have structs as the top-level object, filter the fields.
+     * by index. The first field is 0, the second 1, and so on. By default,
+     * all columns are read. This option clears any previous setting of
+     * the selected columns.
+     * @param filterColIndexes a list of fields to read
+     * @return this
+     */
     RowReaderOptions& filter(const std::list<uint64_t>& filterColIndexes);
 
     /**
-      * For files that have structs as the top-level object, filter the fields
-      * by name. By default, all columns are read. This option clears
-      * any previous setting of the selected columns.
-      * @param filterColNames a list of fields to read
-      * @return this
-      */
+     * For files that have structs as the top-level object, filter the fields
+     * by name. By default, all columns are read. This option clears
+     * any previous setting of the selected columns.
+     * @param filterColNames a list of fields to read
+     * @return this
+     */
     RowReaderOptions& filter(const std::list<std::string>& filterColNames);
 
-
-      /**
+    /**
      * A map type of <typeId, ReadIntent>.
      */
     typedef std::map<uint64_t, ReadIntent> IdReadIntentMap;
@@ -367,7 +366,8 @@ namespace orc {
   class ORCFilter {
    public:
     virtual ~ORCFilter() = default;
-    virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size, 
void* arg = nullptr) const = 0;
+    virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size,
+                        void* arg = nullptr) const = 0;
   };
 
   class RowReader;
@@ -562,7 +562,8 @@ namespace orc {
      * @param options RowReader Options
      * @return a RowReader to read the rows
      */
-    virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& 
options, const ORCFilter* filter = nullptr) const = 0;
+    virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& 
options,
+                                                       const ORCFilter* filter 
= nullptr) const = 0;
 
     /**
      * Get the name of the input stream.
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index 73b813e..1f0901e 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -27,37 +27,37 @@
 
 namespace orc {
   enum class ReaderCategory {
-      FILTER_CHILD,    // Primitive type that is a filter column
-      FILTER_PARENT,   // Compound type with filter children
-      NON_FILTER       // Non-filter column
+    FILTER_CHILD,   // Primitive type that is a filter column
+    FILTER_PARENT,  // Compound type with filter children
+    NON_FILTER      // Non-filter column
   };
 
   class ReadPhase {
    public:
-      static const int NUM_CATEGORIES = 3;  // Number of values in 
ReaderCategory
-      std::bitset<NUM_CATEGORIES> categories;
-
-      static const ReadPhase ALL;
-      static const ReadPhase LEADERS;
-      static const ReadPhase FOLLOWERS;
-      static const ReadPhase LEADER_PARENTS;
-      static const ReadPhase FOLLOWERS_AND_PARENTS;
-
-      static ReadPhase fromCategories(const 
std::unordered_set<ReaderCategory>& cats) {
-        ReadPhase phase;
-        for (ReaderCategory cat : cats) {
-          phase.categories.set(static_cast<size_t>(cat));
-        }
-        return phase;
+    static const int NUM_CATEGORIES = 3;  // Number of values in ReaderCategory
+    std::bitset<NUM_CATEGORIES> categories;
+
+    static const ReadPhase ALL;
+    static const ReadPhase LEADERS;
+    static const ReadPhase FOLLOWERS;
+    static const ReadPhase LEADER_PARENTS;
+    static const ReadPhase FOLLOWERS_AND_PARENTS;
+
+    static ReadPhase fromCategories(const std::unordered_set<ReaderCategory>& 
cats) {
+      ReadPhase phase;
+      for (ReaderCategory cat : cats) {
+        phase.categories.set(static_cast<size_t>(cat));
       }
+      return phase;
+    }
 
-      bool contains(ReaderCategory cat) const {
-        return categories.test(static_cast<size_t>(cat));
-      }
+    bool contains(ReaderCategory cat) const {
+      return categories.test(static_cast<size_t>(cat));
+    }
 
-      bool operator==(const ReadPhase& other) const {
-        return categories == other.categories;
-      }
+    bool operator==(const ReadPhase& other) const {
+      return categories == other.categories;
+    }
   };
 
   enum TypeKind {
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index cabcdbe..8c6e166 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -85,7 +85,8 @@ namespace orc {
     return numValues;
   }
 
-  void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* incomingMask, const ReadPhase& readPhase) {
+  void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* incomingMask,
+                          const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size) {
     if (numValues > rowBatch.capacity) {
       rowBatch.resize(numValues);
     }
@@ -101,19 +102,17 @@ namespace orc {
           return;
         }
       }
-      rowBatch.hasNulls = false;
     } else if (incomingMask) {
       // If we don't have a notNull stream, copy the incomingMask
       rowBatch.hasNulls = true;
       memcpy(rowBatch.notNull.data(), incomingMask, numValues);
       return;
-    } else {
-      rowBatch.hasNulls = false;
-      memset(rowBatch.notNull.data(), 1, numValues);
     }
+    rowBatch.hasNulls = false;
   }
 
-  void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase) {
+  void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions,
+                                    const ReadPhase& readPhase) {
     if (notNullDecoder.get()) {
       notNullDecoder->seek(positions.at(columnId));
     }
@@ -148,9 +147,11 @@ namespace orc {
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
   };
 
   template <typename BatchType>
@@ -176,7 +177,8 @@ namespace orc {
 
   template <typename BatchType>
   void BooleanColumnReader<BatchType>::next(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
-                                            char* notNull, const ReadPhase& 
readPhase) {
+                                            char* notNull, const ReadPhase& 
readPhase,
+                                            uint16_t* sel_rowid_idx, size_t 
sel_size) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     // Since the byte rle places the output in a char* and BatchType here may 
be
     // LongVectorBatch with long*. We cheat here in that case and use the long*
@@ -215,7 +217,8 @@ namespace orc {
       return numValues;
     }
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override {
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override {
       ColumnReader::next(rowBatch, numValues, notNull, readPhase);
       // Since the byte rle places the output in a char* instead of long*,
       // we cheat here and use the long* and then expand it in a second pass.
@@ -225,7 +228,8 @@ namespace orc {
       expandBytesToIntegers(ptr, numValues);
     }
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override {
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override {
       ColumnReader::seekToRowGroup(positions, readPhase);
       rle->seek(positions.at(columnId));
     }
@@ -255,13 +259,15 @@ namespace orc {
       return numValues;
     }
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override {
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override {
       ColumnReader::next(rowBatch, numValues, notNull, readPhase);
       rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues,
                 rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
     }
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override {
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override {
       ColumnReader::seekToRowGroup(positions, readPhase);
       rle->seek(positions.at(columnId));
     }
@@ -276,15 +282,24 @@ namespace orc {
     const int64_t epochOffset;
     const bool sameTimezone;
 
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase);
+    void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                size_t sel_size);
+    uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase);
+
    public:
     TimestampColumnReader(const Type& type, StripeStreams& stripe, bool 
isInstantType);
     ~TimestampColumnReader() override;
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
   };
 
   TimestampColumnReader::TimestampColumnReader(const Type& type, 
StripeStreams& stripe,
@@ -310,12 +325,28 @@ namespace orc {
 
   uint64_t TimestampColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
     numValues = ColumnReader::skip(numValues, readPhase);
+    numValues = skipInternal(numValues, readPhase);
+    return numValues;
+  }
+
+  uint64_t TimestampColumnReader::skipInternal(uint64_t numValues, const 
ReadPhase& readPhase) {
     secondsRle->skip(numValues);
     nanoRle->skip(numValues);
     return numValues;
   }
 
-  void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull, const ReadPhase& readPhase) {
+  void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                   const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                   size_t sel_size) {
+    if (sel_rowid_idx == nullptr) {
+      nextInternal(rowBatch, numValues, notNull, readPhase);
+    } else {
+      nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
+    }
+  }
+
+  void TimestampColumnReader::nextInternal(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
+                                           char* notNull, const ReadPhase& 
readPhase) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     TimestampVectorBatch& timestampBatch = 
dynamic_cast<TimestampVectorBatch&>(rowBatch);
@@ -356,6 +387,51 @@ namespace orc {
     }
   }
 
+  void TimestampColumnReader::nextInternalWithFilter(ColumnVectorBatch& 
rowBatch,
+                                                     uint64_t numValues, char* 
notNull,
+                                                     const ReadPhase& 
readPhase,
+                                                     uint16_t* sel_rowid_idx, 
size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    TimestampVectorBatch& timestampBatch = 
dynamic_cast<TimestampVectorBatch&>(rowBatch);
+    int64_t* secsBuffer = timestampBatch.data.data();
+    secondsRle->next(secsBuffer, numValues, notNull);
+    int64_t* nanoBuffer = timestampBatch.nanoseconds.data();
+    nanoRle->next(nanoBuffer, numValues, notNull);
+
+    // Construct the values
+    for (size_t i = 0; i < sel_size; i++) {
+      uint16_t idx = sel_rowid_idx[i];
+      if (notNull == nullptr || notNull[idx]) {
+        uint64_t zeros = nanoBuffer[idx] & 0x7;
+        nanoBuffer[idx] >>= 3;
+        if (zeros != 0) {
+          for (uint64_t j = 0; j <= zeros; ++j) {
+            nanoBuffer[idx] *= 10;
+          }
+        }
+        int64_t writerTime = secsBuffer[idx] + epochOffset;
+        if (!sameTimezone) {
+          // adjust timestamp value to same wall clock time if writer and 
reader
+          // time zones have different rules, which is required for Apache Orc.
+          const auto& wv = writerTimezone.getVariant(writerTime);
+          const auto& rv = readerTimezone.getVariant(writerTime);
+          if (!wv.hasSameTzRule(rv)) {
+            // If the timezone adjustment moves the millis across a DST 
boundary,
+            // we need to reevaluate the offsets.
+            int64_t adjustedTime = writerTime + wv.gmtOffset - rv.gmtOffset;
+            const auto& adjustedReader = 
readerTimezone.getVariant(adjustedTime);
+            writerTime = writerTime + wv.gmtOffset - adjustedReader.gmtOffset;
+          }
+        }
+        secsBuffer[idx] = writerTime;
+        if (secsBuffer[idx] < 0 && nanoBuffer[i] > 999999) {
+          secsBuffer[idx] -= 1;
+        }
+      }
+    }
+  }
+
   void TimestampColumnReader::seekToRowGroup(
       std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
     ColumnReader::seekToRowGroup(positions, readPhase);
@@ -371,9 +447,11 @@ namespace orc {
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
 
    private:
     std::unique_ptr<SeekableInputStream> inputStream;
@@ -381,6 +459,13 @@ namespace orc {
     const char* bufferPointer;
     const char* bufferEnd;
 
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase);
+
+    void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                size_t sel_size);
+
     unsigned char readByte() {
       if (bufferPointer == bufferEnd) {
         int length;
@@ -442,6 +527,8 @@ namespace orc {
       }
       return static_cast<FloatType>(*result);
     }
+
+    uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase);
   };
 
   template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
@@ -456,7 +543,12 @@ namespace orc {
   uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::skip(
       uint64_t numValues, const ReadPhase& readPhase) {
     numValues = ColumnReader::skip(numValues, readPhase);
+    return skipInternal(numValues, readPhase);
+  }
 
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
+  uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::skipInternal(
+      uint64_t numValues, const ReadPhase& readPhase) {
     if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue * 
numValues) {
       bufferPointer += bytesPerValue * numValues;
     } else {
@@ -477,6 +569,17 @@ namespace orc {
 
   template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
   void DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::next(
+      ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const 
ReadPhase& readPhase,
+      uint16_t* sel_rowid_idx, size_t sel_size) {
+    if (sel_rowid_idx == nullptr) {
+      nextInternal(rowBatch, numValues, notNull, readPhase);
+    } else {
+      nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
+    }
+  }
+
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
+  void DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::nextInternal(
       ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const 
ReadPhase& readPhase) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     // update the notNull from the parent class
@@ -521,6 +624,68 @@ namespace orc {
     }
   }
 
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
+  void DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::nextInternalWithFilter(
+      ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const 
ReadPhase& readPhase,
+      uint16_t* sel_rowid_idx, size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
+    // update the notNull from the parent class
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    ValueType* outArray =
+        
reinterpret_cast<ValueType*>(dynamic_cast<BatchType&>(rowBatch).data.data());
+    uint16_t previousIdx = 0;
+
+    if constexpr (columnKind == FLOAT) {
+      if (notNull) {
+        for (size_t i = 0; i < sel_size; i++) {
+          uint16_t idx = sel_rowid_idx[i];
+          if (idx - previousIdx > 0) {
+            skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), 
readPhase);
+          }
+          if (notNull[idx]) {
+            outArray[idx] = readFloat<ValueType>();
+          }
+          previousIdx = idx + 1;
+        }
+        skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), 
readPhase);
+      } else {
+        for (size_t i = 0; i < sel_size; i++) {
+          uint16_t idx = sel_rowid_idx[i];
+          if (idx - previousIdx > 0) {
+            skipInternal(idx - previousIdx, readPhase);
+          }
+          outArray[idx] = readFloat<ValueType>();
+          previousIdx = idx + 1;
+        }
+        skipInternal(numValues - previousIdx, readPhase);
+      }
+    } else {
+      if (notNull) {
+        for (size_t i = 0; i < sel_size; i++) {
+          uint16_t idx = sel_rowid_idx[i];
+          if (idx - previousIdx > 0) {
+            skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), 
readPhase);
+          }
+          if (notNull[idx]) {
+            outArray[idx] = readDouble<ValueType>();
+          }
+          previousIdx = idx + 1;
+        }
+        skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), 
readPhase);
+      } else {
+        for (size_t i = 0; i < sel_size; i++) {
+          uint16_t idx = sel_rowid_idx[i];
+          if (idx - previousIdx > 0) {
+            skipInternal(idx - previousIdx, readPhase);
+          }
+          outArray[idx] = readDouble<ValueType>();
+          previousIdx = idx + 1;
+        }
+        skipInternal(numValues - previousIdx, readPhase);
+      }
+    }
+  }
+
   template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
   void DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::seekToRowGroup(
       std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
@@ -552,17 +717,26 @@ namespace orc {
     std::shared_ptr<StringDictionary> dictionary;
     std::unique_ptr<RleDecoder> rle;
 
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase);
+    void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                size_t sel_size);
+
    public:
     StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
     ~StringDictionaryColumnReader() override;
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                     const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
   };
 
   StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type,
@@ -613,7 +787,17 @@ namespace orc {
   }
 
   void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
-                                          char* notNull, const ReadPhase& 
readPhase) {
+                                          char* notNull, const ReadPhase& 
readPhase,
+                                          uint16_t* sel_rowid_idx, size_t 
sel_size) {
+    if (sel_rowid_idx == nullptr) {
+      nextInternal(rowBatch, numValues, notNull, readPhase);
+    } else {
+      nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
+    }
+  }
+
+  void StringDictionaryColumnReader::nextInternal(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
+                                                  char* notNull, const 
ReadPhase& readPhase) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     // update the notNull from the parent class
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
@@ -647,8 +831,58 @@ namespace orc {
     }
   }
 
+  void StringDictionaryColumnReader::nextInternalWithFilter(ColumnVectorBatch& 
rowBatch,
+                                                            uint64_t 
numValues, char* notNull,
+                                                            const ReadPhase& 
readPhase,
+                                                            uint16_t* 
sel_rowid_idx,
+                                                            size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+    // update the notNull from the parent class
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
+    char* blob = dictionary->dictionaryBlob.data();
+    int64_t* dictionaryOffsets = dictionary->dictionaryOffset.data();
+    char** outputStarts = byteBatch.data.data();
+    int64_t* outputLengths = byteBatch.length.data();
+    std::unique_ptr<int64_t[]> tmpOutputLengths(new 
int64_t[byteBatch.length.size()]);
+    rle->next(tmpOutputLengths.get(), numValues, notNull);
+    uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1;
+    if (notNull) {
+      for (size_t i = 0; i < numValues; i++) {
+        outputStarts[i] = nullptr;
+        outputLengths[i] = 0;
+      }
+      for (size_t i = 0; i < sel_size; i++) {
+        uint16_t idx = sel_rowid_idx[i];
+        if (notNull[idx]) {
+          int64_t entry = tmpOutputLengths[idx];
+          if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
+            throw ParseError("Entry index out of range in 
StringDictionaryColumn");
+          }
+          outputStarts[idx] = blob + dictionaryOffsets[entry];
+          outputLengths[idx] = dictionaryOffsets[entry + 1] - 
dictionaryOffsets[entry];
+        }
+      }
+    } else {
+      for (size_t i = 0; i < numValues; i++) {
+        outputStarts[i] = nullptr;
+        outputLengths[i] = 0;
+      }
+      for (size_t i = 0; i < sel_size; i++) {
+        uint16_t idx = sel_rowid_idx[i];
+        int64_t entry = tmpOutputLengths[idx];
+        if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
+          throw ParseError("Entry index out of range in 
StringDictionaryColumn");
+        }
+        outputStarts[idx] = blob + dictionaryOffsets[entry];
+        outputLengths[idx] = dictionaryOffsets[entry + 1] - 
dictionaryOffsets[entry];
+      }
+    }
+  }
+
   void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
-                                                 char* notNull, const 
ReadPhase& readPhase) {
+                                                 char* notNull, const 
ReadPhase& readPhase,
+                                                 uint16_t* sel_rowid_idx, 
size_t sel_size) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     rowBatch.isEncoded = true;
@@ -688,9 +922,11 @@ namespace orc {
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
   };
 
   StringDirectColumnReader::StringDirectColumnReader(const Type& type, 
StripeStreams& stripe)
@@ -760,7 +996,8 @@ namespace orc {
   }
 
   void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                      char* notNull, const ReadPhase& 
readPhase) {
+                                      char* notNull, const ReadPhase& 
readPhase,
+                                      uint16_t* sel_rowid_idx, size_t 
sel_size) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     // update the notNull from the parent class
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
@@ -836,15 +1073,19 @@ namespace orc {
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                     const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
 
    private:
     template <bool encoded>
-    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase);
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size);
   };
 
   StructColumnReader::StructColumnReader(const Type& type, StripeStreams& 
stripe,
@@ -879,35 +1120,40 @@ namespace orc {
     return numValues;
   }
 
-  void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull, const ReadPhase& readPhase) {
-    nextInternal<false>(rowBatch, numValues, notNull, readPhase);
+  void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                size_t sel_size) {
+    nextInternal<false>(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
   }
 
   void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                       char* notNull, const ReadPhase& 
readPhase) {
-    nextInternal<true>(rowBatch, numValues, notNull, readPhase);
+                                       char* notNull, const ReadPhase& 
readPhase,
+                                       uint16_t* sel_rowid_idx, size_t 
sel_size) {
+    nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
   }
 
   template <bool encoded>
   void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                        char* notNull, const ReadPhase& 
readPhase) {
-    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+                                        char* notNull, const ReadPhase& 
readPhase,
+                                        uint16_t* sel_rowid_idx, size_t 
sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
     uint64_t i = 0;
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) {
       if (shouldProcessChild((*iter)->getType().getReaderCategory(), 
readPhase)) {
         if (encoded) {
-          (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch 
&>(rowBatch).fields[i]), numValues,
-                               notNull, readPhase);
+          
(*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), 
numValues,
+                               notNull, readPhase, sel_rowid_idx, sel_size);
         } else {
-          (*iter)->next(*(dynamic_cast<StructVectorBatch 
&>(rowBatch).fields[i]), numValues, notNull, readPhase);
+          
(*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), 
numValues, notNull,
+                        readPhase, sel_rowid_idx, sel_size);
         }
       }
     }
   }
 
-  void StructColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+  void StructColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions,
+                                          const ReadPhase& readPhase) {
     ColumnReader::seekToRowGroup(positions, readPhase);
 
     for (auto& ptr : children) {
@@ -928,15 +1174,19 @@ namespace orc {
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                     const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
 
    private:
     template <bool encoded>
-    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase);
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size);
   };
 
   ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe,
@@ -982,19 +1232,23 @@ namespace orc {
     return numValues;
   }
 
-  void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull, const ReadPhase& readPhase) {
-    nextInternal<false>(rowBatch, numValues, notNull, readPhase);
+  void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull,
+                              const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                              size_t sel_size) {
+    nextInternal<false>(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
   }
 
-  void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                     char* notNull, const ReadPhase& 
readPhase) {
-    nextInternal<true>(rowBatch, numValues, notNull, readPhase);
+  void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                     const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                     size_t sel_size) {
+    nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
   }
 
   template <bool encoded>
   void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                      char* notNull, const ReadPhase& 
readPhase) {
-    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+                                      char* notNull, const ReadPhase& 
readPhase,
+                                      uint16_t* sel_rowid_idx, size_t 
sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
     ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
     int64_t* offsets = listBatch.offsets.data();
     notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
@@ -1021,14 +1275,17 @@ namespace orc {
     ColumnReader* childReader = child.get();
     if (childReader) {
       if (encoded) {
-        childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, 
nullptr, readPhase);
+        childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, 
nullptr, readPhase,
+                                 sel_rowid_idx, sel_size);
       } else {
-        childReader->next(*(listBatch.elements.get()), totalChildren, nullptr, 
readPhase);
+        childReader->next(*(listBatch.elements.get()), totalChildren, nullptr, 
readPhase,
+                          sel_rowid_idx, sel_size);
       }
     }
   }
 
-  void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase) {
+  void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions,
+                                        const ReadPhase& readPhase) {
     ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
     if (child.get()) {
@@ -1048,15 +1305,19 @@ namespace orc {
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                     const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
 
    private:
     template <bool encoded>
-    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase);
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size);
   };
 
   MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe,
@@ -1112,19 +1373,22 @@ namespace orc {
     return numValues;
   }
 
-  void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull, const ReadPhase& readPhase) {
-    nextInternal<false>(rowBatch, numValues, notNull, readPhase);
+  void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull,
+                             const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx, size_t sel_size) {
+    nextInternal<false>(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
   }
 
-  void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                    char* notNull, const ReadPhase& readPhase) 
{
-    nextInternal<true>(rowBatch, numValues, notNull, readPhase);
+  void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                    const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                    size_t sel_size) {
+    nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
   }
 
   template <bool encoded>
-  void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                     char* notNull, const ReadPhase& 
readPhase) {
-    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+  void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                     const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                     size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
     MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
     int64_t* offsets = mapBatch.offsets.data();
     notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
@@ -1159,14 +1423,16 @@ namespace orc {
     ColumnReader* rawElementReader = elementReader.get();
     if (rawElementReader) {
       if (encoded) {
-        rawElementReader->nextEncoded(*(mapBatch.elements.get()), 
totalChildren, nullptr, readPhase);
+        rawElementReader->nextEncoded(*(mapBatch.elements.get()), 
totalChildren, nullptr,
+                                      readPhase);
       } else {
         rawElementReader->next(*(mapBatch.elements.get()), totalChildren, 
nullptr, readPhase);
       }
     }
   }
 
-  void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase) {
+  void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions,
+                                       const ReadPhase& readPhase) {
     ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
     if (keyReader.get()) {
@@ -1189,15 +1455,19 @@ namespace orc {
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                     const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
 
    private:
     template <bool encoded>
-    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase);
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase, uint16_t* sel_rowid_idx, 
size_t sel_size);
   };
 
   UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe,
@@ -1237,26 +1507,30 @@ namespace orc {
       lengthsRead += chunk;
     }
     for (size_t i = 0; i < numChildren; ++i) {
-      if (counts[i] != 0 && childrenReader[i] != nullptr
-        && 
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
+      if (counts[i] != 0 && childrenReader[i] != nullptr &&
+          shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
         childrenReader[i]->skip(static_cast<uint64_t>(counts[i]), readPhase);
       }
     }
     return numValues;
   }
 
-  void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull, const ReadPhase& readPhase) {
-    nextInternal<false>(rowBatch, numValues, notNull, readPhase);
+  void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                               const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                               size_t sel_size) {
+    nextInternal<false>(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
   }
 
   void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                      char* notNull, const ReadPhase& 
readPhase) {
-    nextInternal<true>(rowBatch, numValues, notNull, readPhase);
+                                      char* notNull, const ReadPhase& 
readPhase,
+                                      uint16_t* sel_rowid_idx, size_t 
sel_size) {
+    nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, 
sel_size);
   }
 
   template <bool encoded>
   void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                       char* notNull, const ReadPhase& 
readPhase) {
+                                       char* notNull, const ReadPhase& 
readPhase,
+                                       uint16_t* sel_rowid_idx, size_t 
sel_size) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
     uint64_t* offsets = unionBatch.offsets.data();
@@ -1279,7 +1553,8 @@ namespace orc {
     }
     // read the right number of each child column
     for (size_t i = 0; i < numChildren; ++i) {
-      if (childrenReader[i] != nullptr && 
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
+      if (childrenReader[i] != nullptr &&
+          shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
         if (encoded) {
           childrenReader[i]->nextEncoded(*(unionBatch.children[i]),
                                          static_cast<uint64_t>(counts[i]), 
nullptr, readPhase);
@@ -1291,12 +1566,13 @@ namespace orc {
     }
   }
 
-  void UnionColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+  void UnionColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions,
+                                         const ReadPhase& readPhase) {
     ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
     for (size_t i = 0; i < numChildren; ++i) {
-      if (childrenReader[i] != nullptr && 
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
+      if (childrenReader[i] != nullptr &&
+          shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
         childrenReader[i]->seekToRowGroup(positions, readPhase);
       }
     }
@@ -1367,15 +1643,24 @@ namespace orc {
       }
     }
 
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase);
+    void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                size_t sel_size);
+    uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase);
+
    public:
     Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
     ~Decimal64ColumnReader() override;
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions,
+                        const ReadPhase& readPhase) override;
   };
   const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
   const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
@@ -1420,6 +1705,12 @@ namespace orc {
 
   uint64_t Decimal64ColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
     numValues = ColumnReader::skip(numValues, readPhase);
+    numValues = skipInternal(numValues, readPhase);
+    scaleDecoder->skip(numValues);
+    return numValues;
+  }
+
+  uint64_t Decimal64ColumnReader::skipInternal(uint64_t numValues, const 
ReadPhase& readPhase) {
     uint64_t skipped = 0;
     while (skipped < numValues) {
       readBuffer();
@@ -1427,11 +1718,21 @@ namespace orc {
         skipped += 1;
       }
     }
-    scaleDecoder->skip(numValues);
     return numValues;
   }
 
-  void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull, const ReadPhase& readPhase) {
+  void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                   const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                   size_t sel_size) {
+    if (sel_rowid_idx == nullptr) {
+      nextInternal(rowBatch, numValues, notNull, readPhase);
+    } else {
+      nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
+    }
+  }
+
+  void Decimal64ColumnReader::nextInternal(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
+                                           char* notNull, const ReadPhase& 
readPhase) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     Decimal64VectorBatch& batch = 
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
@@ -1454,6 +1755,46 @@ namespace orc {
     }
   }
 
+  void Decimal64ColumnReader::nextInternalWithFilter(ColumnVectorBatch& 
rowBatch,
+                                                     uint64_t numValues, char* 
notNull,
+                                                     const ReadPhase& 
readPhase,
+                                                     uint16_t* sel_rowid_idx, 
size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    Decimal64VectorBatch& batch = 
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
+    int64_t* values = batch.values.data();
+    int64_t* scaleBuffer = batch.readScales.data();
+    scaleDecoder->next(scaleBuffer, numValues, notNull);
+    batch.precision = precision;
+    batch.scale = scale;
+
+    uint16_t previousIdx = 0;
+    if (notNull) {
+      for (size_t i = 0; i < sel_size; i++) {
+        uint16_t idx = sel_rowid_idx[i];
+        if (idx - previousIdx > 0) {
+          skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), 
readPhase);
+        }
+        if (notNull[idx]) {
+          readInt64(values[idx], static_cast<int32_t>(scaleBuffer[idx]));
+          ;
+        }
+        previousIdx = idx + 1;
+      }
+      skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), 
readPhase);
+    } else {
+      for (size_t i = 0; i < sel_size; i++) {
+        uint16_t idx = sel_rowid_idx[i];
+        if (idx - previousIdx > 0) {
+          skipInternal(idx - previousIdx, readPhase);
+        }
+        readInt64(values[idx], static_cast<int32_t>(scaleBuffer[idx]));
+        previousIdx = idx + 1;
+      }
+      skipInternal(numValues - previousIdx, readPhase);
+    }
+  }
+
   void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) {
     if (scale > currentScale) {
       while (scale > currentScale) {
@@ -1488,7 +1829,8 @@ namespace orc {
     Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
     ~Decimal128ColumnReader() override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
 
    private:
     void readInt128(Int128& value, int32_t currentScale) {
@@ -1509,6 +1851,13 @@ namespace orc {
       unZigZagInt128(value);
       scaleInt128(value, static_cast<uint32_t>(scale), 
static_cast<uint32_t>(currentScale));
     }
+
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase);
+
+    void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                size_t sel_size);
   };
 
   Decimal128ColumnReader::Decimal128ColumnReader(const Type& type, 
StripeStreams& stripe)
@@ -1520,8 +1869,18 @@ namespace orc {
     // PASS
   }
 
-  void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                    char* notNull, const ReadPhase& readPhase) 
{
+  void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                    const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                    size_t sel_size) {
+    if (sel_rowid_idx == nullptr) {
+      nextInternal(rowBatch, numValues, notNull, readPhase);
+    } else {
+      nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
+    }
+  }
+
+  void Decimal128ColumnReader::nextInternal(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
+                                            char* notNull, const ReadPhase& 
readPhase) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     Decimal128VectorBatch& batch = 
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
@@ -1544,6 +1903,46 @@ namespace orc {
     }
   }
 
+  void Decimal128ColumnReader::nextInternalWithFilter(ColumnVectorBatch& 
rowBatch,
+                                                      uint64_t numValues, 
char* notNull,
+                                                      const ReadPhase& 
readPhase,
+                                                      uint16_t* sel_rowid_idx, 
size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    Decimal128VectorBatch& batch = 
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
+    Int128* values = batch.values.data();
+    // read the next group of scales
+    int64_t* scaleBuffer = batch.readScales.data();
+    scaleDecoder->next(scaleBuffer, numValues, notNull);
+    batch.precision = precision;
+    batch.scale = scale;
+
+    uint16_t previousIdx = 0;
+    if (notNull) {
+      for (size_t i = 0; i < sel_size; i++) {
+        uint16_t idx = sel_rowid_idx[i];
+        if (idx - previousIdx > 0) {
+          skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), 
readPhase);
+        }
+        if (notNull[idx]) {
+          readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx]));
+        }
+        previousIdx = idx + 1;
+      }
+      skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), 
readPhase);
+    } else {
+      for (size_t i = 0; i < sel_size; i++) {
+        uint16_t idx = sel_rowid_idx[i];
+        if (idx - previousIdx > 0) {
+          skipInternal(idx - previousIdx, readPhase);
+        }
+        readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx]));
+        previousIdx = idx + 1;
+      }
+      skipInternal(numValues - previousIdx, readPhase);
+    }
+  }
+
   class Decimal64ColumnReaderV2 : public ColumnReader {
    protected:
     std::unique_ptr<RleDecoder> valueDecoder;
@@ -1556,7 +1955,8 @@ namespace orc {
 
     uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
   };
 
   Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type, 
StripeStreams& stripe)
@@ -1583,8 +1983,9 @@ namespace orc {
     return numValues;
   }
 
-  void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                     char* notNull, const ReadPhase& 
readPhase) {
+  void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                     const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                     size_t sel_size) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     Decimal64VectorBatch& batch = 
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
@@ -1635,11 +2036,19 @@ namespace orc {
       return value >= MIN_VALUE && value <= MAX_VALUE;
     }
 
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase);
+
+    void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull,
+                                const ReadPhase& readPhase, uint16_t* 
sel_rowid_idx,
+                                size_t sel_size);
+
    public:
     DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
     ~DecimalHive11ColumnReader() override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+              const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t 
sel_size) override;
   };
 
   DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type, 
StripeStreams& stripe)
@@ -1654,7 +2063,17 @@ namespace orc {
   }
 
   void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                       char* notNull, const ReadPhase& 
readPhase) {
+                                       char* notNull, const ReadPhase& 
readPhase,
+                                       uint16_t* sel_rowid_idx, size_t 
sel_size) {
+    if (sel_rowid_idx == nullptr) {
+      nextInternal(rowBatch, numValues, notNull, readPhase);
+    } else {
+      nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, 
sel_rowid_idx, sel_size);
+    }
+  }
+
+  void DecimalHive11ColumnReader::nextInternal(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
+                                               char* notNull, const ReadPhase& 
readPhase) {
     ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     Decimal128VectorBatch& batch = 
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
@@ -1698,6 +2117,67 @@ namespace orc {
     }
   }
 
+  void DecimalHive11ColumnReader::nextInternalWithFilter(ColumnVectorBatch& 
rowBatch,
+                                                         uint64_t numValues, 
char* notNull,
+                                                         const ReadPhase& 
readPhase,
+                                                         uint16_t* 
sel_rowid_idx, size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    Decimal128VectorBatch& batch = 
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
+    Int128* values = batch.values.data();
+    // read the next group of scales
+    int64_t* scaleBuffer = batch.readScales.data();
+
+    scaleDecoder->next(scaleBuffer, numValues, notNull);
+
+    batch.precision = precision;
+    batch.scale = scale;
+
+    uint16_t previousIdx = 0;
+    if (notNull) {
+      for (size_t i = 0; i < sel_size; i++) {
+        uint16_t idx = sel_rowid_idx[i];
+        if (idx - previousIdx > 0) {
+          skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), 
readPhase);
+        }
+        if (notNull[idx]) {
+          if (!readInt128(values[idx], 
static_cast<int32_t>(scaleBuffer[idx]))) {
+            if (throwOnOverflow) {
+              throw ParseError("Hive 0.11 decimal was more than 38 digits.");
+            } else {
+              *errorStream << "Warning: "
+                           << "Hive 0.11 decimal with more than 38 digits "
+                           << "replaced by NULL.\n";
+              notNull[idx] = false;
+            }
+          }
+        }
+        previousIdx = idx + 1;
+      }
+      skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), 
readPhase);
+    } else {
+      for (size_t i = 0; i < sel_size; i++) {
+        uint16_t idx = sel_rowid_idx[i];
+        if (idx - previousIdx > 0) {
+          skipInternal(idx - previousIdx, readPhase);
+        }
+        if (!readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx]))) {
+          if (throwOnOverflow) {
+            throw ParseError("Hive 0.11 decimal was more than 38 digits.");
+          } else {
+            *errorStream << "Warning: "
+                         << "Hive 0.11 decimal with more than 38 digits "
+                         << "replaced by NULL.\n";
+            batch.hasNulls = true;
+            batch.notNull[idx] = false;
+          }
+        }
+        previousIdx = idx + 1;
+      }
+      skipInternal(numValues - previousIdx, readPhase);
+    }
+  }
+
   static bool isLittleEndian() {
     static union {
       uint32_t i;
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 25363e2..fd89489 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -118,6 +118,16 @@ namespace orc {
       return readPhase.contains(readerCategory) || readerCategory == 
ReaderCategory::FILTER_PARENT;
     }
 
+    static int countNonNullRowsInRange(char* notNull, uint16_t start, uint16_t 
end) {
+      int result = 0;
+      while (start < end) {
+        if (notNull[start++]) {
+          result++;
+        }
+      }
+      return result;
+    }
+
    public:
     ColumnReader(const Type& type, StripeStreams& stipe);
 
@@ -142,7 +152,9 @@ namespace orc {
      *           a mask (with at least numValues bytes) for which values to
      *           set.
      */
-    virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase = ReadPhase::ALL);
+    virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull,
+                      const ReadPhase& readPhase = ReadPhase::ALL,
+                      uint16_t* sel_rowid_idx = nullptr, size_t sel_size = 0);
 
     /**
      * Read the next group of values without decoding
@@ -152,16 +164,19 @@ namespace orc {
      *           a mask (with at least numValues bytes) for which values to
      *           set.
      */
-    virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull, const ReadPhase& readPhase = ReadPhase::ALL) {
+    virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull,
+                             const ReadPhase& readPhase = ReadPhase::ALL,
+                             uint16_t* sel_rowid_idx = nullptr, size_t 
sel_size = 0) {
       rowBatch.isEncoded = false;
-      next(rowBatch, numValues, notNull, readPhase);
+      next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx);
     }
 
     /**
      * Seek to beginning of a row group in the current stripe
      * @param positions a list of PositionProviders storing the positions
      */
-    virtual void seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase = ReadPhase::ALL);
+    virtual void seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions,
+                                const ReadPhase& readPhase = ReadPhase::ALL);
   };
 
   /**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 52052fb..c132e46 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -247,8 +247,7 @@ namespace orc {
   }
 
   RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
-                               const RowReaderOptions& opts,
-                               const ORCFilter* _filter)
+                               const RowReaderOptions& opts, const ORCFilter* 
_filter)
       : localTimezone(getLocalTimezone()),
         contents(_contents),
         throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
@@ -319,7 +318,7 @@ namespace orc {
 
     std::unordered_set<int> filterColIds;
     if (!filterCols.empty()) {
-      for (auto& colName: filterCols) {
+      for (auto& colName : filterCols) {
         auto iter = nameTypeMap.find(colName);
         if (iter != nameTypeMap.end()) {
           Type* type = iter->second;
@@ -405,8 +404,7 @@ namespace orc {
     return columnPath.substr(0, columnPath.length() - 1);
   }
 
-
-    CompressionKind RowReaderImpl::getCompression() const {
+  CompressionKind RowReaderImpl::getCompression() const {
     return contents->compression;
   }
 
@@ -900,7 +898,8 @@ namespace orc {
     return createRowReader(defaultOpts, filter);
   }
 
-  std::unique_ptr<RowReader> ReaderImpl::createRowReader(const 
RowReaderOptions& opts, const ORCFilter* filter) const {
+  std::unique_ptr<RowReader> ReaderImpl::createRowReader(const 
RowReaderOptions& opts,
+                                                         const ORCFilter* 
filter) const {
     if (opts.getSearchArgument() && !isMetadataLoaded) {
       // load stripe statistics for PPD
       readMetadata();
@@ -1121,9 +1120,6 @@ namespace orc {
       rowsInCurrentStripe = currentStripeInfo.numberofrows();
       processingStripe = currentStripe;
 
-      // read row group statistics and bloom filters of current stripe
-      loadStripeIndex();
-
       if (sargsApplier) {
         bool isStripeNeeded = true;
         if (contents->metadata) {
@@ -1137,6 +1133,8 @@ namespace orc {
         }
 
         if (isStripeNeeded) {
+          // read row group statistics and bloom filters of current stripe
+          loadStripeIndex();
           // select row groups to read in the current stripe
           sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, 
bloomFilterIndex);
           if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
@@ -1170,7 +1168,8 @@ namespace orc {
                                   sargsApplier->getNextSkippedRows());
         previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
         if (currentRowInStripe > 0) {
-          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / 
footer->rowindexstride()), readPhase);
+          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / 
footer->rowindexstride()),
+                         readPhase);
         }
       }
     } else {
@@ -1200,29 +1199,29 @@ namespace orc {
         followRowInStripe = currentRowInStripe;
       }
       rowsToRead =
-          std::min(static_cast<uint64_t>(data.capacity),
-                   rowsInCurrentStripe - currentRowInStripe);
+          std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe - 
currentRowInStripe);
       if (sargsApplier) {
-        rowsToRead = computeBatchSize(rowsToRead,
-                                      currentRowInStripe,
-                                      rowsInCurrentStripe,
-                                      footer->rowindexstride(),
-                                      sargsApplier->getNextSkippedRows());
+        rowsToRead = computeBatchSize(rowsToRead, currentRowInStripe, 
rowsInCurrentStripe,
+                                      footer->rowindexstride(), 
sargsApplier->getNextSkippedRows());
       }
       if (rowsToRead == 0) {
         markEndOfFile();
         return readRows;
       }
-      uint16_t sel_rowid_idx[rowsToRead];
-      nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx, arg);
-
-      if (startReadPhase == ReadPhase::LEADERS && data.numElements > 0) {
-        // At least 1 row has been selected and as a result we read the follow 
columns into the
-        // row batch
-        nextBatch(data, rowsToRead,
-                  prepareFollowReaders(footer->rowindexstride(),
-                                       currentRowInStripe, followRowInStripe), 
sel_rowid_idx, arg);
-        followRowInStripe = currentRowInStripe + rowsToRead;
+      if (startReadPhase == ReadPhase::LEADERS) {
+        auto sel_rowid_idx = std::make_unique<uint16_t[]>(rowsToRead);
+        nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx.get(), arg);
+        if (data.numElements > 0) {
+          // At least 1 row has been selected and as a result we read the 
follow columns into the
+          // row batch
+          nextBatch(
+              data, rowsToRead,
+              prepareFollowReaders(footer->rowindexstride(), 
currentRowInStripe, followRowInStripe),
+              sel_rowid_idx.get(), arg);
+          followRowInStripe = currentRowInStripe + rowsToRead;
+        }
+      } else {
+        nextBatch(data, rowsToRead, startReadPhase, nullptr, arg);
       }
 
       // update row number
@@ -1230,11 +1229,11 @@ namespace orc {
       currentRowInStripe += rowsToRead;
       readRows += rowsToRead;
 
-    // check if we need to advance to next selected row group
-    if (sargsApplier) {
-      uint64_t nextRowToRead =
-          advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe, 
footer->rowindexstride(),
-                                sargsApplier->getNextSkippedRows());
+      // check if we need to advance to next selected row group
+      if (sargsApplier) {
+        uint64_t nextRowToRead =
+            advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe, 
footer->rowindexstride(),
+                                  sargsApplier->getNextSkippedRows());
         if (currentRowInStripe != nextRowToRead) {
           // it is guaranteed to be at start of a row group
           currentRowInStripe = nextRowToRead;
@@ -1253,16 +1252,22 @@ namespace orc {
     return readRows;
   }
 
-  void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const 
ReadPhase& readPhase, uint16_t* sel_rowid_idx, void* arg) {
-    if (enableEncodedBlock) {
-      reader->nextEncoded(data, batchSize, nullptr, readPhase);
-    }
-    else {
-      reader->next(data, batchSize, nullptr, readPhase);
-    }
+  void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const 
ReadPhase& readPhase,
+                                uint16_t* sel_rowid_idx, void* arg) {
     if (readPhase == ReadPhase::ALL || readPhase == ReadPhase::LEADERS) {
+      if (enableEncodedBlock) {
+        reader->nextEncoded(data, batchSize, nullptr, readPhase);
+      } else {
+        reader->next(data, batchSize, nullptr, readPhase);
+      }
       // Set the batch size when reading everything or when reading FILTER 
columns
       data.numElements = batchSize;
+    } else {
+      if (enableEncodedBlock) {
+        reader->nextEncoded(data, batchSize, nullptr, readPhase, 
sel_rowid_idx, data.numElements);
+      } else {
+        reader->next(data, batchSize, nullptr, readPhase, sel_rowid_idx, 
data.numElements);
+      }
     }
 
     if (readPhase == ReadPhase::LEADERS) {
@@ -1274,80 +1279,81 @@ namespace orc {
     }
   }
 
-    /**
+  /**
    * Determine the RowGroup based on the supplied row id.
    * @param rowIdx Row for which the row group is being determined
    * @return Id of the RowGroup that the row belongs to
    */
-    int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) {
-      return rowIndexStride == 0 ? 0 : (int) (rowIdx / rowIndexStride);
-    }
-
-    /**
-     * This method prepares the non-filter column readers for next batch. This 
involves the following
-     * 1. Determine position
-     * 2. Perform IO if required
-     * 3. Position the non-filter readers
-     *
-     * This method is repositioning the non-filter columns and as such this 
method shall never have to
-     * deal with navigating the stripe forward or skipping row groups, all of 
this should have already
-     * taken place based on the filter columns.
-     * @param toFollowRow The rowIdx identifies the required row position 
within the stripe for
-     *                    follow read
-     * @param fromFollowRow Indicates the current position of the follow read, 
exclusive
-     * @return the read phase for reading non-filter columns, this shall be 
FOLLOWERS_AND_PARENTS in
-     * case of a seek otherwise will be FOLLOWERS
-     */
-    ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride, 
long toFollowRow, long fromFollowRow) {
-      // 1. Determine the required row group and skip rows needed from the RG 
start
-      int needRG = computeRGIdx(rowIndexStride, toFollowRow);
-      // The current row is not yet read so we -1 to compute the previously 
read row group
-      int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1);
-      long skipRows;
-      if (needRG == readRG && toFollowRow >= fromFollowRow) {
-        // In case we are skipping forward within the same row group, we 
compute skip rows from the
-        // current position
-        skipRows = toFollowRow - fromFollowRow;
-      } else {
-        // In all other cases including seeking backwards, we compute the skip 
rows from the start of
-        // the required row group
-        skipRows = toFollowRow - (needRG * rowIndexStride);
-      }
-
-      // 2. Plan the row group idx for the non-filter columns if this has not 
already taken place
-      if (needsFollowColumnsRead) {
-        needsFollowColumnsRead = false;
-      }
+  int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) {
+    return rowIndexStride == 0 ? 0 : (int)(rowIdx / rowIndexStride);
+  }
 
-      // 3. Position the non-filter readers to the required RG and skipRows
-      ReadPhase result = ReadPhase::FOLLOWERS;
-      if (needRG != readRG || toFollowRow < fromFollowRow) {
-        // When having to change a row group or in case of back navigation, 
seek both the filter
-        // parents and non-filter. This will re-position the parents present 
vector. This is needed
-        // to determine the number of non-null values to skip on the 
non-filter columns.
-        seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS);
-        // skip rows on both the filter parents and non-filter as both have 
been positioned in the
-        // previous step
-        reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
-        result = ReadPhase::FOLLOWERS_AND_PARENTS;
-      } else if (skipRows > 0) {
-        // in case we are only skipping within the row group, position the 
filter parents back to the
-        // position of the follow. This is required to determine the non-null 
values to skip on the
-        // non-filter columns.
-        seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS);
-        reader->skip(fromFollowRow - (readRG * rowIndexStride), 
ReadPhase::LEADER_PARENTS);
-        // Move both the filter parents and non-filter forward, this will 
compute the correct
-        // non-null skips on follow children
-        reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
-        result = ReadPhase::FOLLOWERS_AND_PARENTS;
-      }
-      // Identifies the read level that should be performed for the read
-      // FOLLOWERS_WITH_PARENTS indicates repositioning identifying both 
non-filter and filter parents
-      // FOLLOWERS indicates read only of the non-filter level without the 
parents, which is used during
-      // contiguous read. During a contiguous read no skips are needed and the 
non-null information of
-      // the parent is available in the column vector for use during 
non-filter read
-      return result;
-    }
+  /**
+   * This method prepares the non-filter column readers for next batch. This 
involves the following
+   * 1. Determine position
+   * 2. Perform IO if required
+   * 3. Position the non-filter readers
+   *
+   * This method is repositioning the non-filter columns and as such this 
method shall never have to
+   * deal with navigating the stripe forward or skipping row groups, all of 
this should have already
+   * taken place based on the filter columns.
+   * @param toFollowRow The rowIdx identifies the required row position within 
the stripe for
+   *                    follow read
+   * @param fromFollowRow Indicates the current position of the follow read, 
exclusive
+   * @return the read phase for reading non-filter columns, this shall be 
FOLLOWERS_AND_PARENTS in
+   * case of a seek otherwise will be FOLLOWERS
+   */
+  ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride, long 
toFollowRow,
+                                                long fromFollowRow) {
+    // 1. Determine the required row group and skip rows needed from the RG 
start
+    int needRG = computeRGIdx(rowIndexStride, toFollowRow);
+    // The current row is not yet read so we -1 to compute the previously read 
row group
+    int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1);
+    long skipRows;
+    if (needRG == readRG && toFollowRow >= fromFollowRow) {
+      // In case we are skipping forward within the same row group, we compute 
skip rows from the
+      // current position
+      skipRows = toFollowRow - fromFollowRow;
+    } else {
+      // In all other cases including seeking backwards, we compute the skip 
rows from the start of
+      // the required row group
+      skipRows = toFollowRow - (needRG * rowIndexStride);
+    }
+
+    // 2. Plan the row group idx for the non-filter columns if this has not 
already taken place
+    if (needsFollowColumnsRead) {
+      needsFollowColumnsRead = false;
+    }
+
+    // 3. Position the non-filter readers to the required RG and skipRows
+    ReadPhase result = ReadPhase::FOLLOWERS;
+    if (needRG != readRG || toFollowRow < fromFollowRow) {
+      // When having to change a row group or in case of back navigation, seek 
both the filter
+      // parents and non-filter. This will re-position the parents present 
vector. This is needed
+      // to determine the number of non-null values to skip on the non-filter 
columns.
+      seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS);
+      // skip rows on both the filter parents and non-filter as both have been 
positioned in the
+      // previous step
+      reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
+      result = ReadPhase::FOLLOWERS_AND_PARENTS;
+    } else if (skipRows > 0) {
+      // in case we are only skipping within the row group, position the 
filter parents back to the
+      // position of the follow. This is required to determine the non-null 
values to skip on the
+      // non-filter columns.
+      seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS);
+      reader->skip(fromFollowRow - (readRG * rowIndexStride), 
ReadPhase::LEADER_PARENTS);
+      // Move both the filter parents and non-filter forward, this will 
compute the correct
+      // non-null skips on follow children
+      reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
+      result = ReadPhase::FOLLOWERS_AND_PARENTS;
+    }
+    // Identifies the read level that should be performed for the read
+    // FOLLOWERS_WITH_PARENTS indicates repositioning identifying both 
non-filter and filter parents
+    // FOLLOWERS indicates read only of the non-filter level without the 
parents, which is used
+    // during contiguous read. During a contiguous read no skips are needed 
and the non-null
+    // information of the parent is available in the column vector for use 
during non-filter read
+    return result;
+  }
 
   uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t 
currentRowInStripe,
                                            uint64_t rowsInCurrentStripe, 
uint64_t rowIndexStride,
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 0c52387..d84532c 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -35,22 +35,23 @@ namespace orc {
   static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
 
   class ReaderContext {
-  public:
-   ReaderContext() = default;
-
-   const ORCFilter* getFilterCallback() const {
-     return filter;
-   }
-
-   ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds, 
const ORCFilter* _filter) {
-     this->filterColumnIds = std::move(_filterColumnIds);
-     this->filter = _filter;
-     return *this;
-   }
-
-  private:
-   std::unordered_set<int> filterColumnIds;
-   const ORCFilter* filter;
+   public:
+    ReaderContext() = default;
+
+    const ORCFilter* getFilterCallback() const {
+      return filter;
+    }
+
+    ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds,
+                                     const ORCFilter* _filter) {
+      this->filterColumnIds = std::move(_filterColumnIds);
+      this->filter = _filter;
+      return *this;
+    }
+
+   private:
+    std::unordered_set<int> filterColumnIds;
+    const ORCFilter* filter;
   };
 
   /**
@@ -237,25 +238,26 @@ namespace orc {
      */
     bool hasBadBloomFilters();
 
-
     // build map from type name and id, id to Type
     void buildTypeNameIdMap(Type* type);
 
     std::string toDotColumnPath();
 
-    void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& 
readPhase, uint16_t* sel_rowid_idx, void* arg);
+    void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& 
readPhase,
+                   uint16_t* sel_rowid_idx, void* arg);
 
     int computeRGIdx(uint64_t rowIndexStride, long rowIdx);
 
     ReadPhase prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow, 
long fromFollowRow);
 
-  public:
+   public:
     /**
      * Constructor that lets the user specify additional options.
      * @param contents of the file
      * @param options options for reading
      */
-    RowReaderImpl(std::shared_ptr<FileContents> contents, const 
RowReaderOptions& options, const ORCFilter* filter = nullptr);
+    RowReaderImpl(std::shared_ptr<FileContents> contents, const 
RowReaderOptions& options,
+                  const ORCFilter* filter = nullptr);
 
     // Select the columns from the options object
     const std::vector<bool> getSelectedColumns() const override;
@@ -357,7 +359,8 @@ namespace orc {
 
     std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter = 
nullptr) const override;
 
-    std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& 
options, const ORCFilter* filter = nullptr) const override;
+    std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options,
+                                               const ORCFilter* filter = 
nullptr) const override;
 
     uint64_t getContentLength() const override;
     uint64_t getStripeStatisticsLength() const override;
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index 47095b3..3f28dce 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -25,11 +25,15 @@
 
 namespace orc {
 
-  const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories({ 
ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT, 
ReaderCategory::NON_FILTER });
-  const ReadPhase ReadPhase::LEADERS = ReadPhase::fromCategories({ 
ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT });
-  const ReadPhase ReadPhase::FOLLOWERS = ReadPhase::fromCategories({ 
ReaderCategory::NON_FILTER });
-  const ReadPhase ReadPhase::LEADER_PARENTS = ReadPhase::fromCategories({ 
ReaderCategory::FILTER_PARENT });
-  const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS = 
ReadPhase::fromCategories({ ReaderCategory::FILTER_PARENT, 
ReaderCategory::NON_FILTER });
+  const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories(
+      {ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT, 
ReaderCategory::NON_FILTER});
+  const ReadPhase ReadPhase::LEADERS =
+      ReadPhase::fromCategories({ReaderCategory::FILTER_CHILD, 
ReaderCategory::FILTER_PARENT});
+  const ReadPhase ReadPhase::FOLLOWERS = 
ReadPhase::fromCategories({ReaderCategory::NON_FILTER});
+  const ReadPhase ReadPhase::LEADER_PARENTS =
+      ReadPhase::fromCategories({ReaderCategory::FILTER_PARENT});
+  const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS =
+      ReadPhase::fromCategories({ReaderCategory::FILTER_PARENT, 
ReaderCategory::NON_FILTER});
 
   Type::~Type() {
     // PASS


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to