This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/orc by this push:
     new ff8c18e  [Feature] Implements ORC lazy materialization. (#56)
ff8c18e is described below

commit ff8c18e96d55942f3767a2d41960cabfb0efd144
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Thu Apr 27 13:56:21 2023 +0800

    [Feature] Implements ORC lazy materialization. (#56)
    
    Refer to orc java implementation: LazyIO of non-filter columns in the 
presence of filters
    Note: Row-level filtering by selection vector is currently not implemented, 
will do it in future PR.
---
 .clang-format             |  26 ++++
 c++/include/orc/Reader.hh |  48 ++++++-
 c++/include/orc/Type.hh   |  39 +++++
 c++/src/ColumnReader.cc   | 357 ++++++++++++++++++++++++----------------------
 c++/src/ColumnReader.hh   |  19 ++-
 c++/src/Options.hh        |  28 ++++
 c++/src/Reader.cc         | 295 +++++++++++++++++++++++++++++++-------
 c++/src/Reader.hh         |  55 ++++++-
 c++/src/TypeImpl.cc       |  29 +++-
 c++/src/TypeImpl.hh       |   9 ++
 10 files changed, 663 insertions(+), 242 deletions(-)

diff --git a/.clang-format b/.clang-format
new file mode 100644
index 0000000..0779071
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+Language: Cpp
+BasedOnStyle: Google
+ColumnLimit: 100
+IndentWidth: 2
+NamespaceIndentation: All
+UseTab: Never
+AllowShortFunctionsOnASingleLine: Empty
+DerivePointerAlignment: false
+IncludeBlocks: Preserve
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index d8f83f9..2666b4d 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -186,6 +186,26 @@ namespace orc {
     RowReaderOptions& includeTypes(const std::list<uint64_t>& types);
 
     /**
+      * For files that have structs as the top-level object, filter the fields.
+      * by index. The first field is 0, the second 1, and so on. By default,
+      * all columns are read. This option clears any previous setting of
+      * the selected columns.
+      * @param filterColIndexes a list of fields to read
+      * @return this
+      */
+    RowReaderOptions& filter(const std::list<uint64_t>& filterColIndexes);
+
+    /**
+      * For files that have structs as the top-level object, filter the fields
+      * by name. By default, all columns are read. This option clears
+      * any previous setting of the selected columns.
+      * @param filterColNames a list of fields to read
+      * @return this
+      */
+    RowReaderOptions& filter(const std::list<std::string>& filterColNames);
+
+
+      /**
      * A map type of <typeId, ReadIntent>.
      */
     typedef std::map<uint64_t, ReadIntent> IdReadIntentMap;
@@ -281,6 +301,12 @@ namespace orc {
      */
     const std::list<std::string>& getIncludeNames() const;
 
+    /**
+     * Get the list of selected columns to read. All children of the selected
+     * columns are also selected.
+     */
+    const std::list<std::string>& getFilterColNames() const;
+
     /**
      * Get the start of the range for the data being processed.
      * @return if not set, return 0
@@ -338,6 +364,12 @@ namespace orc {
     bool getUseTightNumericVector() const;
   };
 
+  class ORCFilter {
+   public:
+    virtual ~ORCFilter() = default;
+    virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size, 
void* arg = nullptr) const = 0;
+  };
+
   class RowReader;
 
   /**
@@ -523,14 +555,14 @@ namespace orc {
      * Create a RowReader based on this reader with the default options.
      * @return a RowReader to read the rows
      */
-    virtual std::unique_ptr<RowReader> createRowReader() const = 0;
+    virtual std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter 
= nullptr) const = 0;
 
     /**
      * Create a RowReader based on this reader.
      * @param options RowReader Options
      * @return a RowReader to read the rows
      */
-    virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& 
options) const = 0;
+    virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& 
options, const ORCFilter* filter = nullptr) const = 0;
 
     /**
      * Get the name of the input stream.
@@ -616,13 +648,23 @@ namespace orc {
      */
     virtual std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) 
const = 0;
 
+    /**
+     * Read the next row batch from the current position.
+     * Caller must look at numElements in the row batch to determine how
+     * many rows were read.
+     * @param data the row batch to read into.
+     * @param arg argument.
+     * @return number of rows.
+     */
+    virtual uint64_t nextBatch(ColumnVectorBatch& data, void* arg = nullptr) = 
0;
+
     /**
      * Read the next row batch from the current position.
      * Caller must look at numElements in the row batch to determine how
      * many rows were read.
      * @param data the row batch to read into.
      * @return true if a non-zero number of rows were read or false if the
-     *   end of the file was reached.
+     * end of the file was reached.
      */
     virtual bool next(ColumnVectorBatch& data) = 0;
 
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index c8ada75..73b813e 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -19,11 +19,46 @@
 #ifndef ORC_TYPE_HH
 #define ORC_TYPE_HH
 
