This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6841a547acd [fix](inverted index) resolve io_ctx heap-use-after-free in concurrent reader access (#47634)
6841a547acd is described below

commit 6841a547acd4485c29c8c9cd7203c21e5b546854
Author: zzzxl <yangs...@selectdb.com>
AuthorDate: Mon Feb 10 10:25:23 2025 +0800

    [fix](inverted index) resolve io_ctx heap-use-after-free in concurrent reader access (#47634)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
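    When readers for the inverted index are opened concurrently, the stream
    must be cloned before its io_ctx is set, so that each query uses its own
    io_ctx. Without cloning, multiple queries share the same stream, and a
    query may then access an io_ctx that was set by another query and has
    already been freed, causing a heap-use-after-free.

    This change stops `InvertedIndexFileReader::init` from resetting the
    io_ctx on the shared stream. Instead, the caller's io_ctx is passed
    through `InvertedIndexFileReader::open` to `DorisCompoundReader`, which
    sets it on the cloned stream.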
---
 .../segment_v2/inverted_index_compound_reader.cpp  | 31 +++++++++++++++++++++-
 .../segment_v2/inverted_index_compound_reader.h    | 26 +++++++-----------
 .../segment_v2/inverted_index_file_reader.cpp      | 22 +++++----------
 .../rowset/segment_v2/inverted_index_file_reader.h |  6 +++--
 .../rowset/segment_v2/inverted_index_reader.cpp    | 22 +++++++++++++--
 .../test_index_io_context.groovy                   |  4 ++-
 6 files changed, 72 insertions(+), 39 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 7a993daacf1..86efe86ca43 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -152,11 +152,35 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
     _io_ctx = static_cast<const io::IOContext*>(io_ctx);
 }
 
-DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, 
int32_t read_buffer_size)
+DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
+                                         EntriesType* entries_clone, int32_t 
read_buffer_size,
+                                         const io::IOContext* io_ctx)
+        : _stream(stream),
+          _entries(_CLNEW EntriesType(true, true)),
+          _read_buffer_size(read_buffer_size) {
+    // After stream clone, the io_ctx needs to be reconfigured.
+    initialize(io_ctx);
+
+    for (auto& e : *entries_clone) {
+        auto* origin_entry = e.second;
+        auto* entry = _CLNEW ReaderFileEntry();
+        char* aid = strdup(e.first);
+        entry->file_name = origin_entry->file_name;
+        entry->offset = origin_entry->offset;
+        entry->length = origin_entry->length;
+        _entries->put(aid, entry);
+    }
+};
+
+DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, 
int32_t read_buffer_size,
+                                         const io::IOContext* io_ctx)
         : _ram_dir(new lucene::store::RAMDirectory()),
           _stream(stream),
           _entries(_CLNEW EntriesType(true, true)),
           _read_buffer_size(read_buffer_size) {
+    // After stream clone, the io_ctx needs to be reconfigured.
+    initialize(io_ctx);
+
     try {
         int32_t count = _stream->readVInt();
         ReaderFileEntry* entry = nullptr;
@@ -383,5 +407,10 @@ CL_NS(store)::IndexInput* 
DorisCompoundReader::getDorisIndexInput() {
     return _stream;
 }
 
+void DorisCompoundReader::initialize(const io::IOContext* io_ctx) {
+    _stream->setIoContext(io_ctx);
+    _stream->setIdxFileCache(true);
+}
+
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 1c7bc159b9c..4a687e4ed3e 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -34,6 +34,7 @@
 #include <vector>
 
 #include "io/fs/file_system.h"
+#include "io/io_common.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 
 class CLuceneError;
@@ -78,24 +79,12 @@ protected:
     bool doDeleteFile(const char* name) override;
 
 public:
-    explicit DorisCompoundReader(
-            CL_NS(store)::IndexInput* stream, EntriesType* entries_clone,
-            int32_t read_buffer_size = 
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE)
-            : _stream(stream),
-              _entries(_CLNEW EntriesType(true, true)),
-              _read_buffer_size(read_buffer_size) {
-        for (auto& e : *entries_clone) {
-            auto* origin_entry = e.second;
-            auto* entry = _CLNEW ReaderFileEntry();
-            char* aid = strdup(e.first);
-            entry->file_name = origin_entry->file_name;
-            entry->offset = origin_entry->offset;
-            entry->length = origin_entry->length;
-            _entries->put(aid, entry);
-        }
-    };
+    DorisCompoundReader(CL_NS(store)::IndexInput* stream, EntriesType* 
entries_clone,
+                        int32_t read_buffer_size = 
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
+                        const io::IOContext* io_ctx = nullptr);
     DorisCompoundReader(CL_NS(store)::IndexInput* stream,
-                        int32_t read_buffer_size = 
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
+                        int32_t read_buffer_size = 
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
+                        const io::IOContext* io_ctx = nullptr);
     ~DorisCompoundReader() override;
     void copyFile(const char* file, int64_t file_length, uint8_t* buffer, 
int64_t buffer_length);
     bool list(std::vector<std::string>* names) const override;
@@ -115,6 +104,9 @@ public:
     static const char* getClassName();
     const char* getObjectName() const override;
     CL_NS(store)::IndexInput* getDorisIndexInput();
+
+private:
+    void initialize(const io::IOContext* io_ctx);
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index e7838f1ffd0..30c3e178732 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -28,22 +28,13 @@
 namespace doris::segment_v2 {
 
 Status InvertedIndexFileReader::init(int32_t read_buffer_size, const 
io::IOContext* io_ctx) {
+    std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
     if (!_inited) {
         _read_buffer_size = read_buffer_size;
         if (_storage_format >= InvertedIndexStorageFormatPB::V2) {
-            auto st = _init_from(read_buffer_size, io_ctx);
-            if (!st.ok()) {
-                return st;
-            }
+            RETURN_IF_ERROR(_init_from(read_buffer_size, io_ctx));
         }
         _inited = true;
-    } else {
-        if (_storage_format == InvertedIndexStorageFormatPB::V2) {
-            if (_stream) {
-                _stream->setIoContext(io_ctx);
-                _stream->setIndexFile(true);
-            }
-        }
     }
     return Status::OK();
 }
@@ -51,7 +42,6 @@ Status InvertedIndexFileReader::init(int32_t 
read_buffer_size, const io::IOConte
 Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const 
io::IOContext* io_ctx) {
     auto index_file_full_path = 
InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
 
-    std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
     try {
         CLuceneError err;
         CL_NS(store)::IndexInput* index_input = nullptr;
@@ -161,7 +151,7 @@ Result<InvertedIndexDirectoryMap> 
InvertedIndexFileReader::get_all_directories()
 }
 
 Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open(
-        int64_t index_id, const std::string& index_suffix) const {
+        int64_t index_id, const std::string& index_suffix, const 
io::IOContext* io_ctx) const {
     std::unique_ptr<DorisCompoundReader> compound_reader;
 
     if (_storage_format == InvertedIndexStorageFormatPB::V1) {
@@ -231,16 +221,16 @@ Result<std::unique_ptr<DorisCompoundReader>> 
InvertedIndexFileReader::_open(
         }
         // Need to clone resource here, because index searcher cache need it.
         compound_reader = std::make_unique<DorisCompoundReader>(
-                _stream->clone(), index_it->second.get(), _read_buffer_size);
+                _stream->clone(), index_it->second.get(), _read_buffer_size, 
io_ctx);
     }
     return compound_reader;
 }
 
 Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::open(
-        const TabletIndex* index_meta) const {
+        const TabletIndex* index_meta, const io::IOContext* io_ctx) const {
     auto index_id = index_meta->index_id();
     auto index_suffix = index_meta->get_index_suffix();
-    return _open(index_id, index_suffix);
+    return _open(index_id, index_suffix, io_ctx);
 }
 
 std::string InvertedIndexFileReader::get_index_file_cache_key(const 
TabletIndex* index_meta) const {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h 
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
index 5f3775649d3..63dd89cf975 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
@@ -60,7 +60,8 @@ public:
 
     Status init(int32_t read_buffer_size = 
config::inverted_index_read_buffer_size,
                 const io::IOContext* io_ctx = nullptr);
-    Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* 
index_meta) const;
+    Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* 
index_meta,
+                                                      const io::IOContext* 
io_ctx = nullptr) const;
     void debug_file_entries();
     std::string get_index_file_cache_key(const TabletIndex* index_meta) const;
     std::string get_index_file_path(const TabletIndex* index_meta) const;
@@ -74,7 +75,8 @@ public:
 protected:
     Status _init_from(int32_t read_buffer_size, const io::IOContext* io_ctx);
     Result<std::unique_ptr<DorisCompoundReader>> _open(int64_t index_id,
-                                                       const std::string& 
index_suffix) const;
+                                                       const std::string& 
index_suffix,
+                                                       const io::IOContext* 
io_ctx = nullptr) const;
 
 private:
     IndicesEntriesMap _indices_entries;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index c1e3b10d882..5da74fd1dcf 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -127,7 +127,7 @@ Status InvertedIndexReader::read_null_bitmap(const 
io::IOContext* io_ctx,
                 LOG(WARNING) << st;
                 return st;
             }
-            auto directory = 
DORIS_TRY(_inverted_index_file_reader->open(&_index_meta));
+            auto directory = 
DORIS_TRY(_inverted_index_file_reader->open(&_index_meta, io_ctx));
             dir = directory.release();
             owned_dir = true;
         }
@@ -218,7 +218,25 @@ Status InvertedIndexReader::handle_searcher_cache(
             LOG(WARNING) << st;
             return st;
         }
-        auto dir = DORIS_TRY(_inverted_index_file_reader->open(&_index_meta));
+        auto dir = DORIS_TRY(_inverted_index_file_reader->open(&_index_meta, 
io_ctx));
+
+        DBUG_EXECUTE_IF("InvertedIndexReader.handle_searcher_cache.io_ctx", ({
+                            if (dir) {
+                                auto* stream = dir->getDorisIndexInput();
+                                const auto* cur_io_ctx =
+                                        (const 
io::IOContext*)stream->getIoContext();
+                                if (cur_io_ctx->file_cache_stats) {
+                                    if (cur_io_ctx->file_cache_stats != 
&stats->file_cache_stats) {
+                                        LOG(FATAL) << "io context file cache 
stats is not equal to "
+                                                      "stats file cache "
+                                                      "stats: "
+                                                   << 
cur_io_ctx->file_cache_stats << ", "
+                                                   << &stats->file_cache_stats;
+                                    }
+                                }
+                            }
+                        }));
+
         // try to reuse index_searcher's directory to read null_bitmap to cache
         // to avoid open directory additionally for null_bitmap
         // TODO: handle null bitmap procedure in new format.
diff --git 
a/regression-test/suites/fault_injection_p0/test_index_io_context.groovy 
b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
index 8dd82ccb304..23096432a3a 100644
--- a/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
+++ b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_index_io_context", "p0") {
+suite("test_index_io_context", "nonConcurrent") {
     def tableName1 = "test_index_io_context1"
     def tableName2 = "test_index_io_context2"
 
@@ -79,6 +79,7 @@ suite("test_index_io_context", "p0") {
         sql """ set enable_common_expr_pushdown = true; """
 
         try {
+            
GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexReader.handle_searcher_cache.io_ctx")
             qt_sql """ select count() from ${tableName1} where request 
match_any 'ticket_quest_bg2.jpg'; """
             qt_sql """ select count() from ${tableName1} where request 
match_any 'ticket_quest_bg2.jpg'; """
             qt_sql """ select count() from ${tableName1} where request 
match_any 'ticket_quest_bg2.jpg'; """
@@ -104,6 +105,7 @@ suite("test_index_io_context", "p0") {
             qt_sql """ select count() from ${tableName2} where request 
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
             qt_sql """ select count() from ${tableName2} where request 
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
         } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexReader.handle_searcher_cache.io_ctx")
         }
     } finally {
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to