This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this 
push:
     new 5b44b322af7 [fix](filecache) Fix LRU persist queue thread-safe (#53046)
5b44b322af7 is described below

commit 5b44b322af7785b43eedad5cb99f7a4ed51e11b2
Author: zhengyu <[email protected]>
AuthorDate: Mon Jul 14 19:19:27 2025 +0800

    [fix](filecache) Fix LRU persist queue thread-safe (#53046)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
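    1. Fix a heap-use-after-free crash in LRU queue log replay (see trace below).
    2. Stop recording an extra MOVETOBACK LRU log entry when the move flag
       (`move_iter_flag`) is unset.
    3. Use a concurrent queue to record LRU queue change events for thread
       safety: events were pushed from cache operations without taking the
       recorder's log mutex, while the background replay thread popped them
       under that mutex, so the plain std::list was accessed concurrently.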
    
    ```
    ERROR: AddressSanitizer: heap-use-after-free on address 0x603005548c40 at 
pc 0x55f28e8c4785 bp 0x7f603582e1f0 sp 0x7f603582e1e8
    READ of size 8 at 0x603005548c40 thread T201
        #0 0x55f28e8c4784 in std::_Head_base<0ul, doris::io::CacheLRULog*, 
false>::_Head_base<doris::io::CacheLRULog*>(doris::io::CacheLRULog*&&) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/tuple:190:17
        #1 0x55f28e8c4784 in std::_Tuple_impl<0ul, doris::io::CacheLRULog*, 
std::default_delete<doris::io::CacheLRULog>>::_Tuple_impl(std::_Tuple_impl<0ul, 
doris::io::CacheLRULog*, std::default_delete<doris::io::CacheLRULog>>&&) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/tuple:292:2
        #2 0x55f28e8c4784 in std::tuple<doris::io::CacheLRULog*, 
std::default_delete<doris::io::CacheLRULog>>::tuple(std::tuple<doris::io::CacheLRULog*,
 std::default_delete<doris::io::CacheLRULog>>&&) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/tuple:1079:17
        #3 0x55f28e8c4784 in std::_uniq_ptr_impl<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>::uniq_ptr_impl(std::_uniq_ptr_impl<doris::io::CacheLRULog,
 std::default_delete<doris::io::CacheLRULog>>&&) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:162:9
        #4 0x55f28e8c4784 in std::_uniq_ptr_data<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>, true, 
true>::uniq_ptr_data(std::_uniq_ptr_data<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>, true, true>&&) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:211:7
        #5 0x55f28e8c4784 in std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>::unique_ptr(std::unique_ptr<doris::io::CacheLRULog,
 std::default_delete<doris::io::CacheLRULog>>&&) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:327:7
        #6 0x55f28e8c4784 in 
doris::io::LRUQueueRecorder::replay_queue_event(doris::io::FileCacheType) 
/root/doris/be/src/io/cache/lru_queue_recorder.cpp:40:20
        #7 0x55f28e82d620 in 
doris::io::BlockFileCache::run_background_lru_log_replay() 
/root/doris/be/src/io/cache/block_file_cache.cpp:2242:24
        #8 0x55f2cdc2720f in execute_native_thread_routine 
/data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc+-v3/src/c11/../../../../../libstdc-v3/src/c+11/thread.cc:82:18
        #9 0x7f61f1842608 in start_thread 
/build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8
        #10 0x7f61f1aef132 in __clone 
/build/glibc-SzIz7B/glibc-2.31/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95
    
    0x603005548c40 is located 16 bytes inside of 24-byte region 
[0x603005548c30,0x603005548c48)
    freed by thread T201 here:
        #0 0x55f28e51680d in operator delete(void*) 
(/home/work/unlimit_teamcity/TeamCity/Agents/20250708205944agent_172.16.0.48_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x3975a80d)
 (BuildId: 8b6ba6101e736655)
        #1 0x55f28e8c3ce0 in 
std::__cxx11::list<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>, 
std::allocator<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>>>::pop_front() 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_list.h:1198:15
        #2 0x55f28e8c3ce0 in 
doris::io::LRUQueueRecorder::replay_queue_event(doris::io::FileCacheType) 
/root/doris/be/src/io/cache/lru_queue_recorder.cpp:41:19
        #3 0x55f28e82d620 in 
doris::io::BlockFileCache::run_background_lru_log_replay() 
/root/doris/be/src/io/cache/block_file_cache.cpp:2242:24
        #4 0x55f2cdc2720f in execute_native_thread_routine 
/data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc+-v3/src/c11/../../../../../libstdc-v3/src/c+11/thread.cc:82:18
    
    previously allocated by thread T607 (CumuCompactionT) here:
        #0 0x55f28e515fad in operator new(unsigned long) 
(/home/work/unlimit_teamcity/TeamCity/Agents/20250708205944agent_172.16.0.48_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x39759fad)
 (BuildId: 8b6ba6101e736655)
        #1 0x55f28e8c660d in 
__gnu_cxx::new_allocator<std::_List_node<std::unique_ptr<doris::io::CacheLRULog,
 std::default_delete<doris::io::CacheLRULog>>>>::allocate(unsigned long, void 
const*) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/new_allocator.h:121:27
        #2 0x55f28e8c660d in 
std::allocator<std::_List_node<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>>>::allocate(unsigned long) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/allocator.h:173:32
        #3 0x55f28e8c660d in 
std::allocator_traits<std::allocator<std::_List_node<std::unique_ptr<doris::io::CacheLRULog,
 
std::default_delete<doris::io::CacheLRULog>>>>>::allocate(std::allocator<std::_List_node<std::unique_ptr<doris::io::CacheLRULog,
 std::default_delete<doris::io::CacheLRULog>>>>&, unsigned long) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:460:20
        #4 0x55f28e8c660d in 
std::__cxx11::_List_base<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>, 
std::allocator<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>>>::_M_get_node() 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_list.h:442:16
        #5 0x55f28e8c660d in 
std::List_node<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>>* 
std::_cxx11::list<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>, 
std::allocator<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>>>::_M_create_node<std::unique_ptr<doris::io::CacheLRULog,
 
std::default_delete<doris::io::CacheLRULog>>>(std::unique_ptr<doris::io::CacheLRULog,
 std::default_d [...]
        #6 0x55f28e8c660d in void 
std::__cxx11::list<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>, 
std::allocator<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>>>::_M_insert<std::unique_ptr<doris::io::CacheLRULog,
 
std::default_delete<doris::io::CacheLRULog>>>(std::_List_iterator<std::unique_ptr<doris::io::CacheLRULog,
 std::default_delete<doris::io::CacheLRULog>>>, 
std::unique_ptr<doris::io::CacheLRULog, std::def [...]
        #7 0x55f28e8c3522 in 
std::__cxx11::list<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>, 
std::allocator<std::unique_ptr<doris::io::CacheLRULog, 
std::default_delete<doris::io::CacheLRULog>>>>::push_back(std::unique_ptr<doris::io::CacheLRULog,
 std::default_delete<doris::io::CacheLRULog>>&&) 
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_list.h:1217:15
        #8 0x55f28e8c3522 in 
doris::io::LRUQueueRecorder::record_queue_event(doris::io::FileCacheType, 
doris::io::CacheLRULogType, doris::io::UInt128Wrapper, unsigned long, unsigned 
long) /root/doris/be/src/io/cache/lru_queue_recorder.cpp:29:15
        #9 0x55f28e82f09b in 
doris::io::BlockFileCache::use_cell(doris::io::BlockFileCache::FileBlockCell 
const&, std::__cxx11::list<std::shared_ptr<doris::io::FileBlock>, 
std::allocator<std::shared_ptr<doris::io::FileBlock>>>*, bool, 
std::lock_guard<std::mutex>&) 
/root/doris/be/src/io/cache/block_file_cache.cpp:380:20
        #10 0x55f28e833d1b in 
doris::io::BlockFileCache::get_impl[abi:cxx11](doris::io::UInt128Wrapper 
const&, doris::io::CacheContext const&, doris::io::FileBlock::Range const&, 
std::lock_guard<std::mutex>&) 
/root/doris/be/src/io/cache/block_file_cache.cpp:572:13
        #11 0x55f28e83b4ef in 
doris::io::BlockFileCache::get_or_set(doris::io::UInt128Wrapper const&, 
unsigned long, unsigned long, doris::io::CacheContext&) 
/root/doris/be/src/io/cache/block_file_cache.cpp:762:27
        #12 0x55f28e7ffcee in 
doris::io::CachedRemoteFileReader::read_at_impl(unsigned long, doris::Slice, 
unsigned long*, doris::io::IOContext const*) 
/root/doris/be/src/io/cache/cached_remote_file_reader.cpp:191:21
        #13 0x55f28e7f8017 in doris::io::FileReader::read_at(unsigned long, 
doris::Slice, unsigned long*, doris::io::IOContext const*) 
/root/doris/be/src/io/fs/file_reader.cpp:34:17
    ```
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [x] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
    
    ---------
    
    Signed-off-by: zhengyu <[email protected]>
---
 be/src/io/cache/block_file_cache.cpp                | 12 ++++++++----
 be/src/io/cache/lru_queue_recorder.cpp              | 12 ++++++------
 be/src/io/cache/lru_queue_recorder.h                |  6 +++++-
 be/test/io/cache/block_file_cache_test.cpp          |  2 ++
 be/test/io/cache/block_file_cache_test_lru_dump.cpp | 16 ++++++++--------
 5 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index 4522db10601..a4453b9d937 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -376,10 +376,10 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, 
FileBlocks* result, boo
     /// Move to the end of the queue. The iterator remains valid.
     if (cell.queue_iterator && move_iter_flag) {
         queue.move_to_end(*cell.queue_iterator, cache_lock);
+        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
+                                          CacheLRULogType::MOVETOBACK, 
cell.file_block->_key.hash,
+                                          cell.file_block->_key.offset, 
cell.size());
     }
-    _lru_recorder->record_queue_event(cell.file_block->cache_type(), 
CacheLRULogType::MOVETOBACK,
-                                      cell.file_block->_key.hash, 
cell.file_block->_key.offset,
-                                      cell.size());
 
     cell.update_atime();
 }
@@ -1546,7 +1546,11 @@ bool LRUQueue::contains(const UInt128Wrapper& hash, 
size_t offset,
 
 LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                  std::lock_guard<std::mutex>& /* cache_lock 
*/) const {
-    return map.find(std::make_pair(hash, offset))->second;
+    auto itr = map.find(std::make_pair(hash, offset));
+    if (itr != map.end()) {
+        return itr->second;
+    }
+    return std::list<FileKeyAndOffset>::iterator();
 }
 
 std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) 
const {
diff --git a/be/src/io/cache/lru_queue_recorder.cpp 
b/be/src/io/cache/lru_queue_recorder.cpp
index 12da29d42b7..c26c19ec371 100644
--- a/be/src/io/cache/lru_queue_recorder.cpp
+++ b/be/src/io/cache/lru_queue_recorder.cpp
@@ -26,7 +26,7 @@ void LRUQueueRecorder::record_queue_event(FileCacheType type, 
CacheLRULogType lo
                                           const UInt128Wrapper hash, const 
size_t offset,
                                           const size_t size) {
     CacheLRULogQueue& log_queue = get_lru_log_queue(type);
-    log_queue.push_back(std::make_unique<CacheLRULog>(log_type, hash, offset, 
size));
+    log_queue.enqueue(std::make_unique<CacheLRULog>(log_type, hash, offset, 
size));
     ++(_lru_queue_update_cnt_from_last_dump[type]);
 }
 
@@ -36,9 +36,8 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) 
{
     LRUQueue& shadow_queue = get_shadow_queue(type);
 
     std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
-    while (!log_queue.empty()) {
-        auto log = std::move(log_queue.front());
-        log_queue.pop_front();
+    std::unique_ptr<CacheLRULog> log;
+    while (log_queue.try_dequeue(log)) {
         try {
             switch (log->type) {
             case CacheLRULogType::ADD: {
@@ -47,7 +46,7 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) 
{
             }
             case CacheLRULogType::REMOVE: {
                 auto it = shadow_queue.get(log->hash, log->offset, 
lru_log_lock);
-                if (it != shadow_queue.end()) {
+                if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
                     shadow_queue.remove(it, lru_log_lock);
                 } else {
                     LOG(WARNING) << "REMOVE failed, doesn't exist in shadow 
queue";
@@ -55,8 +54,9 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) 
{
                 break;
             }
             case CacheLRULogType::MOVETOBACK: {
+                LOG(INFO) << "MOVETOBACK" << log->hash.to_string() << " " << 
log->offset;
                 auto it = shadow_queue.get(log->hash, log->offset, 
lru_log_lock);
-                if (it != shadow_queue.end()) {
+                if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
                     shadow_queue.move_to_end(it, lru_log_lock);
                 } else {
                     LOG(WARNING) << "MOVETOBACK failed, doesn't exist in 
shadow queue";
diff --git a/be/src/io/cache/lru_queue_recorder.h 
b/be/src/io/cache/lru_queue_recorder.h
index dceef7a493c..1f6d69493cf 100644
--- a/be/src/io/cache/lru_queue_recorder.h
+++ b/be/src/io/cache/lru_queue_recorder.h
@@ -17,6 +17,10 @@
 
 #pragma once
 
+#include <concurrentqueue.h>
+
+#include <boost/lockfree/spsc_queue.hpp>
+
 #include "io/cache/file_cache_common.h"
 
 namespace doris::io {
@@ -40,7 +44,7 @@ struct CacheLRULog {
             : type(t), hash(h), offset(o), size(s) {}
 };
 
-using CacheLRULogQueue = std::list<std::unique_ptr<CacheLRULog>>;
+using CacheLRULogQueue = 
moodycamel::ConcurrentQueue<std::unique_ptr<CacheLRULog>>;
 
 class LRUQueueRecorder {
 public:
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index fbb5bdeae46..743abeb8986 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3909,6 +3909,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_error_handle) {
     }
     EXPECT_TRUE(reader.close().ok());
     EXPECT_TRUE(reader.closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -3969,6 +3970,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_init) {
         CachedRemoteFileReader reader(local_reader, opts);
         EXPECT_EQ(reader._cache->get_base_path(), cache_base_path);
     }
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp 
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index ea3cb63601e..eabf4829f05 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -156,10 +156,10 @@ TEST_F(BlockFileCacheTest, 
test_lru_log_record_replay_dump_restore) {
     ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 500000);
 
     // all queue are filled, let's check the lru log records
-    ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size(), 5);
-    ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size(), 5);
-    ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size(), 5);
-    ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size(), 5);
+    ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5);
+    ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 5);
+    ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 5);
+    ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5);
 
     // then check the log replay
     std::this_thread::sleep_for(std::chrono::milliseconds(
@@ -175,10 +175,10 @@ TEST_F(BlockFileCacheTest, 
test_lru_log_record_replay_dump_restore) {
                                        context2); // move index queue 3rd 
element to the end
         cache.remove_if_cached(key3);             // remove all element from 
ttl queue
     }
-    ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size(), 5);
-    ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size(), 1);
-    ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size(), 0);
-    ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size(), 0);
+    ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5);
+    ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 1);
+    ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 0);
+    ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 0);
 
     std::this_thread::sleep_for(std::chrono::milliseconds(
             2 * config::file_cache_background_lru_log_replay_interval_ms));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to