+#include <bitset>
+#include <unordered_set>
 #include "MemoryPool.hh"
 #include "orc/Vector.hh"
 #include "orc/orc-config.hh"
 
 namespace orc {
+  enum class ReaderCategory {
+      FILTER_CHILD,    // Primitive type that is a filter column
+      FILTER_PARENT,   // Compound type with filter children
+      NON_FILTER       // Non-filter column
+  };
+
+  class ReadPhase {
+   public:
+      static const int NUM_CATEGORIES = 3;  // Number of values in 
ReaderCategory
+      std::bitset<NUM_CATEGORIES> categories;
+
+      static const ReadPhase ALL;
+      static const ReadPhase LEADERS;
+      static const ReadPhase FOLLOWERS;
+      static const ReadPhase LEADER_PARENTS;
+      static const ReadPhase FOLLOWERS_AND_PARENTS;
+
+      static ReadPhase fromCategories(const 
std::unordered_set<ReaderCategory>& cats) {
+        ReadPhase phase;
+        for (ReaderCategory cat : cats) {
+          phase.categories.set(static_cast<size_t>(cat));
+        }
+        return phase;
+      }
+
+      bool contains(ReaderCategory cat) const {
+        return categories.test(static_cast<size_t>(cat));
+      }
+
+      bool operator==(const ReadPhase& other) const {
+        return categories == other.categories;
+      }
+  };
 
   enum TypeKind {
     BOOLEAN = 0,
@@ -54,7 +89,9 @@ namespace orc {
     virtual uint64_t getMaximumColumnId() const = 0;
     virtual TypeKind getKind() const = 0;
     virtual uint64_t getSubtypeCount() const = 0;
+    virtual Type* getParent() const = 0;
     virtual const Type* getSubtype(uint64_t childId) const = 0;
+    virtual Type* getSubtype(uint64_t childId) = 0;
     virtual const std::string& getFieldName(uint64_t childId) const = 0;
     virtual uint64_t getMaximumLength() const = 0;
     virtual uint64_t getPrecision() const = 0;
@@ -64,6 +101,8 @@ namespace orc {
     virtual Type& removeAttribute(const std::string& key) = 0;
     virtual std::vector<std::string> getAttributeKeys() const = 0;
     virtual std::string getAttributeValue(const std::string& key) const = 0;
+    virtual ReaderCategory getReaderCategory() const = 0;
+    virtual void setReaderCategory(ReaderCategory readerCategory) = 0;
     virtual std::string toString() const = 0;
     /**
      * Get the Type with the given column ID
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 2a72c80..cabcdbe 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -46,8 +46,9 @@ namespace orc {
     }
   }
 
-  ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
-      : columnId(type.getColumnId()),
+  ColumnReader::ColumnReader(const Type& _type, StripeStreams& stripe)
+      : type(_type),
+        columnId(type.getColumnId()),
         memoryPool(stripe.getMemoryPool()),
         metrics(stripe.getReaderMetrics()) {
     std::unique_ptr<SeekableInputStream> stream =
@@ -61,7 +62,7 @@ namespace orc {
     // PASS
   }
 
-  uint64_t ColumnReader::skip(uint64_t numValues) {
+  uint64_t ColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) {
     ByteRleDecoder* decoder = notNullDecoder.get();
     if (decoder) {
       // page through the values that we want to skip
@@ -84,7 +85,7 @@ namespace orc {
     return numValues;
   }
 
-  void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* incomingMask) {
+  void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* incomingMask, const ReadPhase& readPhase) {
     if (numValues > rowBatch.capacity) {
       rowBatch.resize(numValues);
     }
@@ -100,16 +101,19 @@ namespace orc {
           return;
         }
       }
+      rowBatch.hasNulls = false;
     } else if (incomingMask) {
       // If we don't have a notNull stream, copy the incomingMask
       rowBatch.hasNulls = true;
       memcpy(rowBatch.notNull.data(), incomingMask, numValues);
       return;
+    } else {
+      rowBatch.hasNulls = false;
+      memset(rowBatch.notNull.data(), 1, numValues);
     }
-    rowBatch.hasNulls = false;
   }
 
-  void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions) {
+  void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase) {
     if (notNullDecoder.get()) {
       notNullDecoder->seek(positions.at(columnId));
     }
@@ -142,11 +146,11 @@ namespace orc {
     BooleanColumnReader(const Type& type, StripeStreams& stipe);
     ~BooleanColumnReader() override;
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
   };
 
   template <typename BatchType>
@@ -164,16 +168,16 @@ namespace orc {
   }
 
   template <typename BatchType>
-  uint64_t BooleanColumnReader<BatchType>::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t BooleanColumnReader<BatchType>::skip(uint64_t numValues, const 
ReadPhase& readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     rle->skip(numValues);
     return numValues;
   }
 
   template <typename BatchType>
   void BooleanColumnReader<BatchType>::next(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
-                                            char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                            char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     // Since the byte rle places the output in a char* and BatchType here may 
be
     // LongVectorBatch with long*. We cheat here in that case and use the long*
     // and then expand it in a second pass..
@@ -185,8 +189,8 @@ namespace orc {
 
   template <typename BatchType>
   void BooleanColumnReader<BatchType>::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
   }
 
@@ -205,14 +209,14 @@ namespace orc {
 
     ~ByteColumnReader() override = default;
 
-    uint64_t skip(uint64_t numValues) override {
-      numValues = ColumnReader::skip(numValues);
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override {
+      numValues = ColumnReader::skip(numValues, readPhase);
       rle->skip(numValues);
       return numValues;
     }
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override {
-      ColumnReader::next(rowBatch, numValues, notNull);
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override {
+      ColumnReader::next(rowBatch, numValues, notNull, readPhase);
       // Since the byte rle places the output in a char* instead of long*,
       // we cheat here and use the long* and then expand it in a second pass.
       auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data();
@@ -221,8 +225,8 @@ namespace orc {
       expandBytesToIntegers(ptr, numValues);
     }
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override {
-      ColumnReader::seekToRowGroup(positions);
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override {
+      ColumnReader::seekToRowGroup(positions, readPhase);
       rle->seek(positions.at(columnId));
     }
   };
@@ -245,20 +249,20 @@ namespace orc {
       // PASS
     }
 
-    uint64_t skip(uint64_t numValues) override {
-      numValues = ColumnReader::skip(numValues);
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override {
+      numValues = ColumnReader::skip(numValues, readPhase);
       rle->skip(numValues);
       return numValues;
     }
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override {
-      ColumnReader::next(rowBatch, numValues, notNull);
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override {
+      ColumnReader::next(rowBatch, numValues, notNull, readPhase);
       rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues,
                 rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
     }
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override {
-      ColumnReader::seekToRowGroup(positions);
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override {
+      ColumnReader::seekToRowGroup(positions, readPhase);
       rle->seek(positions.at(columnId));
     }
   };
@@ -276,11 +280,11 @@ namespace orc {
     TimestampColumnReader(const Type& type, StripeStreams& stripe, bool 
isInstantType);
     ~TimestampColumnReader() override;
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
   };
 
   TimestampColumnReader::TimestampColumnReader(const Type& type, 
StripeStreams& stripe,
@@ -304,15 +308,15 @@ namespace orc {
     // PASS
   }
 
-  uint64_t TimestampColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t TimestampColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     secondsRle->skip(numValues);
     nanoRle->skip(numValues);
     return numValues;
   }
 
-  void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+  void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull, const ReadPhase& readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     TimestampVectorBatch& timestampBatch = 
dynamic_cast<TimestampVectorBatch&>(rowBatch);
     int64_t* secsBuffer = timestampBatch.data.data();
@@ -353,8 +357,8 @@ namespace orc {
   }
 
   void TimestampColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     secondsRle->seek(positions.at(columnId));
     nanoRle->seek(positions.at(columnId));
   }
@@ -365,11 +369,11 @@ namespace orc {
     DoubleColumnReader(const Type& type, StripeStreams& stripe);
     ~DoubleColumnReader() override {}
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
 
    private:
     std::unique_ptr<SeekableInputStream> inputStream;
@@ -450,8 +454,8 @@ namespace orc {
 
   template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
   uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::skip(
-      uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+      uint64_t numValues, const ReadPhase& readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
 
     if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue * 
numValues) {
       bufferPointer += bytesPerValue * numValues;
@@ -473,8 +477,8 @@ namespace orc {
 
   template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
   void DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::next(
-      ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+      ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const 
ReadPhase& readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     // update the notNull from the parent class
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     ValueType* outArray =
@@ -519,8 +523,8 @@ namespace orc {
 
   template <TypeKind columnKind, bool isLittleEndian, typename ValueType, 
typename BatchType>
   void DoubleColumnReader<columnKind, isLittleEndian, ValueType, 
BatchType>::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     inputStream->seek(positions.at(columnId));
     // clear buffer state after seek
     bufferEnd = nullptr;
@@ -552,13 +556,13 @@ namespace orc {
     StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
     ~StringDictionaryColumnReader() override;
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
   };
 
   StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type,
@@ -602,15 +606,15 @@ namespace orc {
     // PASS
   }
 
-  uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t StringDictionaryColumnReader::skip(uint64_t numValues, const 
ReadPhase& readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     rle->skip(numValues);
     return numValues;
   }
 
   void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
-                                          char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                          char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     // update the notNull from the parent class
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
@@ -644,8 +648,8 @@ namespace orc {
   }
 
   void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, 
uint64_t numValues,
-                                                 char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                                 char* notNull, const 
ReadPhase& readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     rowBatch.isEncoded = true;
 
@@ -657,8 +661,8 @@ namespace orc {
   }
 
   void StringDictionaryColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
   }
 
@@ -682,11 +686,11 @@ namespace orc {
     StringDirectColumnReader(const Type& type, StripeStreams& stipe);
     ~StringDirectColumnReader() override;
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
   };
 
   StringDirectColumnReader::StringDirectColumnReader(const Type& type, 
StripeStreams& stripe)
@@ -706,9 +710,9 @@ namespace orc {
     // PASS
   }
 
-  uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
+  uint64_t StringDirectColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
     const size_t BUFFER_SIZE = 1024;
-    numValues = ColumnReader::skip(numValues);
+    numValues = ColumnReader::skip(numValues, readPhase);
     int64_t buffer[BUFFER_SIZE];
     uint64_t done = 0;
     size_t totalBytes = 0;
@@ -756,8 +760,8 @@ namespace orc {
   }
 
   void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                      char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                      char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     // update the notNull from the parent class
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
@@ -814,8 +818,8 @@ namespace orc {
   }
 
   void StringDirectColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     blobStream->seek(positions.at(columnId));
     lengthRle->seek(positions.at(columnId));
     // clear buffer state after seek
@@ -830,17 +834,17 @@ namespace orc {
    public:
     StructColumnReader(const Type& type, StripeStreams& stipe, bool 
useTightNumericVector = false);
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
 
    private:
     template <bool encoded>
-    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull);
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase);
   };
 
   StructColumnReader::StructColumnReader(const Type& type, StripeStreams& 
stripe,
@@ -865,45 +869,51 @@ namespace orc {
     }
   }
 
-  uint64_t StructColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t StructColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     for (auto& ptr : children) {
-      ptr->skip(numValues);
+      if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) {
+        ptr->skip(numValues, readPhase);
+      }
     }
     return numValues;
   }
 
-  void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull) {
-    nextInternal<false>(rowBatch, numValues, notNull);
+  void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull, const ReadPhase& readPhase) {
+    nextInternal<false>(rowBatch, numValues, notNull, readPhase);
   }
 
   void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                       char* notNull) {
-    nextInternal<true>(rowBatch, numValues, notNull);
+                                       char* notNull, const ReadPhase& 
readPhase) {
+    nextInternal<true>(rowBatch, numValues, notNull, readPhase);
   }
 
   template <bool encoded>
   void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                        char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                        char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     uint64_t i = 0;
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) {
-      if (encoded) {
-        
(*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), 
numValues,
-                             notNull);
-      } else {
-        (*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), 
numValues, notNull);
+      if (shouldProcessChild((*iter)->getType().getReaderCategory(), 
readPhase)) {
+        if (encoded) {
+          (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch 
&>(rowBatch).fields[i]), numValues,
+                               notNull, readPhase);
+        } else {
+          (*iter)->next(*(dynamic_cast<StructVectorBatch 
&>(rowBatch).fields[i]), numValues, notNull, readPhase);
+        }
       }
     }
   }
 
   void StructColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
 
     for (auto& ptr : children) {
-      ptr->seekToRowGroup(positions);
+      if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) {
+        ptr->seekToRowGroup(positions, readPhase);
+      }
     }
   }
 
@@ -916,17 +926,17 @@ namespace orc {
     ListColumnReader(const Type& type, StripeStreams& stipe, bool 
useTightNumericVector = false);
     ~ListColumnReader() override;
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
 
    private:
     template <bool encoded>
-    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull);
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase);
   };
 
   ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe,
@@ -949,8 +959,8 @@ namespace orc {
     // PASS
   }
 
-  uint64_t ListColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t ListColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     ColumnReader* childReader = child.get();
     if (childReader) {
       const uint64_t BUFFER_SIZE = 1024;
@@ -965,26 +975,26 @@ namespace orc {
         }
         lengthsRead += chunk;
       }
-      childReader->skip(childrenElements);
+      childReader->skip(childrenElements, readPhase);
     } else {
       rle->skip(numValues);
     }
     return numValues;
   }
 
-  void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull) {
-    nextInternal<false>(rowBatch, numValues, notNull);
+  void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull, const ReadPhase& readPhase) {
+    nextInternal<false>(rowBatch, numValues, notNull, readPhase);
   }
 
   void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                     char* notNull) {
-    nextInternal<true>(rowBatch, numValues, notNull);
+                                     char* notNull, const ReadPhase& 
readPhase) {
+    nextInternal<true>(rowBatch, numValues, notNull, readPhase);
   }
 
   template <bool encoded>
   void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                      char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                      char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
     int64_t* offsets = listBatch.offsets.data();
     notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
@@ -1011,18 +1021,18 @@ namespace orc {
     ColumnReader* childReader = child.get();
     if (childReader) {
       if (encoded) {
-        childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, 
nullptr);
+        childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, 
nullptr, readPhase);
       } else {
-        childReader->next(*(listBatch.elements.get()), totalChildren, nullptr);
+        childReader->next(*(listBatch.elements.get()), totalChildren, nullptr, 
readPhase);
       }
     }
   }
 
-  void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+  void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
     if (child.get()) {
-      child->seekToRowGroup(positions);
+      child->seekToRowGroup(positions, readPhase);
     }
   }
 
@@ -1036,17 +1046,17 @@ namespace orc {
     MapColumnReader(const Type& type, StripeStreams& stipe, bool 
useTightNumericVector = false);
     ~MapColumnReader() override;
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
 
    private:
     template <bool encoded>
-    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull);
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase);
   };
 
   MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe,
@@ -1073,8 +1083,8 @@ namespace orc {
     // PASS
   }
 
-  uint64_t MapColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t MapColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     ColumnReader* rawKeyReader = keyReader.get();
     ColumnReader* rawElementReader = elementReader.get();
     if (rawKeyReader || rawElementReader) {
@@ -1091,10 +1101,10 @@ namespace orc {
         lengthsRead += chunk;
       }
       if (rawKeyReader) {
-        rawKeyReader->skip(childrenElements);
+        rawKeyReader->skip(childrenElements, readPhase);
       }
       if (rawElementReader) {
-        rawElementReader->skip(childrenElements);
+        rawElementReader->skip(childrenElements, readPhase);
       }
     } else {
       rle->skip(numValues);
@@ -1102,19 +1112,19 @@ namespace orc {
     return numValues;
   }
 
-  void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull) {
-    nextInternal<false>(rowBatch, numValues, notNull);
+  void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull, const ReadPhase& readPhase) {
+    nextInternal<false>(rowBatch, numValues, notNull, readPhase);
   }
 
   void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                    char* notNull) {
-    nextInternal<true>(rowBatch, numValues, notNull);
+                                    char* notNull, const ReadPhase& readPhase) 
{
+    nextInternal<true>(rowBatch, numValues, notNull, readPhase);
   }
 
   template <bool encoded>
   void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                     char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                     char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
     int64_t* offsets = mapBatch.offsets.data();
     notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
@@ -1141,29 +1151,29 @@ namespace orc {
     ColumnReader* rawKeyReader = keyReader.get();
     if (rawKeyReader) {
       if (encoded) {
-        rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren, 
nullptr);
+        rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren, 
nullptr, readPhase);
       } else {
-        rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr);
+        rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr, 
readPhase);
       }
     }
     ColumnReader* rawElementReader = elementReader.get();
     if (rawElementReader) {
       if (encoded) {
-        rawElementReader->nextEncoded(*(mapBatch.elements.get()), 
totalChildren, nullptr);
+        rawElementReader->nextEncoded(*(mapBatch.elements.get()), 
totalChildren, nullptr, readPhase);
       } else {
-        rawElementReader->next(*(mapBatch.elements.get()), totalChildren, 
nullptr);
+        rawElementReader->next(*(mapBatch.elements.get()), totalChildren, 
nullptr, readPhase);
       }
     }
   }
 
-  void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+  void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
     if (keyReader.get()) {
-      keyReader->seekToRowGroup(positions);
+      keyReader->seekToRowGroup(positions, readPhase);
     }
     if (elementReader.get()) {
-      elementReader->seekToRowGroup(positions);
+      elementReader->seekToRowGroup(positions, readPhase);
     }
   }
 
@@ -1177,17 +1187,17 @@ namespace orc {
    public:
     UnionColumnReader(const Type& type, StripeStreams& stipe, bool 
useTightNumericVector = false);
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull) override;
+    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
 
    private:
     template <bool encoded>
-    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull);
+    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase);
   };
 
   UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe,
@@ -1211,8 +1221,8 @@ namespace orc {
     }
   }
 
-  uint64_t UnionColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t UnionColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     const uint64_t BUFFER_SIZE = 1024;
     char buffer[BUFFER_SIZE];
     uint64_t lengthsRead = 0;
@@ -1227,26 +1237,27 @@ namespace orc {
       lengthsRead += chunk;
     }
     for (size_t i = 0; i < numChildren; ++i) {
-      if (counts[i] != 0 && childrenReader[i] != nullptr) {
-        childrenReader[i]->skip(static_cast<uint64_t>(counts[i]));
+      if (counts[i] != 0 && childrenReader[i] != nullptr
+        && 
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
+        childrenReader[i]->skip(static_cast<uint64_t>(counts[i]), readPhase);
       }
     }
     return numValues;
   }
 
-  void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull) {
-    nextInternal<false>(rowBatch, numValues, notNull);
+  void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull, const ReadPhase& readPhase) {
+    nextInternal<false>(rowBatch, numValues, notNull, readPhase);
   }
 
   void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                      char* notNull) {
-    nextInternal<true>(rowBatch, numValues, notNull);
+                                      char* notNull, const ReadPhase& 
readPhase) {
+    nextInternal<true>(rowBatch, numValues, notNull, readPhase);
   }
 
   template <bool encoded>
   void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                       char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                       char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
     uint64_t* offsets = unionBatch.offsets.data();
     int64_t* counts = childrenCounts.data();
@@ -1268,25 +1279,25 @@ namespace orc {
     }
     // read the right number of each child column
     for (size_t i = 0; i < numChildren; ++i) {
-      if (childrenReader[i] != nullptr) {
+      if (childrenReader[i] != nullptr && 
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
         if (encoded) {
           childrenReader[i]->nextEncoded(*(unionBatch.children[i]),
-                                         static_cast<uint64_t>(counts[i]), 
nullptr);
+                                         static_cast<uint64_t>(counts[i]), 
nullptr, readPhase);
         } else {
           childrenReader[i]->next(*(unionBatch.children[i]), 
static_cast<uint64_t>(counts[i]),
-                                  nullptr);
+                                  nullptr, readPhase);
         }
       }
     }
   }
 
   void UnionColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     rle->seek(positions.at(columnId));
     for (size_t i = 0; i < numChildren; ++i) {
-      if (childrenReader[i] != nullptr) {
-        childrenReader[i]->seekToRowGroup(positions);
+      if (childrenReader[i] != nullptr && 
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(), 
readPhase)) {
+        childrenReader[i]->seekToRowGroup(positions, readPhase);
       }
     }
   }
@@ -1360,11 +1371,11 @@ namespace orc {
     Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
     ~Decimal64ColumnReader() override;
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions) override;
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& 
positions, const ReadPhase& readPhase) override;
   };
   const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
   const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
@@ -1407,8 +1418,8 @@ namespace orc {
     // PASS
   }
 
-  uint64_t Decimal64ColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t Decimal64ColumnReader::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     uint64_t skipped = 0;
     while (skipped < numValues) {
       readBuffer();
@@ -1420,8 +1431,8 @@ namespace orc {
     return numValues;
   }
 
-  void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+  void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues, char* notNull, const ReadPhase& readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     Decimal64VectorBatch& batch = 
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
     int64_t* values = batch.values.data();
@@ -1463,8 +1474,8 @@ namespace orc {
   }
 
   void Decimal64ColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
+      std::unordered_map<uint64_t, PositionProvider>& positions, const 
ReadPhase& readPhase) {
+    ColumnReader::seekToRowGroup(positions, readPhase);
     valueStream->seek(positions.at(columnId));
     scaleDecoder->seek(positions.at(columnId));
     // clear buffer state after seek
@@ -1477,7 +1488,7 @@ namespace orc {
     Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
     ~Decimal128ColumnReader() override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
 
    private:
     void readInt128(Int128& value, int32_t currentScale) {
@@ -1510,8 +1521,8 @@ namespace orc {
   }
 
   void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                    char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                    char* notNull, const ReadPhase& readPhase) 
{
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     Decimal128VectorBatch& batch = 
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
     Int128* values = batch.values.data();
@@ -1543,9 +1554,9 @@ namespace orc {
     Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe);
     ~Decimal64ColumnReaderV2() override;
 
-    uint64_t skip(uint64_t numValues) override;
+    uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
   };
 
   Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type, 
StripeStreams& stripe)
@@ -1566,15 +1577,15 @@ namespace orc {
     // PASS
   }
 
-  uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
+  uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues, const ReadPhase& 
readPhase) {
+    numValues = ColumnReader::skip(numValues, readPhase);
     valueDecoder->skip(numValues);
     return numValues;
   }
 
   void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                     char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                     char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     Decimal64VectorBatch& batch = 
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
     valueDecoder->next(batch.values.data(), numValues, notNull);
@@ -1628,7 +1639,7 @@ namespace orc {
     DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
     ~DecimalHive11ColumnReader() override;
 
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override;
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, 
const ReadPhase& readPhase) override;
   };
 
   DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type, 
StripeStreams& stripe)
@@ -1643,8 +1654,8 @@ namespace orc {
   }
 
   void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t 
numValues,
-                                       char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
+                                       char* notNull, const ReadPhase& 
readPhase) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
     Decimal128VectorBatch& batch = 
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
     Int128* values = batch.values.data();
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 3b765cb..25363e2 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -109,21 +109,30 @@ namespace orc {
   class ColumnReader {
    protected:
     std::unique_ptr<ByteRleDecoder> notNullDecoder;
+    const Type& type;
     uint64_t columnId;
     MemoryPool& memoryPool;
     ReaderMetrics* metrics;
 
+    static bool shouldProcessChild(ReaderCategory readerCategory, const 
ReadPhase& readPhase) {
+      return readPhase.contains(readerCategory) || readerCategory == 
ReaderCategory::FILTER_PARENT;
+    }
+
    public:
     ColumnReader(const Type& type, StripeStreams& stipe);
 
     virtual ~ColumnReader();
 
+    const Type& getType() const {
+      return type;
+    }
+
     /**
      * Skip number of specified rows.
      * @param numValues the number of values to skip
      * @return the number of non-null values skipped
      */
-    virtual uint64_t skip(uint64_t numValues);
+    virtual uint64_t skip(uint64_t numValues, const ReadPhase& readPhase = 
ReadPhase::ALL);
 
     /**
      * Read the next group of values into this rowBatch.
@@ -133,7 +142,7 @@ namespace orc {
      *           a mask (with at least numValues bytes) for which values to
      *           set.
      */
-    virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull);
+    virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* 
notNull, const ReadPhase& readPhase = ReadPhase::ALL);
 
     /**
      * Read the next group of values without decoding
@@ -143,16 +152,16 @@ namespace orc {
      *           a mask (with at least numValues bytes) for which values to
      *           set.
      */
-    virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull) {
+    virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, 
char* notNull, const ReadPhase& readPhase = ReadPhase::ALL) {
       rowBatch.isEncoded = false;
-      next(rowBatch, numValues, notNull);
+      next(rowBatch, numValues, notNull, readPhase);
     }
 
     /**
      * Seek to beginning of a row group in the current stripe
      * @param positions a list of PositionProviders storing the positions
      */
-    virtual void seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions);
+    virtual void seekToRowGroup(std::unordered_map<uint64_t, 
PositionProvider>& positions, const ReadPhase& readPhase = ReadPhase::ALL);
   };
 
   /**
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 151434e..40a583e 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -34,6 +34,13 @@ namespace orc {
     ColumnSelection_TYPE_IDS = 3,
   };
 
+  enum ColumnFilter {
+    ColumnFilter_NONE = 0,
+    ColumnFilter_NAMES = 1,
+    ColumnFilter_FIELD_IDS = 2,
+    ColumnFilter_TYPE_IDS = 3,
+  };
+
   /**
    * ReaderOptions Implementation
    */
