This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 133149310e4 [fix](inverted index) inverted Index File Cache Queue Optimization (#46518)
133149310e4 is described below

commit 133149310e4f8e85de1d078715190fdbfdad0524
Author: zzzxl <yangs...@selectdb.com>
AuthorDate: Tue Jan 7 19:14:51 2025 +0800

    [fix](inverted index) inverted Index File Cache Queue Optimization (#46518)
    
    https://github.com/apache/doris/pull/46024
---
 .../segment_v2/inverted_index_compound_reader.cpp  |  41 ++++++---
 .../olap/rowset/segment_v2/inverted_index_desc.cpp |   6 ++
 .../olap/rowset/segment_v2/inverted_index_desc.h   |   4 +
 .../segment_v2/inverted_index_file_reader.cpp      |   2 +
 .../segment_v2/inverted_index_file_writer.cpp      |  15 ++-
 .../segment_v2/inverted_index_fs_directory.cpp     |  16 ----
 .../rowset/segment_v2/inverted_index_reader.cpp    |   4 +-
 .../segment_v2/inverted_index_file_writer_test.cpp |  30 +++---
 .../test_index_file_cache_fault_injection.out      |  52 +++++++++++
 .../test_index_file_cache_fault_injection.groovy   | 101 +++++++++++++++++++++
 10 files changed, 223 insertions(+), 48 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index c30017cc8fe..7a993daacf1 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -57,17 +57,18 @@ namespace segment_v2 {
 class CSIndexInput : public lucene::store::BufferedIndexInput {
 private:
     CL_NS(store)::IndexInput* base;
+    std::string file_name;
     int64_t fileOffset;
     int64_t _length;
     const io::IOContext* _io_ctx = nullptr;
-    bool _is_index_file = false; // Indicates if the file is a TII file
 
 protected:
     void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
     void seekInternal(const int64_t /*pos*/) override {}
 
 public:
-    CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, 
const int64_t length,
+    CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
+                 const int64_t fileOffset, const int64_t length,
                  const int32_t read_buffer_size = 
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
     CSIndexInput(const CSIndexInput& clone);
     ~CSIndexInput() override;
@@ -78,13 +79,14 @@ public:
     const char* getObjectName() const override { return getClassName(); }
     static const char* getClassName() { return "CSIndexInput"; }
     void setIoContext(const void* io_ctx) override;
-    void setIndexFile(bool isIndexFile) override;
 };
 
-CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t 
fileOffset,
-                           const int64_t length, const int32_t 
read_buffer_size)
+CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& 
file_name,
+                           const int64_t fileOffset, const int64_t length,
+                           const int32_t read_buffer_size)
         : BufferedIndexInput(read_buffer_size) {
     this->base = base;
+    this->file_name = file_name;
     this->fileOffset = fileOffset;
     this->_length = length;
 }
@@ -101,7 +103,27 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t 
len) {
         base->setIoContext(_io_ctx);
     }
 
-    base->setIndexFile(_is_index_file);
+    DBUG_EXECUTE_IF("CSIndexInput.readInternal", {
+        for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) 
{
+            if (file_name.find(entry.first) != std::string::npos) {
+                if (!static_cast<const 
io::IOContext*>(base->getIoContext())->is_index_data) {
+                    _CLTHROWA(CL_ERR_IO,
+                              "The 'is_index_data' flag should be true for 
inverted index meta "
+                              "files.");
+                }
+            }
+        }
+        for (const auto& entry : 
InvertedIndexDescriptor::normal_file_info_map) {
+            if (file_name.find(entry.first) != std::string::npos) {
+                if (static_cast<const 
io::IOContext*>(base->getIoContext())->is_index_data) {
+                    _CLTHROWA(CL_ERR_IO,
+                              "The 'is_index_data' flag should be false for 
non-meta inverted "
+                              "index files.");
+                }
+            }
+        }
+    });
+
     base->seek(fileOffset + start);
     bool read_from_buffer = true;
     base->readBytes(b, len, read_from_buffer);
