This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4746e9e3a23 [opt](inverted index)Optimize code to get rid of heap use 
after free (#45745) (#46075)
4746e9e3a23 is described below

commit 4746e9e3a23cf7835c60465e1bfdb95cc72abc8b
Author: qiye <l...@selectdb.com>
AuthorDate: Fri Dec 27 16:46:58 2024 +0800

    [opt](inverted index)Optimize code to get rid of heap use after free 
(#45745) (#46075)
    
    bp #45745
---
 .../segment_v2/inverted_index_compound_reader.cpp  |  16 ++
 .../segment_v2/inverted_index_compound_reader.h    |   1 +
 .../segment_v2/inverted_index_fs_directory.cpp     |   4 +-
 .../rowset/segment_v2/inverted_index_writer.cpp    |  12 +-
 ...st_index_compound_directory_fault_injection.out |  22 ---
 ...index_compound_directory_fault_injection.groovy | 197 ++++++++++++++-------
 6 files changed, 156 insertions(+), 96 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 5e6d8747a2d..08ab1b6cc88 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -214,6 +214,9 @@ const char* DorisCompoundReader::getObjectName() const {
 }
 
 bool DorisCompoundReader::list(std::vector<std::string>* names) const {
+    if (_closed || entries == nullptr) {
+        _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
+    }
     for (EntriesType::const_iterator i = entries->begin(); i != 
entries->end(); i++) {
         names->push_back(i->first);
     }
@@ -221,6 +224,9 @@ bool DorisCompoundReader::list(std::vector<std::string>* 
names) const {
 }
 
 bool DorisCompoundReader::fileExists(const char* name) const {
+    if (_closed || entries == nullptr) {
+        _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
+    }
     return entries->exists((char*)name);
 }
 
@@ -237,6 +243,9 @@ int64_t DorisCompoundReader::fileModified(const char* name) 
const {
 }
 
 int64_t DorisCompoundReader::fileLength(const char* name) const {
+    if (_closed || entries == nullptr) {
+        _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
+    }
     ReaderFileEntry* e = entries->get((char*)name);
     if (e == nullptr) {
         char buf[CL_MAX_PATH + 30];
@@ -251,6 +260,9 @@ int64_t DorisCompoundReader::fileLength(const char* name) 
const {
 bool DorisCompoundReader::openInput(const char* name,
                                     
std::unique_ptr<lucene::store::IndexInput>& ret,
                                     CLuceneError& error, int32_t bufferSize) {
+    if (_closed || entries == nullptr) {
+        _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
+    }
     lucene::store::IndexInput* tmp;
     bool success = openInput(name, tmp, error, bufferSize);
     if (success) {
@@ -294,6 +306,10 @@ void DorisCompoundReader::close() {
         _CLDELETE(stream)
     }
     if (entries != nullptr) {
+        // The life cycle of entries should be consistent with that of the DorisCompoundReader.
+        // DO NOT delete entries here; it will be deleted in the destructor.
+        // Closing the directory clears all entries, but the directory may still be used in other places.
+        // Deleting the entries object here would cause a core dump (heap use after free).
         entries->clear();
     }
     if (ram_dir) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 1ca2d6ad371..bc5ae415052 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -72,6 +72,7 @@ private:
     std::string directory;
     std::string file_name;
     CL_NS(store)::IndexInput* stream = nullptr;
+    // The life cycle of entries should be consistent with that of the 
DorisCompoundReader.
     EntriesType* entries = nullptr;
     std::mutex _this_lock;
     bool _closed = false;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 9dbe0986755..9e2e253c404 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -634,10 +634,8 @@ bool DorisRAMFSDirectory::doDeleteFile(const char* name) {
         SCOPED_LOCK_MUTEX(this->THIS_LOCK);
         sizeInBytes -= itr->second->sizeInBytes;
         filesMap->removeitr(itr);
-        return true;
-    } else {
-        return false;
     }
+    return true;
 }
 
 bool DorisRAMFSDirectory::deleteDirectory() {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index a50b34b5fb1..4e503685e68 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -138,12 +138,14 @@ public:
 
     void close_on_error() override {
         try {
-            if (_index_writer) {
-                _index_writer->close();
-            }
+            // The directory must be deleted before _index_writer is closed,
+            // because closing _index_writer also closes the directory.
             if (_dir) {
                 _dir->deleteDirectory();
             }
+            if (_index_writer) {
+                _index_writer->close();
+            }
         } catch (CLuceneError& e) {
             LOG(ERROR) << "InvertedIndexWriter close_on_error failure: " << 
e.what();
         }
@@ -664,11 +666,13 @@ private:
     std::unique_ptr<lucene::document::Document> _doc = nullptr;
     lucene::document::Field* _field = nullptr;
     bool _single_field = true;
+    // Since _index_writer's write.lock is created by _dir.lockFactory,
+    // _dir must be destroyed after _index_writer, so _dir must be declared before _index_writer.
+    DorisFSDirectory* _dir = nullptr;
     std::unique_ptr<lucene::index::IndexWriter> _index_writer = nullptr;
     std::unique_ptr<lucene::analysis::Analyzer> _analyzer = nullptr;
     std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
     std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
-    DorisFSDirectory* _dir = nullptr;
     const KeyCoder* _value_key_coder;
     const TabletIndex* _index_meta;
     InvertedIndexParserType _parser_type;
diff --git 
a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
 
b/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
deleted file mode 100644
index 4efc0928fb7..00000000000
--- 
a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
+++ /dev/null
@@ -1,22 +0,0 @@
--- This file is automatically generated. You should know what you did if you 
want to edit this
--- !sql --
-863
-
--- !sql --
-863
-
--- !sql --
-863
-
--- !sql --
-863
-
--- !sql --
-0
-
--- !sql --
-0
-
--- !sql --
-0
-
diff --git 
a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
index 759c409a850..97cd829ec79 100644
--- 
a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
@@ -15,9 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
+suite("test_index_compound_directory_fault_injection", "nonConcurrent") {
     // define a sql table
     def testTable_dup = "httplogs_dup_compound"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def set_be_config = { key, value ->
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = 
update_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", 
err=" + err)
+        }
+    }
+
+    def check_config = { String key, String value ->
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
+            logger.info("Show config: code=" + code + ", out=" + out + ", 
err=" + err)
+            assertEquals(code, 0)
+            def configList = parseJson(out.trim())
+            assert configList instanceof List
+            for (Object ele in (List) configList) {
+                assert ele instanceof List<String>
+                if (((List<String>) ele)[0] == key) {
+                    assertEquals(value, ((List<String>) ele)[2])
+                }
+            }
+        }
+    }
 
     def create_httplogs_dup_table = {testTablex ->
         // multi-line sql
@@ -85,74 +111,111 @@ suite("test_index_compound_directory_failure_injection", 
"nonConcurrent") {
         }
     }
 
-    try {
-        sql "DROP TABLE IF EXISTS ${testTable_dup}"
-        create_httplogs_dup_table.call(testTable_dup)
-
-        try {
-            
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
-            load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
-        } finally {
-            
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
-        }
-        qt_sql "select COUNT() from ${testTable_dup} where request match 
'images'"
-        try {
-            
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
-            load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
-        } finally {
-            
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
-        }
-        qt_sql "select COUNT() from ${testTable_dup} where request match 
'images'"
-        try {
-            
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
-            load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
-        } finally {
-            
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
-        }
-        qt_sql "select COUNT() from ${testTable_dup} where request match 
'images'"
-        try {
-            
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
-            load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
-        } finally {
-            
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
-        }
-        qt_sql "select COUNT() from ${testTable_dup} where request match 
'images'"
-        try {
-            def test_index_compound_directory = 
"test_index_compound_directory1"
-            sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
-            create_httplogs_dup_table.call(test_index_compound_directory)
-            
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
-            load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
-            qt_sql "select COUNT() from ${test_index_compound_directory} where 
request match 'gif'"
-            try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
-        } catch(Exception ex) {
-            logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, 
 result: " + ex)
-        } finally {
-            
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
-        }
-        try {
-            def test_index_compound_directory = 
"test_index_compound_directory2"
-            sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
-            create_httplogs_dup_table.call(test_index_compound_directory)
-            
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
-            load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
-            qt_sql "select COUNT() from ${test_index_compound_directory} where 
request match 'images'"
-            try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
-        } finally {
-            
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
-        }
+    def run_test = {String is_enable ->
+        boolean invertedIndexRAMDirEnable = false
+        boolean has_update_be_config = false
         try {
-            def test_index_compound_directory = 
"test_index_compound_directory3"
-            sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
-            create_httplogs_dup_table.call(test_index_compound_directory)
-            
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
-            load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
-            qt_sql "select COUNT() from ${test_index_compound_directory} where 
request match 'png'"
-            try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
+            String backend_id;
+            backend_id = backendId_to_backendIP.keySet()[0]
+            def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
+
+            logger.info("Show config: code=" + code + ", out=" + out + ", 
err=" + err)
+            assertEquals(code, 0)
+            def configList = parseJson(out.trim())
+            assert configList instanceof List
+
+            for (Object ele in (List) configList) {
+                assert ele instanceof List<String>
+                if (((List<String>) ele)[0] == 
"inverted_index_ram_dir_enable") {
+                    invertedIndexRAMDirEnable = 
Boolean.parseBoolean(((List<String>) ele)[2])
+                    logger.info("inverted_index_ram_dir_enable: 
${((List<String>) ele)[2]}")
+                }
+            }
+            set_be_config.call("inverted_index_ram_dir_enable", is_enable)
+            has_update_be_config = true
+            // check updated config
+            check_config.call("inverted_index_ram_dir_enable", is_enable);
+
+            sql "DROP TABLE IF EXISTS ${testTable_dup}"
+            create_httplogs_dup_table.call(testTable_dup)
+
+            try {
+                
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
+                load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
+            }
+            def res = sql "select COUNT() from ${testTable_dup} where request 
match 'images'"
+            assertEquals(863, res[0][0])
+            sql "TRUNCATE TABLE ${testTable_dup}"
+
+            try {
+                
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
+                load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
+            }
+            res = sql "select COUNT() from ${testTable_dup} where request 
match 'images'"
+            assertEquals(0, res[0][0])
+            sql "TRUNCATE TABLE ${testTable_dup}"
+
+            try {
+                
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
+                load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
+            }
+            res = sql "select COUNT() from ${testTable_dup} where request 
match 'images'"
+            assertEquals(0, res[0][0])
+            sql "TRUNCATE TABLE ${testTable_dup}"
+
+            try {
+                def test_index_compound_directory = 
"test_index_compound_directory1"
+                sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+                create_httplogs_dup_table.call(test_index_compound_directory)
+                
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
+                load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+                res = sql "select COUNT() from 
${test_index_compound_directory} where request match 'gif'"
+                try_sql("DROP TABLE IF EXISTS 
${test_index_compound_directory}")
+            } catch(Exception ex) {
+                assertTrue(ex.toString().contains("failed to initialize 
storage reader"))
+                
logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer,  result: " + 
ex)
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
+            }
+
+            try {
+                def test_index_compound_directory = 
"test_index_compound_directory2"
+                sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+                create_httplogs_dup_table.call(test_index_compound_directory)
+                
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
+                load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+                res = sql "select COUNT() from 
${test_index_compound_directory} where request match 'images'"
+                assertEquals(0, res[0][0])
+                try_sql("DROP TABLE IF EXISTS 
${test_index_compound_directory}")
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
+            }
+
+            try {
+                def test_index_compound_directory = 
"test_index_compound_directory3"
+                sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+                create_httplogs_dup_table.call(test_index_compound_directory)
+                
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
+                load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+                res = sql "select COUNT() from 
${test_index_compound_directory} where request match 'png'"
+                assertEquals(0, res[0][0])
+                try_sql("DROP TABLE IF EXISTS 
${test_index_compound_directory}")
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
+            }
         } finally {
-            
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
+            if (has_update_be_config) {
+                set_be_config.call("inverted_index_ram_dir_enable", 
invertedIndexRAMDirEnable.toString())
+            }
         }
-    } finally {
-        //try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
+
+    run_test.call("false")
+    run_test.call("true")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to