@@ -130,6 +137,9 @@ namespace orc {
     ColumnSelection selection;
     std::list<uint64_t> includedColumnIndexes;
     std::list<std::string> includedColumnNames;
+    ColumnFilter filter;
+    std::list<uint64_t> filterColumnIndexes;
+    std::list<std::string> filterColumnNames;
     uint64_t dataStart;
     uint64_t dataLength;
     bool throwOnHive11DecimalOverflow;
@@ -214,6 +224,20 @@ namespace orc {
     return *this;
   }
 
+  RowReaderOptions& RowReaderOptions::filter(const std::list<uint64_t>& 
filterColIndexes) {
+    privateBits->filter = ColumnFilter_FIELD_IDS;
+    privateBits->filterColumnIndexes.assign(filterColIndexes.begin(), 
filterColIndexes.end());
+    privateBits->filterColumnNames.clear();
+    return *this;
+  }
+
+  RowReaderOptions& RowReaderOptions::filter(const std::list<std::string>& 
filterColNames) {
+    privateBits->filter = ColumnFilter_NAMES;
+    privateBits->filterColumnNames.assign(filterColNames.begin(), 
filterColNames.end());
+    privateBits->filterColumnIndexes.clear();
+    return *this;
+  }
+
   RowReaderOptions& RowReaderOptions::range(uint64_t offset, uint64_t length) {
     privateBits->dataStart = offset;
     privateBits->dataLength = length;
@@ -240,6 +264,10 @@ namespace orc {
     return privateBits->includedColumnNames;
   }
 
+  const std::list<std::string>& RowReaderOptions::getFilterColNames() const {
+    return privateBits->filterColumnNames;
+  }
+
   uint64_t RowReaderOptions::getOffset() const {
     return privateBits->dataStart;
   }
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 8ccb0ec..52052fb 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -247,7 +247,8 @@ namespace orc {
   }
 
   RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
-                               const RowReaderOptions& opts)
+                               const RowReaderOptions& opts,
+                               const ORCFilter* _filter)
       : localTimezone(getLocalTimezone()),
         contents(_contents),
         throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
