This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this
push:
new 5b44b322af7 [fix](filecache) Fix LRU persist queue thread-safe (#53046)
5b44b322af7 is described below
commit 5b44b322af7785b43eedad5cb99f7a4ed51e11b2
Author: zhengyu <[email protected]>
AuthorDate: Mon Jul 14 19:19:27 2025 +0800
[fix](filecache) Fix LRU persist queue thread-safe (#53046)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
1. Fix LRU queue use-after-free crash
2. Fix extra LRU queue info being recorded when the 'need_to_move' flag is unset
3. Use a concurrent queue to record queue change info for thread safety
```
ERROR: AddressSanitizer: heap-use-after-free on address 0x603005548c40 at
pc 0x55f28e8c4785 bp 0x7f603582e1f0 sp 0x7f603582e1e8
READ of size 8 at 0x603005548c40 thread T201
#0 0x55f28e8c4784 in std::_Head_base<0ul, doris::io::CacheLRULog*,
false>::_Head_base<doris::io::CacheLRULog*>(doris::io::CacheLRULog*&&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/tuple:190:17
#1 0x55f28e8c4784 in std::_Tuple_impl<0ul, doris::io::CacheLRULog*,
std::default_delete<doris::io::CacheLRULog>>::_Tuple_impl(std::_Tuple_impl<0ul,
doris::io::CacheLRULog*, std::default_delete<doris::io::CacheLRULog>>&&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/tuple:292:2
#2 0x55f28e8c4784 in std::tuple<doris::io::CacheLRULog*,
std::default_delete<doris::io::CacheLRULog>>::tuple(std::tuple<doris::io::CacheLRULog*,
std::default_delete<doris::io::CacheLRULog>>&&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/tuple:1079:17
#3 0x55f28e8c4784 in std::__uniq_ptr_impl<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>::__uniq_ptr_impl(std::__uniq_ptr_impl<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>&&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:162:9
#4 0x55f28e8c4784 in std::__uniq_ptr_data<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>, true,
true>::__uniq_ptr_data(std::__uniq_ptr_data<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>, true, true>&&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:211:7
#5 0x55f28e8c4784 in std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>::unique_ptr(std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>&&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:327:7
#6 0x55f28e8c4784 in
doris::io::LRUQueueRecorder::replay_queue_event(doris::io::FileCacheType)
/root/doris/be/src/io/cache/lru_queue_recorder.cpp:40:20
#7 0x55f28e82d620 in
doris::io::BlockFileCache::run_background_lru_log_replay()
/root/doris/be/src/io/cache/block_file_cache.cpp:2242:24
#8 0x55f2cdc2720f in execute_native_thread_routine
/data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18
#9 0x7f61f1842608 in start_thread
/build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8
#10 0x7f61f1aef132 in __clone
/build/glibc-SzIz7B/glibc-2.31/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95
0x603005548c40 is located 16 bytes inside of 24-byte region
[0x603005548c30,0x603005548c48)
freed by thread T201 here:
#0 0x55f28e51680d in operator delete(void*)
(/home/work/unlimit_teamcity/TeamCity/Agents/20250708205944agent_172.16.0.48_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x3975a80d)
(BuildId: 8b6ba6101e736655)
#1 0x55f28e8c3ce0 in
std::__cxx11::list<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>,
std::allocator<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>::pop_front()
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_list.h:1198:15
#2 0x55f28e8c3ce0 in
doris::io::LRUQueueRecorder::replay_queue_event(doris::io::FileCacheType)
/root/doris/be/src/io/cache/lru_queue_recorder.cpp:41:19
#3 0x55f28e82d620 in
doris::io::BlockFileCache::run_background_lru_log_replay()
/root/doris/be/src/io/cache/block_file_cache.cpp:2242:24
#4 0x55f2cdc2720f in execute_native_thread_routine
/data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18
previously allocated by thread T607 (CumuCompactionT) here:
#0 0x55f28e515fad in operator new(unsigned long)
(/home/work/unlimit_teamcity/TeamCity/Agents/20250708205944agent_172.16.0.48_1/work/60183217f6ee2a9c/output/be/lib/doris_be+0x39759fad)
(BuildId: 8b6ba6101e736655)
#1 0x55f28e8c660d in
__gnu_cxx::new_allocator<std::_List_node<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>::allocate(unsigned long, void
const*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/new_allocator.h:121:27
#2 0x55f28e8c660d in
std::allocator<std::_List_node<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>::allocate(unsigned long)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/allocator.h:173:32
#3 0x55f28e8c660d in
std::allocator_traits<std::allocator<std::_List_node<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>>::allocate(std::allocator<std::_List_node<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>&, unsigned long)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:460:20
#4 0x55f28e8c660d in
std::__cxx11::_List_base<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>,
std::allocator<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>::_M_get_node()
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_list.h:442:16
#5 0x55f28e8c660d in
std::_List_node<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>*
std::__cxx11::list<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>,
std::allocator<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>::_M_create_node<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>(std::unique_ptr<doris::io::CacheLRULog,
std::default_d [...]
#6 0x55f28e8c660d in void
std::__cxx11::list<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>,
std::allocator<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>::_M_insert<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>(std::_List_iterator<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>,
std::unique_ptr<doris::io::CacheLRULog, std::def [...]
#7 0x55f28e8c3522 in
std::__cxx11::list<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>,
std::allocator<std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>>>::push_back(std::unique_ptr<doris::io::CacheLRULog,
std::default_delete<doris::io::CacheLRULog>>&&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_list.h:1217:15
#8 0x55f28e8c3522 in
doris::io::LRUQueueRecorder::record_queue_event(doris::io::FileCacheType,
doris::io::CacheLRULogType, doris::io::UInt128Wrapper, unsigned long, unsigned
long) /root/doris/be/src/io/cache/lru_queue_recorder.cpp:29:15
#9 0x55f28e82f09b in
doris::io::BlockFileCache::use_cell(doris::io::BlockFileCache::FileBlockCell
const&, std::__cxx11::list<std::shared_ptr<doris::io::FileBlock>,
std::allocator<std::shared_ptr<doris::io::FileBlock>>>*, bool,
std::lock_guard<std::mutex>&)
/root/doris/be/src/io/cache/block_file_cache.cpp:380:20
#10 0x55f28e833d1b in
doris::io::BlockFileCache::get_impl[abi:cxx11](doris::io::UInt128Wrapper
const&, doris::io::CacheContext const&, doris::io::FileBlock::Range const&,
std::lock_guard<std::mutex>&)
/root/doris/be/src/io/cache/block_file_cache.cpp:572:13
#11 0x55f28e83b4ef in
doris::io::BlockFileCache::get_or_set(doris::io::UInt128Wrapper const&,
unsigned long, unsigned long, doris::io::CacheContext&)
/root/doris/be/src/io/cache/block_file_cache.cpp:762:27
#12 0x55f28e7ffcee in
doris::io::CachedRemoteFileReader::read_at_impl(unsigned long, doris::Slice,
unsigned long*, doris::io::IOContext const*)
/root/doris/be/src/io/cache/cached_remote_file_reader.cpp:191:21
#13 0x55f28e7f8017 in doris::io::FileReader::read_at(unsigned long,
doris::Slice, unsigned long*, doris::io::IOContext const*)
/root/doris/be/src/io/fs/file_reader.cpp:34:17
```
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [x] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---------
Signed-off-by: zhengyu <[email protected]>
---
be/src/io/cache/block_file_cache.cpp | 12 ++++++++----
be/src/io/cache/lru_queue_recorder.cpp | 12 ++++++------
be/src/io/cache/lru_queue_recorder.h | 6 +++++-
be/test/io/cache/block_file_cache_test.cpp | 2 ++
be/test/io/cache/block_file_cache_test_lru_dump.cpp | 16 ++++++++--------
5 files changed, 29 insertions(+), 19 deletions(-)
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 4522db10601..a4453b9d937 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -376,10 +376,10 @@ void BlockFileCache::use_cell(const FileBlockCell& cell,
FileBlocks* result, boo
/// Move to the end of the queue. The iterator remains valid.
if (cell.queue_iterator && move_iter_flag) {
queue.move_to_end(*cell.queue_iterator, cache_lock);
+ _lru_recorder->record_queue_event(cell.file_block->cache_type(),
+ CacheLRULogType::MOVETOBACK,
cell.file_block->_key.hash,
+ cell.file_block->_key.offset,
cell.size());
}
- _lru_recorder->record_queue_event(cell.file_block->cache_type(),
CacheLRULogType::MOVETOBACK,
- cell.file_block->_key.hash,
cell.file_block->_key.offset,
- cell.size());
cell.update_atime();
}
@@ -1546,7 +1546,11 @@ bool LRUQueue::contains(const UInt128Wrapper& hash,
size_t offset,
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& /* cache_lock
*/) const {
- return map.find(std::make_pair(hash, offset))->second;
+ auto itr = map.find(std::make_pair(hash, offset));
+ if (itr != map.end()) {
+ return itr->second;
+ }
+ return std::list<FileKeyAndOffset>::iterator();
}
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */)
const {
diff --git a/be/src/io/cache/lru_queue_recorder.cpp
b/be/src/io/cache/lru_queue_recorder.cpp
index 12da29d42b7..c26c19ec371 100644
--- a/be/src/io/cache/lru_queue_recorder.cpp
+++ b/be/src/io/cache/lru_queue_recorder.cpp
@@ -26,7 +26,7 @@ void LRUQueueRecorder::record_queue_event(FileCacheType type,
CacheLRULogType lo
const UInt128Wrapper hash, const
size_t offset,
const size_t size) {
CacheLRULogQueue& log_queue = get_lru_log_queue(type);
- log_queue.push_back(std::make_unique<CacheLRULog>(log_type, hash, offset,
size));
+ log_queue.enqueue(std::make_unique<CacheLRULog>(log_type, hash, offset,
size));
++(_lru_queue_update_cnt_from_last_dump[type]);
}
@@ -36,9 +36,8 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type)
{
LRUQueue& shadow_queue = get_shadow_queue(type);
std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
- while (!log_queue.empty()) {
- auto log = std::move(log_queue.front());
- log_queue.pop_front();
+ std::unique_ptr<CacheLRULog> log;
+ while (log_queue.try_dequeue(log)) {
try {
switch (log->type) {
case CacheLRULogType::ADD: {
@@ -47,7 +46,7 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type)
{
}
case CacheLRULogType::REMOVE: {
auto it = shadow_queue.get(log->hash, log->offset,
lru_log_lock);
- if (it != shadow_queue.end()) {
+ if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
shadow_queue.remove(it, lru_log_lock);
} else {
LOG(WARNING) << "REMOVE failed, doesn't exist in shadow
queue";
@@ -55,8 +54,9 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type)
{
break;
}
case CacheLRULogType::MOVETOBACK: {
+ LOG(INFO) << "MOVETOBACK" << log->hash.to_string() << " " <<
log->offset;
auto it = shadow_queue.get(log->hash, log->offset,
lru_log_lock);
- if (it != shadow_queue.end()) {
+ if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
shadow_queue.move_to_end(it, lru_log_lock);
} else {
LOG(WARNING) << "MOVETOBACK failed, doesn't exist in
shadow queue";
diff --git a/be/src/io/cache/lru_queue_recorder.h
b/be/src/io/cache/lru_queue_recorder.h
index dceef7a493c..1f6d69493cf 100644
--- a/be/src/io/cache/lru_queue_recorder.h
+++ b/be/src/io/cache/lru_queue_recorder.h
@@ -17,6 +17,10 @@
#pragma once
+#include <concurrentqueue.h>
+
+#include <boost/lockfree/spsc_queue.hpp>
+
#include "io/cache/file_cache_common.h"
namespace doris::io {
@@ -40,7 +44,7 @@ struct CacheLRULog {
: type(t), hash(h), offset(o), size(s) {}
};
-using CacheLRULogQueue = std::list<std::unique_ptr<CacheLRULog>>;
+using CacheLRULogQueue =
moodycamel::ConcurrentQueue<std::unique_ptr<CacheLRULog>>;
class LRUQueueRecorder {
public:
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index fbb5bdeae46..743abeb8986 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3909,6 +3909,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
}
EXPECT_TRUE(reader.close().ok());
EXPECT_TRUE(reader.closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3969,6 +3970,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_init) {
CachedRemoteFileReader reader(local_reader, opts);
EXPECT_EQ(reader._cache->get_base_path(), cache_base_path);
}
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index ea3cb63601e..eabf4829f05 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -156,10 +156,10 @@ TEST_F(BlockFileCacheTest,
test_lru_log_record_replay_dump_restore) {
ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 500000);
// all queue are filled, let's check the lru log records
- ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size(), 5);
- ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size(), 5);
- ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size(), 5);
- ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size(), 5);
+ ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5);
+ ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 5);
+ ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 5);
+ ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5);
// then check the log replay
std::this_thread::sleep_for(std::chrono::milliseconds(
@@ -175,10 +175,10 @@ TEST_F(BlockFileCacheTest,
test_lru_log_record_replay_dump_restore) {
context2); // move index queue 3rd
element to the end
cache.remove_if_cached(key3); // remove all element from
ttl queue
}
- ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size(), 5);
- ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size(), 1);
- ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size(), 0);
- ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size(), 0);
+ ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5);
+ ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 1);
+ ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 0);
+ ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 0);
std::this_thread::sleep_for(std::chrono::milliseconds(
2 * config::file_cache_background_lru_log_replay_interval_ms));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]