This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6841a547acd [fix](inverted index) resolve io_ctx heap-use-after-free in concurrent reader access (#47634) 6841a547acd is described below commit 6841a547acd4485c29c8c9cd7203c21e5b546854 Author: zzzxl <yangs...@selectdb.com> AuthorDate: Mon Feb 10 10:25:23 2025 +0800 [fix](inverted index) resolve io_ctx heap-use-after-free in concurrent reader access (#47634) ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: When concurrently opening readers for the inverted index, it's necessary to clone the stream before setting the io_ctx. This ensures that each query uses its own io_ctx. Without cloning, multiple queries might share the same stream, leading to heap-use-after-free issues when accessing the io_ctx. --- .../segment_v2/inverted_index_compound_reader.cpp | 31 +++++++++++++++++++++- .../segment_v2/inverted_index_compound_reader.h | 26 +++++++----------- .../segment_v2/inverted_index_file_reader.cpp | 22 +++++---------- .../rowset/segment_v2/inverted_index_file_reader.h | 6 +++-- .../rowset/segment_v2/inverted_index_reader.cpp | 22 +++++++++++++-- .../test_index_io_context.groovy | 4 ++- 6 files changed, 72 insertions(+), 39 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index 7a993daacf1..86efe86ca43 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -152,11 +152,35 @@ void CSIndexInput::setIoContext(const void* io_ctx) { _io_ctx = static_cast<const io::IOContext*>(io_ctx); } -DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size) +DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, + EntriesType* entries_clone, int32_t read_buffer_size, + const io::IOContext* io_ctx) + : 
_stream(stream), + _entries(_CLNEW EntriesType(true, true)), + _read_buffer_size(read_buffer_size) { + // After stream clone, the io_ctx needs to be reconfigured. + initialize(io_ctx); + + for (auto& e : *entries_clone) { + auto* origin_entry = e.second; + auto* entry = _CLNEW ReaderFileEntry(); + char* aid = strdup(e.first); + entry->file_name = origin_entry->file_name; + entry->offset = origin_entry->offset; + entry->length = origin_entry->length; + _entries->put(aid, entry); + } +}; + +DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size, + const io::IOContext* io_ctx) : _ram_dir(new lucene::store::RAMDirectory()), _stream(stream), _entries(_CLNEW EntriesType(true, true)), _read_buffer_size(read_buffer_size) { + // After stream clone, the io_ctx needs to be reconfigured. + initialize(io_ctx); + try { int32_t count = _stream->readVInt(); ReaderFileEntry* entry = nullptr; @@ -383,5 +407,10 @@ CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() { return _stream; } +void DorisCompoundReader::initialize(const io::IOContext* io_ctx) { + _stream->setIoContext(io_ctx); + _stream->setIdxFileCache(true); +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h index 1c7bc159b9c..4a687e4ed3e 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h @@ -34,6 +34,7 @@ #include <vector> #include "io/fs/file_system.h" +#include "io/io_common.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" class CLuceneError; @@ -78,24 +79,12 @@ protected: bool doDeleteFile(const char* name) override; public: - explicit DorisCompoundReader( - CL_NS(store)::IndexInput* stream, EntriesType* entries_clone, - int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE) - : _stream(stream), - 
_entries(_CLNEW EntriesType(true, true)), - _read_buffer_size(read_buffer_size) { - for (auto& e : *entries_clone) { - auto* origin_entry = e.second; - auto* entry = _CLNEW ReaderFileEntry(); - char* aid = strdup(e.first); - entry->file_name = origin_entry->file_name; - entry->offset = origin_entry->offset; - entry->length = origin_entry->length; - _entries->put(aid, entry); - } - }; + DorisCompoundReader(CL_NS(store)::IndexInput* stream, EntriesType* entries_clone, + int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE, + const io::IOContext* io_ctx = nullptr); DorisCompoundReader(CL_NS(store)::IndexInput* stream, - int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE); + int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE, + const io::IOContext* io_ctx = nullptr); ~DorisCompoundReader() override; void copyFile(const char* file, int64_t file_length, uint8_t* buffer, int64_t buffer_length); bool list(std::vector<std::string>* names) const override; @@ -115,6 +104,9 @@ public: static const char* getClassName(); const char* getObjectName() const override; CL_NS(store)::IndexInput* getDorisIndexInput(); + +private: + void initialize(const io::IOContext* io_ctx); }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp index e7838f1ffd0..30c3e178732 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp @@ -28,22 +28,13 @@ namespace doris::segment_v2 { Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) { + std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing if (!_inited) { _read_buffer_size = read_buffer_size; if (_storage_format >= InvertedIndexStorageFormatPB::V2) { - auto st = _init_from(read_buffer_size, io_ctx); - if (!st.ok()) { - return st; - } + 
RETURN_IF_ERROR(_init_from(read_buffer_size, io_ctx)); } _inited = true; - } else { - if (_storage_format == InvertedIndexStorageFormatPB::V2) { - if (_stream) { - _stream->setIoContext(io_ctx); - _stream->setIndexFile(true); - } - } } return Status::OK(); } @@ -51,7 +42,6 @@ Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOConte Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) { auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix); - std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing try { CLuceneError err; CL_NS(store)::IndexInput* index_input = nullptr; @@ -161,7 +151,7 @@ Result<InvertedIndexDirectoryMap> InvertedIndexFileReader::get_all_directories() } Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open( - int64_t index_id, const std::string& index_suffix) const { + int64_t index_id, const std::string& index_suffix, const io::IOContext* io_ctx) const { std::unique_ptr<DorisCompoundReader> compound_reader; if (_storage_format == InvertedIndexStorageFormatPB::V1) { @@ -231,16 +221,16 @@ Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open( } // Need to clone resource here, because index searcher cache need it. 
compound_reader = std::make_unique<DorisCompoundReader>( - _stream->clone(), index_it->second.get(), _read_buffer_size); + _stream->clone(), index_it->second.get(), _read_buffer_size, io_ctx); } return compound_reader; } Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::open( - const TabletIndex* index_meta) const { + const TabletIndex* index_meta, const io::IOContext* io_ctx) const { auto index_id = index_meta->index_id(); auto index_suffix = index_meta->get_index_suffix(); - return _open(index_id, index_suffix); + return _open(index_id, index_suffix, io_ctx); } std::string InvertedIndexFileReader::get_index_file_cache_key(const TabletIndex* index_meta) const { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h index 5f3775649d3..63dd89cf975 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h @@ -60,7 +60,8 @@ public: Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size, const io::IOContext* io_ctx = nullptr); - Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* index_meta) const; + Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* index_meta, + const io::IOContext* io_ctx = nullptr) const; void debug_file_entries(); std::string get_index_file_cache_key(const TabletIndex* index_meta) const; std::string get_index_file_path(const TabletIndex* index_meta) const; @@ -74,7 +75,8 @@ public: protected: Status _init_from(int32_t read_buffer_size, const io::IOContext* io_ctx); Result<std::unique_ptr<DorisCompoundReader>> _open(int64_t index_id, - const std::string& index_suffix) const; + const std::string& index_suffix, + const io::IOContext* io_ctx = nullptr) const; private: IndicesEntriesMap _indices_entries; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp index c1e3b10d882..5da74fd1dcf 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp @@ -127,7 +127,7 @@ Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx, LOG(WARNING) << st; return st; } - auto directory = DORIS_TRY(_inverted_index_file_reader->open(&_index_meta)); + auto directory = DORIS_TRY(_inverted_index_file_reader->open(&_index_meta, io_ctx)); dir = directory.release(); owned_dir = true; } @@ -218,7 +218,25 @@ Status InvertedIndexReader::handle_searcher_cache( LOG(WARNING) << st; return st; } - auto dir = DORIS_TRY(_inverted_index_file_reader->open(&_index_meta)); + auto dir = DORIS_TRY(_inverted_index_file_reader->open(&_index_meta, io_ctx)); + + DBUG_EXECUTE_IF("InvertedIndexReader.handle_searcher_cache.io_ctx", ({ + if (dir) { + auto* stream = dir->getDorisIndexInput(); + const auto* cur_io_ctx = + (const io::IOContext*)stream->getIoContext(); + if (cur_io_ctx->file_cache_stats) { + if (cur_io_ctx->file_cache_stats != &stats->file_cache_stats) { + LOG(FATAL) << "io context file cache stats is not equal to " + "stats file cache " + "stats: " + << cur_io_ctx->file_cache_stats << ", " + << &stats->file_cache_stats; + } + } + } + })); + // try to reuse index_searcher's directory to read null_bitmap to cache // to avoid open directory additionally for null_bitmap // TODO: handle null bitmap procedure in new format. diff --git a/regression-test/suites/fault_injection_p0/test_index_io_context.groovy b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy index 8dd82ccb304..23096432a3a 100644 --- a/regression-test/suites/fault_injection_p0/test_index_io_context.groovy +++ b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_index_io_context", "p0") { +suite("test_index_io_context", "nonConcurrent") { def tableName1 = "test_index_io_context1" def tableName2 = "test_index_io_context2" @@ -79,6 +79,7 @@ suite("test_index_io_context", "p0") { sql """ set enable_common_expr_pushdown = true; """ try { + GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexReader.handle_searcher_cache.io_ctx") qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """ qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """ qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """ @@ -104,6 +105,7 @@ suite("test_index_io_context", "p0") { qt_sql """ select count() from ${tableName2} where request match_phrase 'ticket_quest_bg2.jpg ~10+'; """ qt_sql """ select count() from ${tableName2} where request match_phrase 'ticket_quest_bg2.jpg ~10+'; """ } finally { + GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexReader.handle_searcher_cache.io_ctx") } } finally { } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org