@@ -255,7 +256,8 @@ namespace orc {
         footer(contents->footer.get()),
         firstRowOfStripe(*contents->pool, 0),
         enableEncodedBlock(opts.getEnableLazyDecoding()),
-        readerTimezone(getTimezoneByName(opts.getTimezoneName())) {
+        readerTimezone(getTimezoneByName(opts.getTimezoneName())),
+        filter(_filter) {
     uint64_t numberOfStripes;
     numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
     currentStripe = numberOfStripes;
@@ -309,6 +311,37 @@ namespace orc {
     }
 
     skipBloomFilters = hasBadBloomFilters();
+
+    const std::list<std::string>& filterCols = opts.getFilterColNames();
+
+    // Map columnNames to ColumnIds
+    buildTypeNameIdMap(contents->schema.get());
+
+    std::unordered_set<int> filterColIds;
+    if (!filterCols.empty()) {
+      for (auto& colName: filterCols) {
+        auto iter = nameTypeMap.find(colName);
+        if (iter != nameTypeMap.end()) {
+          Type* type = iter->second;
+          while (type != nullptr) {
+            if (type->getSubtypeCount() == 0) {
+              type->setReaderCategory(ReaderCategory::FILTER_CHILD);
+            } else {
+              type->setReaderCategory(ReaderCategory::FILTER_PARENT);
+            }
+            filterColIds.emplace(type->getColumnId());
+            type = type->getParent();
+          }
+        } else {
+          throw ParseError("Invalid column selected " + colName);
+        }
+      }
+      startReadPhase = ReadPhase::LEADERS;
+      readerContext = std::unique_ptr<ReaderContext>(new ReaderContext());
+      readerContext->setFilterCallback(std::move(filterColIds), filter);
+    } else {
+      startReadPhase = ReadPhase::ALL;
+    }
   }
 
   // Check if the file has inconsistent bloom filters.
@@ -337,7 +370,43 @@ namespace orc {
     return false;
   }
 
-  CompressionKind RowReaderImpl::getCompression() const {
+  /**
+   * Recurses over a type tree and build two maps
+   * map<TypeName, TypeId>, map<TypeId, Type>
+   */
+  void RowReaderImpl::buildTypeNameIdMap(Type* type) {
+    // map<type_id, Type*>
+    idTypeMap[type->getColumnId()] = type;
+
+    if (STRUCT == type->getKind()) {
+      for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
+        const std::string& fieldName = type->getFieldName(i);
+        columns.push_back(fieldName);
+        nameTypeMap[toDotColumnPath()] = type->getSubtype(i);
+        buildTypeNameIdMap(type->getSubtype(i));
+        columns.pop_back();
+      }
+    } else {
+      // other non-primitive type
+      for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
+        buildTypeNameIdMap(type->getSubtype(j));
+      }
+    }
+  }
+
+  std::string RowReaderImpl::toDotColumnPath() {
+    if (columns.empty()) {
+      return std::string();
+    }
+    std::ostringstream columnStream;
+    std::copy(columns.begin(), columns.end(),
+              std::ostream_iterator<std::string>(columnStream, "."));
+    std::string columnPath = columnStream.str();
+    return columnPath.substr(0, columnPath.length() - 1);
+  }
+
+
+    CompressionKind RowReaderImpl::getCompression() const {
     return contents->compression;
   }
 
@@ -401,7 +470,7 @@ namespace orc {
       // current stripe doesn't have row indexes
       currentStripe = seekToStripe;
       currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
-      startNextStripe();
+      startNextStripe(startReadPhase);
       if (currentStripe >= lastStripe) {
         return;
       }
@@ -422,14 +491,14 @@ namespace orc {
         loadStripeIndex();
       }
       // TODO(ORC-1175): process the failures of loadStripeIndex() call
-      seekToRowGroup(static_cast<uint32_t>(rowsToSkip / rowIndexStride));
+      seekToRowGroup(static_cast<uint32_t>(rowsToSkip / rowIndexStride), 
startReadPhase);
       // skip leading rows in the target row group
       rowsToSkip %= rowIndexStride;
     }
     // 'reader' is reset in startNextStripe(). It could be nullptr if 
'rowsToSkip' is 0,
     // e.g. when startNextStripe() skips all remaining rows of the file.
     if (rowsToSkip > 0) {
-      reader->skip(rowsToSkip);
+      reader->skip(rowsToSkip, startReadPhase);
     }
   }
 