@@ -119,6 +141,7 @@ lucene::store::IndexInput* CSIndexInput::clone() const {
 
 CSIndexInput::CSIndexInput(const CSIndexInput& clone) : 
BufferedIndexInput(clone) {
     this->base = clone.base;
+    this->file_name = clone.file_name;
     this->fileOffset = clone.fileOffset;
     this->_length = clone._length;
 }
@@ -129,10 +152,6 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
     _io_ctx = static_cast<const io::IOContext*>(io_ctx);
 }
 
-void CSIndexInput::setIndexFile(bool isIndexFile) {
-    _is_index_file = isIndexFile;
-}
-
 DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, 
int32_t read_buffer_size)
         : _ram_dir(new lucene::store::RAMDirectory()),
           _stream(stream),
@@ -312,7 +331,7 @@ bool DorisCompoundReader::openInput(const char* name, 
lucene::store::IndexInput*
         bufferSize = _read_buffer_size;
     }
 
-    ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length, 
bufferSize);
+    ret = _CLNEW CSIndexInput(_stream, entry->file_name, entry->offset, 
entry->length, bufferSize);
     return true;
 }
 
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp b/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
index 8eac73f13a6..e909bc1e0a9 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
@@ -24,6 +24,12 @@
 
 namespace doris::segment_v2 {
 
+const std::unordered_map<std::string, int32_t> 
InvertedIndexDescriptor::index_file_info_map = {
+        {"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4}, 
{"tii", 5}};
+
+const std::unordered_map<std::string, int32_t> 
InvertedIndexDescriptor::normal_file_info_map = {
+        {"tis", 1}, {"frq", 2}, {"prx", 3}};
+
 // {tmp_dir}/{rowset_id}_{seg_id}_{index_id}@{suffix}
 std::string InvertedIndexDescriptor::get_temporary_index_path(std::string_view 
tmp_dir_path,
                                                               std::string_view 
rowset_id,
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_desc.h b/be/src/olap/rowset/segment_v2/inverted_index_desc.h
index 37f9cf3f4a1..f421c7f3790 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_desc.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_desc.h
@@ -20,6 +20,7 @@
 #include <stdint.h>
 
 #include <string>
+#include <unordered_map>
 
 namespace doris {
 struct RowsetId;
@@ -28,6 +29,9 @@ namespace segment_v2 {
 
 class InvertedIndexDescriptor {
 public:
+    static const std::unordered_map<std::string, int32_t> index_file_info_map;
+    static const std::unordered_map<std::string, int32_t> normal_file_info_map;
+
     static constexpr std::string_view segment_suffix = ".dat";
     static constexpr std::string_view index_suffix = ".idx";
     static std::string get_temporary_index_path(std::string_view tmp_dir_path,
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index 5306f2956c2..3629cfbbdfa 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -41,6 +41,7 @@ Status InvertedIndexFileReader::init(int32_t 
read_buffer_size, const io::IOConte
         if (_storage_format == InvertedIndexStorageFormatPB::V2) {
             if (_stream) {
                 _stream->setIoContext(io_ctx);
+                _stream->setIndexFile(true);
             }
         }
     }
@@ -83,6 +84,7 @@ Status InvertedIndexFileReader::_init_from(int32_t 
read_buffer_size, const io::I
         }
         _stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
         _stream->setIoContext(io_ctx);
+        _stream->setIndexFile(true);
 
         // 3. read file
         int32_t version = _stream->readInt(); // Read version number
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index bb373be5ee9..73c3f1b65d4 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -171,17 +171,14 @@ Status InvertedIndexFileWriter::close() {
 
 void InvertedIndexFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
     auto file_priority = [](const std::string& filename) {
-        if (filename.find("segments") != std::string::npos) {
-            return 1;
-        }
-        if (filename.find("fnm") != std::string::npos) {
-            return 2;
-        }
-        if (filename.find("tii") != std::string::npos) {
-            return 3;
+        for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) 
{
+            if (filename.find(entry.first) != std::string::npos) {
+                return entry.second;
+            }
         }
-        return 4; // Other files
+        return 6; // Other files
     };
+
     std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a, 
const FileInfo& b) {
         int32_t priority_a = file_priority(a.filename);
         int32_t priority_b = file_priority(b.filename);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 4befeba8991..e06ce69b7b2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -201,22 +201,6 @@ lucene::store::IndexInput* 
DorisFSDirectory::FSIndexInput::clone() const {
 }
 void DorisFSDirectory::FSIndexInput::close() {
     BufferedIndexInput::close();
-    /*if (_handle != nullptr) {
-        std::mutex* lock = _handle->_shared_lock;
-        bool ref = false;
-        {
-            std::lock_guard<std::mutex> wlock(*lock);
-            //determine if we are about to delete the handle...
-            ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1);
-            //decdelete (deletes if refcount is down to 0
-            _CLDECDELETE(_handle);
-        }
-
-        //if _handle is not ref by other FSIndexInput, try to release mutex 
lock, or it will be leaked.
-        if (!ref) {
-            delete lock;
-        }
-    }*/
 }
 
 void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 1c7f83b29a6..a75a9462f91 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -213,7 +213,9 @@ Status 
InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
     *searcher = searcher_result;
 
     // When the meta information has been read, the ioContext needs to be 
reset to prevent it from being used by other queries.
-    
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIoContext(nullptr);
+    auto stream = static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput();
+    stream->setIoContext(nullptr);
+    stream->setIndexFile(false);
 
     // NOTE: before mem_tracker hook becomes active, we caculate reader memory 
size by hand.
     mem_tracker->consume(index_searcher_builder->get_reader_size());
diff --git a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
index dd3b4195c14..2320108af2c 100644
--- a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
@@ -335,7 +335,8 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) 
{
     
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
     
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
     mock_dir->init(_fs, local_fs_index_path.c_str());
-    std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii", 
"nullbitmap", "write.lock"};
+    std::vector<std::string> files = {"segments_0", "segments.gen", "0.fnm",
+                                      "0.tii",      "null_bitmap",  
"write.lock"};
     for (auto& file : files) {
         auto out_file_1 =
                 
std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
@@ -343,11 +344,14 @@ TEST_F(InvertedIndexFileWriterTest, 
PrepareSortedFilesTest) {
         out_file_1->close();
     }
 
-    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments")))
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments_0")))
             .WillOnce(testing::Return(1000));
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments.gen")))
+            .WillOnce(testing::Return(1200));
     EXPECT_CALL(*mock_dir, 
fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000));
     EXPECT_CALL(*mock_dir, 
fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500));
-    EXPECT_CALL(*mock_dir, 
fileLength(testing::StrEq("nullbitmap"))).WillOnce(testing::Return(500));
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("null_bitmap")))
+            .WillOnce(testing::Return(500));
 
     InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, 
_seg_id,
                                    InvertedIndexStorageFormatPB::V2);
@@ -362,24 +366,28 @@ TEST_F(InvertedIndexFileWriterTest, 
PrepareSortedFilesTest) {
     std::vector<FileInfo> sorted_files =
             writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1, 
"suffix1")].get());
 
-    // 1. 0.segments (priority 1, size 1000)
-    // 2. 0.fnm (priority 2, size 2000)
-    // 3. 0.tii (priority 3, size 1500)
-    // 4. nullbitmap (priority 4, size 500)
+    // 1. null_bitmap (priority 1, size 500)
+    // 2. segments.gen (priority 2, size 1200)
+    // 3. segments_0 (priority 3, size 1000)
+    // 4. 0.fnm (priority 4, size 2000)
+    // 5. 0.tii (priority 5, size 1500)
 
-    std::vector<std::string> expected_order = {"0.segments", "0.fnm", "0.tii", 
"nullbitmap"};
+    std::vector<std::string> expected_order = {"null_bitmap", "segments.gen", 
"segments_0", "0.fnm",
+                                               "0.tii"};
     ASSERT_EQ(sorted_files.size(), expected_order.size());
 
     for (size_t i = 0; i < expected_order.size(); ++i) {
         EXPECT_EQ(sorted_files[i].filename, expected_order[i]);
-        if (sorted_files[i].filename == "0.segments") {
+        if (sorted_files[i].filename == "null_bitmap") {
+            EXPECT_EQ(sorted_files[i].filesize, 500);
+        } else if (sorted_files[i].filename == "segments.gen") {
+            EXPECT_EQ(sorted_files[i].filesize, 1200);
+        } else if (sorted_files[i].filename == "segments_0") {
             EXPECT_EQ(sorted_files[i].filesize, 1000);
         } else if (sorted_files[i].filename == "0.fnm") {
             EXPECT_EQ(sorted_files[i].filesize, 2000);
         } else if (sorted_files[i].filename == "0.tii") {
             EXPECT_EQ(sorted_files[i].filesize, 1500);
-        } else if (sorted_files[i].filename == "nullbitmap") {
-            EXPECT_EQ(sorted_files[i].filesize, 500);
         }
     }
 }
diff --git a/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out b/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out
new file mode 100644
index 00000000000..b096fdedd12
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out
@@ -0,0 +1,52 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1790
+
+-- !sql --
+2000
+
+-- !sql --
+4
+
+-- !sql --
+58
+
+-- !sql --
+0
+
+-- !sql --
+16
+
+-- !sql --
+12
+
+-- !sql --
+16
+
+-- !sql --
+12
+
+-- !sql --
+10
+
+-- !sql --
+88
+
+-- !sql --
+648
+
+-- !sql --
+386
+
+-- !sql --
+78
+
+-- !sql --
+746
+
+-- !sql --
+476
+
+-- !sql --
+2000
+
diff --git a/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy
new file mode 100644
index 00000000000..8a04c15b839
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_index_file_cache_fault_injection", "nonConcurrent") {
+    def indexTbName = "test_index_file_cache_fault_injection"
+
+    sql "DROP TABLE IF EXISTS ${indexTbName}"
+    sql """
+      CREATE TABLE ${indexTbName} (
+        `@timestamp` int(11) NULL COMMENT "",
+        `clientip` varchar(20) NULL COMMENT "",
+        `request` text NULL COMMENT "",
+        `status` int(11) NULL COMMENT "",
+        `size` int(11) NULL COMMENT "",
+        INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = 
"english", "support_phrase" = "true") COMMENT '',
+      ) ENGINE=OLAP
+      DUPLICATE KEY(`@timestamp`)
+      COMMENT "OLAP"
+      DISTRIBUTED BY RANDOM BUCKETS 1
+      PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "disable_auto_compaction" = "true"
+      );
+    """
+
+    def load_httplogs_data = {table_name, label, read_flag, format_flag, 
file_name, ignore_failure=false,
+                        expected_succ_rows = -1, load_to_single_tablet = 
'true' ->
+
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'label', label + "_" + UUID.randomUUID().toString()
+            set 'read_json_by_line', read_flag
+            set 'format', format_flag
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+            if (expected_succ_rows >= 0) {
+                set 'max_filter_ratio', '1'
+            }
+
+            // if declared a check callback, the default check condition will 
ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                       if (ignore_failure && expected_succ_rows < 0) { return }
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+            }
+        }
+    }
+
+    try {
+        load_httplogs_data.call(indexTbName, 
'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(indexTbName, 
'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json')
+
+        sql "sync"
+        sql """ set enable_common_expr_pushdown = true; """
+
+        try {
+            
GetDebugPoint().enableDebugPointForAllBEs("CSIndexInput.readInternal")
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '0'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '1'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '2'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '3'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '4'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '5'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '6'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '7'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '8'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix '9'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix 'a'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix 'b'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix 'c'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix 'd'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix 'e'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix 'f'; """
+            qt_sql """ select count() from ${indexTbName} where request 
match_phrase_prefix 'g'; """
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("CSIndexInput.readInternal")
+        }
+    } finally {
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to