This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/orc by this push:
     new db01184f765 [opt] support orc merge multi stripes io (#244)
db01184f765 is described below

commit db01184f765c03496e4107bd3ac37c077ac4bc5f
Author: daidai <2017501...@qq.com>
AuthorDate: Fri Oct 18 23:17:33 2024 +0800

    [opt] support orc merge multi stripes io (#244)
---
 c++/include/orc/Reader.hh |  8 +++++-
 c++/src/Reader.cc         | 71 +++++++++++++++++++++++++++++++++++++----------
 c++/src/Reader.hh         |  9 +++++-
 3 files changed, 71 insertions(+), 17 deletions(-)

diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 5843d88c059..c9be47e0d8b 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -40,7 +40,7 @@ namespace orc {
   // classes that hold data members so we can maintain binary compatibility
   struct ReaderOptionsPrivate;
   struct RowReaderOptionsPrivate;
-
+  class InputStream;
   /**
    * Expose the reader metrics including the latency and
    * number of calls of the decompression/decoding/IO modules.
@@ -633,6 +633,12 @@ namespace orc {
      */
     virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
         uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;
+
+    /**
+     * Get the input stream this reader reads the file through.
+     * @return a non-owning pointer to the reader's input stream
+     */
+    virtual InputStream* getStream() const = 0;
+
+    /**
+     * Replace the input stream used by this reader.
+     * The stream is passed as a std::unique_ptr by value, so ownership is
+     * transferred to the reader.
+     */
+    virtual void setStream(std::unique_ptr<InputStream>) = 0;
+
+    /**
+     * Work out which stripes have to be read for the given row reader options.
+     * @param opts row reader options, including the optional search argument
+     * @return per-stripe flags; in ReaderImpl there is one entry per stripe,
+     *         1 meaning the stripe must be read and 0 meaning it can be skipped
+     */
+    virtual std::vector<int> getNeedReadStripes(const RowReaderOptions& opts) 
= 0;
   };
 
   /**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 80a5cfd4b7d..eddeeee0b05 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -306,7 +306,8 @@ namespace orc {
     column_selector.updateSelected(selectedColumns, opts);
 
     // prepare SargsApplier if SearchArgument is available
-    if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
+    sargsApplier = std::move(contents->sargsApplier);
+    if (sargsApplier == nullptr && opts.getSearchArgument() && 
footer->rowindexstride() > 0) {
       sargs = opts.getSearchArgument();
       sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), 
footer->rowindexstride(),
                                           
getWriterVersionImpl(_contents.get()),
@@ -917,6 +918,37 @@ namespace orc {
     return std::make_unique<RowReaderImpl>(contents, opts, filter, 
stringDictFilter);
   }
 
+  /**
+   * Decide, stripe by stripe, which stripes must be read for the given options.
+   *
+   * The result always has numberOfStripes entries. Entry i is 1 when stripe i
+   * has to be read and 0 when evaluating the search argument against the
+   * stripe-level statistics rules the stripe out. Every entry stays 1 when no
+   * search argument is set, when the footer's row index stride is not
+   * positive, or when no file metadata is available.
+   *
+   * Side effects: may call readMetadata(); after a completed evaluation the
+   * SargsApplier built here is moved into contents->sargsApplier.
+   *
+   * NOTE(review): the applier is given the raw pointer sargs.get(); presumably
+   * the caller's options keep the search argument alive for as long as the
+   * applier is used -- confirm.
+   *
+   * @param opts row reader options supplying the optional search argument
+   * @return one flag per stripe (1 = read, 0 = skip)
+   */
+  std::vector<int> ReaderImpl::getNeedReadStripes(const RowReaderOptions& opts) {
+    const auto sargs = opts.getSearchArgument();
+    if (sargs && !isMetadataLoaded) {
+      // load stripe statistics for PPD
+      readMetadata();
+    }
+
+    std::vector<int> allStripesNeeded(numberOfStripes, 1);
+    if (!sargs || !(footer->rowindexstride() > 0)) {
+      return allStripesNeeded;
+    }
+
+    sargsApplier = std::make_unique<SargsApplier>(
+        *contents->schema, sargs.get(), footer->rowindexstride(),
+        getWriterVersionImpl(contents.get()), contents->readerMetrics);
+
+    // Without file metadata there are no stripe statistics to evaluate.
+    if (contents->metadata == nullptr) {
+      return allStripesNeeded;
+    }
+
+    for (uint64_t stripeIndex = 0; stripeIndex < numberOfStripes; ++stripeIndex) {
+      const auto& stripeStats =
+          contents->metadata->stripestats(static_cast<int>(stripeIndex));
+      // No need to add to mMetrics here, so pass 0 as the row group count.
+      allStripesNeeded[stripeIndex] = sargsApplier->evaluateStripeStatistics(stripeStats, 0);
+    }
+    contents->sargsApplier = std::move(sargsApplier);
+    return allStripesNeeded;
+  }
+
+
+
   uint64_t maxStreamsForType(const proto::Type& type) {
     switch (static_cast<int64_t>(type.kind())) {
       case proto::Type_Kind_STRUCT:
@@ -1126,29 +1158,38 @@ namespace orc {
             << ", footerLength=" << currentStripeInfo.footerlength() << ")";
         throw ParseError(msg.str());
       }
-      currentStripeFooter = getStripeFooter(currentStripeInfo, 
*contents.get());
-      rowsInCurrentStripe = currentStripeInfo.numberofrows();
-      processingStripe = currentStripe;
-
-      std::unique_ptr<StripeInformation> currentStripeInformation(new 
StripeInformationImpl(
-          currentStripeInfo.offset(), currentStripeInfo.indexlength(),
-          currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
-          currentStripeInfo.numberofrows(), contents->stream.get(), 
*contents->pool,
-          contents->compression, contents->blockSize, 
contents->readerMetrics));
-      contents->stream->beforeReadStripe(std::move(currentStripeInformation), 
selectedColumns);
 
       if (sargsApplier) {
         bool isStripeNeeded = true;
         if (contents->metadata) {
-          const auto& currentStripeStats =
-              contents->metadata->stripestats(static_cast<int>(currentStripe));
+          const auto &currentStripeStats =
+                  
contents->metadata->stripestats(static_cast<int>(currentStripe));
           // skip this stripe after stats fail to satisfy sargs
           uint64_t stripeRowGroupCount =
-              (rowsInCurrentStripe + footer->rowindexstride() - 1) / 
footer->rowindexstride();
+                  (rowsInCurrentStripe + footer->rowindexstride() - 1) / 
footer->rowindexstride();
           isStripeNeeded =
-              sargsApplier->evaluateStripeStatistics(currentStripeStats, 
stripeRowGroupCount);
+                  sargsApplier->evaluateStripeStatistics(currentStripeStats, 
stripeRowGroupCount);
         }
+        if (!isStripeNeeded) {
+          // advance to next stripe when current stripe has no matching rows
+          currentStripe += 1;
+          currentRowInStripe = 0;
+          continue;
+        }
+      }
+      currentStripeFooter = getStripeFooter(currentStripeInfo, 
*contents.get());
+      rowsInCurrentStripe = currentStripeInfo.numberofrows();
+      processingStripe = currentStripe;
 
+      std::unique_ptr<StripeInformation> currentStripeInformation(new 
StripeInformationImpl(
+            currentStripeInfo.offset(), currentStripeInfo.indexlength(),
+            currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
+            currentStripeInfo.numberofrows(), contents->stream.get(), 
*contents->pool,
+            contents->compression, contents->blockSize, 
contents->readerMetrics));
+      contents->stream->beforeReadStripe(std::move(currentStripeInformation), 
selectedColumns);
+
+      if (sargsApplier) {
+        bool isStripeNeeded = true;
         if (isStripeNeeded) {
           // read row group statistics and bloom filters of current stripe
           loadStripeIndex();
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index c0f891ef27c..9505022c558 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -100,6 +100,7 @@ namespace orc {
     bool isDecimalAsLong;
     std::unique_ptr<proto::Metadata> metadata;
     ReaderMetrics* readerMetrics;
+    std::unique_ptr<SargsApplier> sargsApplier;
   };
 
   proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -314,6 +315,8 @@ namespace orc {
     // footer
     proto::Footer* footer;
     uint64_t numberOfStripes;
+    std::unique_ptr<SargsApplier> sargsApplier;
+    std::vector<int> getNeedReadStripes(const RowReaderOptions& opts) override;
     uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);
 
     // internal methods
@@ -418,10 +421,14 @@ namespace orc {
       return contents->schema.get();
     }
 
-    InputStream* getStream() const {
+    InputStream* getStream() const override {
       return contents->stream.get();
     }
 
+    // Replace the stream held in contents with the given one; the moved-in
+    // unique_ptr is assigned to contents->stream.
+    void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{
+      contents->stream =  std::move(inputStreamUPtr);
+    }
+
     uint64_t getMemoryUse(int stripeIx = -1) override;
 
     uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int 
stripeIx = -1) override;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to