@@ -477,7 +546,7 @@ namespace orc {
     }
   }
 
-  void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
+  void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId, const 
ReadPhase& readPhase) {
     // store positions for selected columns
     std::list<std::list<uint64_t>> positions;
     // store position providers for selected colimns
@@ -497,7 +566,7 @@ namespace orc {
       positionProviders.insert(std::make_pair(colId, 
PositionProvider(position)));
     }
 
-    reader->seekToRowGroup(positionProviders);
+    reader->seekToRowGroup(positionProviders, readPhase);
   }
 
   const FileContents& RowReaderImpl::getFileContents() const {
@@ -826,17 +895,17 @@ namespace orc {
     }
   }
 
-  std::unique_ptr<RowReader> ReaderImpl::createRowReader() const {
+  std::unique_ptr<RowReader> ReaderImpl::createRowReader(const ORCFilter* 
filter) const {
     RowReaderOptions defaultOpts;
-    return createRowReader(defaultOpts);
+    return createRowReader(defaultOpts, filter);
   }
 
-  std::unique_ptr<RowReader> ReaderImpl::createRowReader(const 
RowReaderOptions& opts) const {
+  std::unique_ptr<RowReader> ReaderImpl::createRowReader(const 
RowReaderOptions& opts, const ORCFilter* filter) const {
     if (opts.getSearchArgument() && !isMetadataLoaded) {
       // load stripe statistics for PPD
       readMetadata();
     }
-    return std::make_unique<RowReaderImpl>(contents, opts);
+    return std::make_unique<RowReaderImpl>(contents, opts, filter);
   }
 
   uint64_t maxStreamsForType(const proto::Type& type) {
@@ -1020,10 +1089,11 @@ namespace orc {
     }
   }
 
-  void RowReaderImpl::startNextStripe() {
+  void RowReaderImpl::startNextStripe(const ReadPhase& readPhase) {
     reader.reset();  // ColumnReaders use lots of memory; free old memory first
     rowIndexes.clear();
     bloomFilterIndex.clear();
+    followRowInStripe = 0;
 
     // evaluate file statistics if it exists
     if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer, 
numRowGroupsInStripeRange)) {
@@ -1051,6 +1121,9 @@ namespace orc {
       rowsInCurrentStripe = currentStripeInfo.numberofrows();
       processingStripe = currentStripe;
 
+      // read row group statistics and bloom filters of current stripe
+      loadStripeIndex();
+
       if (sargsApplier) {
         bool isStripeNeeded = true;
         if (contents->metadata) {
@@ -1064,9 +1137,6 @@ namespace orc {
         }
 
         if (isStripeNeeded) {
-          // read row group statistics and bloom filters of current stripe
-          loadStripeIndex();
-
           // select row groups to read in the current stripe
           sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, 
bloomFilterIndex);
           if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
@@ -1100,7 +1170,7 @@ namespace orc {
                                   sargsApplier->getNextSkippedRows());
         previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
         if (currentRowInStripe > 0) {
-          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / 
footer->rowindexstride()));
+          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / 
footer->rowindexstride()), readPhase);
         }
       }
     } else {
@@ -1110,56 +1180,175 @@ namespace orc {
   }
 
   bool RowReaderImpl::next(ColumnVectorBatch& data) {
+    return nextBatch(data, nullptr) != 0;
+  }
+
+  uint64_t RowReaderImpl::nextBatch(ColumnVectorBatch& data, void* arg) {
     SCOPED_STOPWATCH(contents->readerMetrics, ReaderInclusiveLatencyUs, 
ReaderCall);
-    if (currentStripe >= lastStripe) {
-      data.numElements = 0;
-      markEndOfFile();
-      return false;
-    }
-    if (currentRowInStripe == 0) {
-      startNextStripe();
-    }
-    uint64_t rowsToRead =
-        std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe - 
currentRowInStripe);
-    if (sargsApplier && rowsToRead > 0) {
-      rowsToRead = computeBatchSize(rowsToRead, currentRowInStripe, 
rowsInCurrentStripe,
-                                    footer->rowindexstride(), 
sargsApplier->getNextSkippedRows());
-    }
-    data.numElements = rowsToRead;
-    if (rowsToRead == 0) {
-      markEndOfFile();
-      return false;
-    }
-    if (enableEncodedBlock) {
-      reader->nextEncoded(data, rowsToRead, nullptr);
-    } else {
-      reader->next(data, rowsToRead, nullptr);
-    }
-    // update row number
-    previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
-    currentRowInStripe += rowsToRead;
+    uint64_t readRows = 0;
+    uint64_t rowsToRead = 0;
+    // do...while is required to handle the case where the filter eliminates 
all rows in the
+    // batch, we never return an empty batch unless the file is exhausted
+    do {
+      if (currentStripe >= lastStripe) {
+        data.numElements = 0;
+        markEndOfFile();
+        return readRows;
+      }
+      if (currentRowInStripe == 0) {
+        startNextStripe(startReadPhase);
+        followRowInStripe = currentRowInStripe;
+      }
+      rowsToRead =
+          std::min(static_cast<uint64_t>(data.capacity),
+                   rowsInCurrentStripe - currentRowInStripe);
+      if (sargsApplier) {
+        rowsToRead = computeBatchSize(rowsToRead,
+                                      currentRowInStripe,
+                                      rowsInCurrentStripe,
+                                      footer->rowindexstride(),
+                                      sargsApplier->getNextSkippedRows());
+      }
+      if (rowsToRead == 0) {
+        markEndOfFile();
+        return readRows;
+      }
+      uint16_t sel_rowid_idx[rowsToRead];
+      nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx, arg);
+
+      if (startReadPhase == ReadPhase::LEADERS && data.numElements > 0) {
+        // At least 1 row has been selected and as a result we read the follow 
columns into the
+        // row batch
+        nextBatch(data, rowsToRead,
+                  prepareFollowReaders(footer->rowindexstride(),
+                                       currentRowInStripe, followRowInStripe), 
sel_rowid_idx, arg);
+        followRowInStripe = currentRowInStripe + rowsToRead;
+      }
+
+      // update row number
+      previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
+      currentRowInStripe += rowsToRead;
+      readRows += rowsToRead;
 
     // check if we need to advance to next selected row group
     if (sargsApplier) {
       uint64_t nextRowToRead =
           advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe, 
footer->rowindexstride(),
                                 sargsApplier->getNextSkippedRows());
-      if (currentRowInStripe != nextRowToRead) {
-        // it is guaranteed to be at start of a row group
-        currentRowInStripe = nextRowToRead;
-        if (currentRowInStripe < rowsInCurrentStripe) {
-          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / 
footer->rowindexstride()));
+        if (currentRowInStripe != nextRowToRead) {
+          // it is guaranteed to be at start of a row group
+          currentRowInStripe = nextRowToRead;
+          if (currentRowInStripe < rowsInCurrentStripe) {
+            seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / 
footer->rowindexstride()),
+                           startReadPhase);
+          }
         }
       }
