This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch orc-2.1
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/orc-2.1 by this push:
     new 8e7a0511980 [Optimize] Support pread cache for index/stripe/file for 
orc
8e7a0511980 is described below

commit 8e7a0511980039f29802c25f956e3a907967333a
Author: caiconghui1 <[email protected]>
AuthorDate: Thu Aug 14 00:11:33 2025 +0800

    [Optimize] Support pread cache for index/stripe/file for orc
---
 c++/include/orc/OrcFile.hh |  7 +++++++
 c++/src/Reader.cc          | 34 ++++++++++++----------------------
 2 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index d840e67cf28..d4d554a3501 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -36,6 +36,12 @@ namespace orc {
    */
   class InputStream {
    public:
+    enum class CacheType {
+      FILE,
+      STRIPE,
+      ROW_GROUP_INDEX,
+    };
+
     virtual ~InputStream();
 
     /**
@@ -65,6 +71,7 @@ namespace orc {
 
     virtual void beforeReadStripe(std::unique_ptr<StripeInformation> 
currentStripeInformation,
                                   std::vector<bool>& selectedColumns);
+    virtual void preadCache(CacheType cacheType, uint64_t offset, uint64_t 
length);
   };
 
   /**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 4a5fb8f73b5..4c33a3c9aff 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -540,22 +540,9 @@ namespace orc {
     // reset all previous row indexes
     rowIndexes.clear();
     bloomFilterIndex.clear();
-    static const uint64_t MAX_READ_STRIPE_INDEX_SIZE_FOR_BUFFER = 1024 * 1024;
-    const char* stripeIndexBuffer = nullptr;
     uint64_t offset = currentStripeInfo.offset();
-    uint64_t startOffset = offset;
-    uint64_t stripeIndexSize = currentStripeInfo.indexlength();
-    std::unique_ptr<SeekableFileInputStream> stripeIndexInputStream = nullptr;
-    if (stripeIndexSize < MAX_READ_STRIPE_INDEX_SIZE_FOR_BUFFER) {
-      stripeIndexInputStream = std::unique_ptr<SeekableFileInputStream>(new 
SeekableFileInputStream(
-        contents->stream.get(), startOffset, stripeIndexSize, *contents->pool, 
stripeIndexSize));
-      int size = 0;
-      const void* buffer = nullptr;
-      if (!stripeIndexInputStream->Next(&buffer, &size) || 
static_cast<uint64_t>(size) != stripeIndexSize) {
-        throw ParseError("Failed to read the stripe index");
-      }
-      stripeIndexBuffer = static_cast<const char*>(buffer);
-    }
+    uint64_t rowIndexSize = currentStripeInfo.indexlength();
+    contents->stream->preadCache(InputStream::CacheType::ROW_GROUP_INDEX, 
offset, rowIndexSize);
     // obtain row indexes for selected columns
     for (int i = 0; i < currentStripeFooter.streams_size(); ++i) {
       const proto::Stream& pbStream = currentStripeFooter.streams(i);
@@ -563,13 +550,10 @@ namespace orc {
       if (selectedColumns[colId] && pbStream.has_kind() &&
           (pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
            pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
-        std::unique_ptr<SeekableInputStream> inputStream = stripeIndexBuffer ? 
std::unique_ptr<SeekableInputStream>(
-          new SeekableArrayInputStream(stripeIndexBuffer + offset - 
startOffset, pbStream.length()))
-          : std::unique_ptr<SeekableInputStream>(new 
SeekableFileInputStream(contents->stream.get(), offset,
-            pbStream.length(), *contents->pool));
         std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
             getCompression(),
-            std::move(inputStream),
+            std::unique_ptr<SeekableInputStream>(new 
SeekableFileInputStream(contents->stream.get(),
+              offset, pbStream.length(), *contents->pool)),
             getCompressionSize(), *contents->pool, contents->readerMetrics);
 
         if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
@@ -1190,8 +1174,9 @@ namespace orc {
     while (currentStripe < lastStripe) {
       currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
       uint64_t fileLength = contents->stream->getLength();
-      if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
-              currentStripeInfo.datalength() + 
currentStripeInfo.footerlength() >=
+      size_t stripeSize = currentStripeInfo.indexlength() + 
currentStripeInfo.datalength()
+        + currentStripeInfo.footerlength();
+      if (currentStripeInfo.offset() + stripeSize >=
           fileLength) {
         std::stringstream msg;
         msg << "Malformed StripeInformation at stripe index " << currentStripe
@@ -1221,6 +1206,7 @@ namespace orc {
           continue;
         }
       }
+      contents->stream->preadCache(InputStream::CacheType::STRIPE, 
currentStripeInfo.offset(), stripeSize);
       currentStripeFooter = getStripeFooter(currentStripeInfo, 
*contents.get());
       rowsInCurrentStripe = currentStripeInfo.numberofrows();
       processingStripe = currentStripe;
@@ -1678,6 +1664,7 @@ namespace orc {
         throw ParseError("File size too small");
       }
       auto buffer = std::make_unique<DataBuffer<char>>(*contents->pool, 
readSize);
+      stream->preadCache(InputStream::CacheType::FILE, 0, fileLength);
       stream->read(buffer->data(), readSize, fileLength - readSize);
 
       postscriptLength = buffer->data()[readSize - 1] & 0xff;
@@ -1780,4 +1767,7 @@ namespace orc {
   void InputStream::beforeReadStripe(std::unique_ptr<StripeInformation> 
currentStripeInformation,
                                      std::vector<bool>& selectedColumns) {}
 
+  void InputStream::preadCache(CacheType cacheType, uint64_t offset, uint64_t 
length) {}
+
+
 }  // namespace orc


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to