+
+      if (currentRowInStripe >= rowsInCurrentStripe) {
+        currentStripe += 1;
+        currentRowInStripe = 0;
+      }
+    } while (rowsToRead != 0 && data.numElements == 0);
+    return readRows;
+  }
+
+  void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const 
ReadPhase& readPhase, uint16_t* sel_rowid_idx, void* arg) {
+    if (enableEncodedBlock) {
+      reader->nextEncoded(data, batchSize, nullptr, readPhase);
+    }
+    else {
+      reader->next(data, batchSize, nullptr, readPhase);
+    }
+    if (readPhase == ReadPhase::ALL || readPhase == ReadPhase::LEADERS) {
+      // Set the batch size when reading everything or when reading FILTER 
columns
+      data.numElements = batchSize;
     }
 
-    if (currentRowInStripe >= rowsInCurrentStripe) {
-      currentStripe += 1;
-      currentRowInStripe = 0;
+    if (readPhase == ReadPhase::LEADERS) {
+      // Apply filter callback to reduce number of # rows selected for 
decoding in the next
+      // TreeReaders
+      if (readerContext->getFilterCallback()) {
+        readerContext->getFilterCallback()->filter(data, sel_rowid_idx, 
batchSize, arg);
+      }
     }
-    return rowsToRead != 0;
   }
 
+    /**
+   * Determine the RowGroup based on the supplied row id.
+   * @param rowIdx Row for which the row group is being determined
+   * @return Id of the RowGroup that the row belongs to
+   */
+    int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) {
+      return rowIndexStride == 0 ? 0 : (int) (rowIdx / rowIndexStride);
+    }
+
+    /**
+     * This method prepares the non-filter column readers for next batch. This 
involves the following
+     * 1. Determine position
+     * 2. Perform IO if required
+     * 3. Position the non-filter readers
+     *
+     * This method is repositioning the non-filter columns and as such this 
method shall never have to
+     * deal with navigating the stripe forward or skipping row groups, all of 
this should have already
+     * taken place based on the filter columns.
+     * @param toFollowRow The rowIdx identifies the required row position 
within the stripe for
+     *                    follow read
+     * @param fromFollowRow Indicates the current position of the follow read, 
exclusive
+     * @return the read phase for reading non-filter columns, this shall be 
FOLLOWERS_AND_PARENTS in
+     * case of a seek otherwise will be FOLLOWERS
+     */
+    ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride, 
long toFollowRow, long fromFollowRow) {
+      // 1. Determine the required row group and skip rows needed from the RG 
start
+      int needRG = computeRGIdx(rowIndexStride, toFollowRow);
+      // The current row is not yet read so we -1 to compute the previously 
read row group
+      int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1);
+      long skipRows;
+      if (needRG == readRG && toFollowRow >= fromFollowRow) {
+        // In case we are skipping forward within the same row group, we 
compute skip rows from the
+        // current position
+        skipRows = toFollowRow - fromFollowRow;
+      } else {
+        // In all other cases including seeking backwards, we compute the skip 
rows from the start of
+        // the required row group
+        skipRows = toFollowRow - (needRG * rowIndexStride);
+      }
+
+      // 2. Plan the row group idx for the non-filter columns if this has not 
already taken place
+      if (needsFollowColumnsRead) {
+        needsFollowColumnsRead = false;
+      }
+
+      // 3. Position the non-filter readers to the required RG and skipRows
+      ReadPhase result = ReadPhase::FOLLOWERS;
+      if (needRG != readRG || toFollowRow < fromFollowRow) {
+        // When having to change a row group or in case of back navigation, 
seek both the filter
+        // parents and non-filter. This will re-position the parents present 
vector. This is needed
+        // to determine the number of non-null values to skip on the 
non-filter columns.
+        seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS);
+        // skip rows on both the filter parents and non-filter as both have 
been positioned in the
+        // previous step
+        reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
+        result = ReadPhase::FOLLOWERS_AND_PARENTS;
+      } else if (skipRows > 0) {
+        // in case we are only skipping within the row group, position the 
filter parents back to the
+        // position of the follow. This is required to determine the non-null 
values to skip on the
+        // non-filter columns.
+        seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS);
+        reader->skip(fromFollowRow - (readRG * rowIndexStride), 
ReadPhase::LEADER_PARENTS);
+        // Move both the filter parents and non-filter forward, this will 
compute the correct
+        // non-null skips on follow children
+        reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
+        result = ReadPhase::FOLLOWERS_AND_PARENTS;
+      }
+      // Identifies the read level that should be performed for the read
+      // FOLLOWERS_WITH_PARENTS indicates repositioning identifying both 
non-filter and filter parents
+      // FOLLOWERS indicates read only of the non-filter level without the 
parents, which is used during
+      // contiguous read. During a contiguous read no skips are needed and the 
non-null information of
+      // the parent is available in the column vector for use during 
non-filter read
+      return result;
+    }
+
   uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t 
currentRowInStripe,
                                            uint64_t rowsInCurrentStripe, 
uint64_t rowIndexStride,
                                            const std::vector<uint64_t>& 
nextSkippedRows) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index ea6db3a..0c52387 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -34,6 +34,25 @@ namespace orc {
 
   static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
 
+  class ReaderContext {
+  public:
+   ReaderContext() = default;
+
+   const ORCFilter* getFilterCallback() const {
+     return filter;
+   }
+
+   ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds, 
const ORCFilter* _filter) {
+     this->filterColumnIds = std::move(_filterColumnIds);
+     this->filter = _filter;
+     return *this;
+   }
+
+  private:
+   std::unordered_set<int> filterColumnIds;
+   const ORCFilter* filter;
+  };
+
   /**
    * WriterVersion Implementation
    */
@@ -150,6 +169,7 @@ namespace orc {
     uint64_t lastStripe;  // the stripe AFTER the last one
     uint64_t processingStripe;
     uint64_t currentRowInStripe;
+    uint64_t followRowInStripe;
     uint64_t rowsInCurrentStripe;
     // number of row groups between first stripe and last stripe
     uint64_t numRowGroupsInStripeRange;
@@ -160,7 +180,7 @@ namespace orc {
     bool enableEncodedBlock;
     bool useTightNumericVector;
     // internal methods
-    void startNextStripe();
+    void startNextStripe(const ReadPhase& readPhase);
     inline void markEndOfFile();
 
     // row index of current stripe with column id as the key
@@ -172,6 +192,15 @@ namespace orc {
     // desired timezone to return data of timestamp types.
     const Timezone& readerTimezone;
 
+    std::unique_ptr<ReaderContext> readerContext;
+    const ORCFilter* filter;
+    ReadPhase startReadPhase;
+    bool needsFollowColumnsRead;
+
+    std::map<uint64_t, const Type*> idTypeMap;
+    std::map<std::string, Type*> nameTypeMap;
+    std::vector<std::string> columns;
+
     // load stripe index if not done so
     void loadStripeIndex();
 
@@ -199,7 +228,7 @@ namespace orc {
      * Seek to the start of a row group in the current stripe
      * @param rowGroupEntryId the row group id to seek to
      */
-    void seekToRowGroup(uint32_t rowGroupEntryId);
+    void seekToRowGroup(uint32_t rowGroupEntryId, const ReadPhase& readPhase);
 
     /**
      * Check if the file has bad bloom filters. We will skip using them in the
@@ -208,13 +237,25 @@ namespace orc {
      */
     bool hasBadBloomFilters();
 
-   public:
+
+    // build map from type name and id, id to Type
+    void buildTypeNameIdMap(Type* type);
+
+    std::string toDotColumnPath();
+
+    void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase& 
readPhase, uint16_t* sel_rowid_idx, void* arg);
+
+    int computeRGIdx(uint64_t rowIndexStride, long rowIdx);
+
+    ReadPhase prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow, 
long fromFollowRow);
+
+  public:
     /**
      * Constructor that lets the user specify additional options.
      * @param contents of the file
      * @param options options for reading
      */
-    RowReaderImpl(std::shared_ptr<FileContents> contents, const 
RowReaderOptions& options);
+    RowReaderImpl(std::shared_ptr<FileContents> contents, const 
RowReaderOptions& options, const ORCFilter* filter = nullptr);
 
     // Select the columns from the options object
     const std::vector<bool> getSelectedColumns() const override;
@@ -223,6 +264,8 @@ namespace orc {
 
     std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) const 
override;
 
+    uint64_t nextBatch(ColumnVectorBatch& data, void* arg = nullptr) override;
+
     bool next(ColumnVectorBatch& data) override;
 
     CompressionKind getCompression() const;
@@ -312,9 +355,9 @@ namespace orc {
 
     std::unique_ptr<StripeStatistics> getStripeStatistics(uint64_t 
stripeIndex) const override;
 
-    std::unique_ptr<RowReader> createRowReader() const override;
+    std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter = 
nullptr) const override;
 
-    std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& 
options) const override;
+    std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& 
options, const ORCFilter* filter = nullptr) const override;
 
     uint64_t getContentLength() const override;
     uint64_t getStripeStatisticsLength() const override;
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index 0075d04..47095b3 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -25,6 +25,12 @@
 
 namespace orc {
 
+  const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories({ 
ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT, 
ReaderCategory::NON_FILTER });
+  const ReadPhase ReadPhase::LEADERS = ReadPhase::fromCategories({ 
ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT });
+  const ReadPhase ReadPhase::FOLLOWERS = ReadPhase::fromCategories({ 
ReaderCategory::NON_FILTER });
+  const ReadPhase ReadPhase::LEADER_PARENTS = ReadPhase::fromCategories({ 
ReaderCategory::FILTER_PARENT });
+  const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS = 
ReadPhase::fromCategories({ ReaderCategory::FILTER_PARENT, 
ReaderCategory::NON_FILTER });
+
   Type::~Type() {
     // PASS
   }
@@ -38,6 +44,7 @@ namespace orc {
     precision = 0;
     scale = 0;
     subtypeCount = 0;
+    readerCategory = ReaderCategory::NON_FILTER;
   }
 
   TypeImpl::TypeImpl(TypeKind _kind, uint64_t _maxLength) {
@@ -49,6 +56,7 @@ namespace orc {
     precision = 0;
     scale = 0;
     subtypeCount = 0;
+    readerCategory = ReaderCategory::NON_FILTER;
   }
 
   TypeImpl::TypeImpl(TypeKind _kind, uint64_t _precision, uint64_t _scale) {
@@ -60,6 +68,7 @@ namespace orc {
     precision = _precision;
     scale = _scale;
     subtypeCount = 0;
+    readerCategory = ReaderCategory::NON_FILTER;
   }
 
   uint64_t TypeImpl::assignIds(uint64_t root) const {
@@ -75,8 +84,8 @@ namespace orc {
   void TypeImpl::ensureIdAssigned() const {
     if (columnId == -1) {
       const TypeImpl* root = this;
-      while (root->parent != nullptr) {
-        root = root->parent;
+      while (root->getParent() != nullptr) {
+        root = dynamic_cast<const TypeImpl*>(root->getParent());
       }
       root->assignIds(0);
     }
@@ -100,10 +109,18 @@ namespace orc {
     return subtypeCount;
   }
 
+  Type* TypeImpl::getParent() const {
+    return parent;
+  }
+
   const Type* TypeImpl::getSubtype(uint64_t i) const {
     return subTypes[i].get();
   }
 
+  Type* TypeImpl::getSubtype(uint64_t i) {
+    return subTypes[i].get();
+  }
+
   const std::string& TypeImpl::getFieldName(uint64_t i) const {
     return fieldNames[i];
   }
@@ -155,6 +172,14 @@ namespace orc {
     return it->second;
   }
 
+  ReaderCategory TypeImpl::getReaderCategory() const {
+    return readerCategory;
+  }
+
+  void TypeImpl::setReaderCategory(ReaderCategory _readerCategory) {
+    readerCategory = _readerCategory;
+  }
+
   void TypeImpl::setIds(uint64_t _columnId, uint64_t _maxColumnId) {
     columnId = static_cast<int64_t>(_columnId);
     maximumColumnId = static_cast<int64_t>(_maxColumnId);
diff --git a/c++/src/TypeImpl.hh b/c++/src/TypeImpl.hh
index 6d07437..6bc4a84 100644
--- a/c++/src/TypeImpl.hh
+++ b/c++/src/TypeImpl.hh
@@ -41,6 +41,7 @@ namespace orc {
     uint64_t precision;
     uint64_t scale;
     std::map<std::string, std::string> attributes;
+    ReaderCategory readerCategory;
 
    public:
     /**
@@ -66,8 +67,12 @@ namespace orc {
 
     uint64_t getSubtypeCount() const override;
 
+    Type* getParent() const override;
+
     const Type* getSubtype(uint64_t i) const override;
 
+    Type* getSubtype(uint64_t i) override;
+
     const std::string& getFieldName(uint64_t i) const override;
 
     uint64_t getMaximumLength() const override;
@@ -86,6 +91,10 @@ namespace orc {
 
     std::string getAttributeValue(const std::string& key) const override;
 
+    ReaderCategory getReaderCategory() const override;
+
+    void setReaderCategory(ReaderCategory _readerCategory) override;
+
     std::string toString() const override;
 
     const Type* getTypeByColumnId(uint64_t colIdx) const override;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to