This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new ae9c9579062 [enhancement](cloud) optimize block cache lock (#41818) (#43401) ae9c9579062 is described below commit ae9c957906209126d8c08a1afe05a3473d161917 Author: zhengyu <freeman.zhang1...@gmail.com> AuthorDate: Thu Nov 7 15:53:19 2024 +0800 [enhancement](cloud) optimize block cache lock (#41818) (#43401) pick #41818 from master 1. delete cached files asynchronously when recycling stale rowsets 2. minimize the size of the lock's critical section 3. add cache lock held & wait time info for debug --- be/CMakeLists.txt | 4 + be/src/cloud/cloud_tablet.cpp | 2 +- be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + be/src/io/cache/block_file_cache.cpp | 106 ++++++++++++++++++++------- be/src/io/cache/block_file_cache.h | 45 +++++++++++- be/src/io/cache/block_file_cache_profile.cpp | 6 +- be/src/io/cache/file_block.cpp | 32 +++++--- be/src/io/cache/fs_file_cache_storage.cpp | 3 +- build.sh | 6 ++ 10 files changed, 168 insertions(+), 42 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index f554ba6053a..4aa94695d30 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -344,6 +344,10 @@ if (ENABLE_INJECTION_POINT) set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DENABLE_INJECTION_POINT") endif() +if (ENABLE_CACHE_LOCK_DEBUG) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DENABLE_CACHE_LOCK_DEBUG") +endif() + # Enable memory tracker, which allows BE to limit the memory of tasks such as query, load, # and compaction,and observe the memory of BE through be_ip:http_port/MemTracker. 
# Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker, diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index a5cef7b54d4..71030e3bf26 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -458,7 +458,7 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset // TODO: Segment::file_cache_key auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id); auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); - file_cache->remove_if_cached(file_key); + file_cache->remove_if_cached_async(file_key); } } } diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 48d4565c1d3..7de013bb7a5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -949,6 +949,9 @@ DEFINE_mBool(enable_query_like_bloom_filter, "true"); DEFINE_Int32(doris_remote_scanner_thread_pool_thread_num, "48"); // number of s3 scanner thread pool queue size DEFINE_Int32(doris_remote_scanner_thread_pool_queue_size, "102400"); +DEFINE_mInt64(block_cache_wait_timeout_ms, "1000"); +DEFINE_mInt64(cache_lock_long_tail_threshold, "1000"); +DEFINE_Int64(file_cache_recycle_keys_size, "1000000"); // limit the queue of pending batches which will be sent by a single nodechannel DEFINE_mInt64(nodechannel_pending_queue_max_bytes, "67108864"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 27e697b0c80..e84cdb5a44f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1003,6 +1003,9 @@ DECLARE_mInt64(nodechannel_pending_queue_max_bytes); // The batch size for sending data by brpc streaming client DECLARE_mInt64(brpc_streaming_client_batch_bytes); +DECLARE_mInt64(block_cache_wait_timeout_ms); +DECLARE_mInt64(cache_lock_long_tail_threshold); +DECLARE_Int64(file_cache_recycle_keys_size); DECLARE_Bool(enable_brpc_builtin_services); diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp index 230f02ef07e..4fb3f3e02cb 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -209,6 +209,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, std::numeric_limits<int>::max()); + _recycle_keys = std::make_shared<boost::lockfree::spsc_queue<FileCacheKey>>( + config::file_cache_recycle_keys_size); if (cache_settings.storage == "memory") { _storage = std::make_unique<MemFileCacheStorage>(); _cache_base_path = "memory"; @@ -253,8 +255,7 @@ FileCacheType BlockFileCache::string_to_cache_type(const std::string& str) { BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder( const TUniqueId& query_id) { - std::lock_guard cache_lock(_mutex); - + SCOPED_CACHE_LOCK(_mutex); if (!config::enable_file_cache_query_limit) { return {}; } @@ -272,7 +273,7 @@ BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context( } void BlockFileCache::remove_query_context(const TUniqueId& query_id) { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); const auto& query_iter = _query_map.find(query_id); if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) { @@ -317,7 +318,7 @@ void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, } Status BlockFileCache::initialize() { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); return initialize_unlocked(cache_lock); } @@ -524,7 +525,7 @@ std::string BlockFileCache::clear_file_cache_async() { int64_t num_cells_to_delete = 0; int64_t num_files_all = 0; { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); if (!_async_clear_file_cache) { for (auto& [_, offset_to_cell] : _files) { ++num_files_all; @@ -760,7 +761,7 @@ FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t o CacheContext& context) { 
FileBlock::Range range(offset, offset + size - 1); - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); if (auto iter = _key_to_time.find(hash); context.cache_type == FileCacheType::INDEX && iter != _key_to_time.end()) { context.cache_type = FileCacheType::TTL; @@ -836,7 +837,7 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha } size_t BlockFileCache::try_release() { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); std::vector<FileBlockCell*> trash; for (auto& [hash, blocks] : _files) { for (auto& [offset, cell] : blocks) { @@ -901,6 +902,18 @@ void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict, std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); } +void BlockFileCache::remove_file_blocks_async(std::vector<FileBlockCell*>& to_evict, + std::lock_guard<std::mutex>& cache_lock) { + auto remove_file_block_if = [&](FileBlockCell* cell) { + FileBlockSPtr file_block = cell->file_block; + if (file_block) { + std::lock_guard block_lock(file_block->_mutex); + remove(file_block, cache_lock, block_lock, /*sync*/ false); + } + }; + std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); +} + void BlockFileCache::remove_file_blocks_and_clean_time_maps( std::vector<FileBlockCell*>& to_evict, std::lock_guard<std::mutex>& cache_lock) { auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) { @@ -1186,7 +1199,7 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b } void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock); if (!is_ttl_file) { auto iter = _files.find(file_key); @@ -1202,6 +1215,23 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { } } +void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) { + 
SCOPED_CACHE_LOCK(_mutex); + bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock); + if (!is_ttl_file) { + auto iter = _files.find(file_key); + std::vector<FileBlockCell*> to_remove; + if (iter != _files.end()) { + for (auto& [_, cell] : iter->second) { + if (cell.releasable()) { + to_remove.push_back(&cell); + } + } + } + remove_file_blocks_async(to_remove, cache_lock); + } +} + std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl( FileCacheType cur_cache_type) { switch (cur_cache_type) { @@ -1385,7 +1415,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, template <class T, class U> requires IsXLock<T> && IsXLock<U> -void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock) { +void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) { auto hash = file_block->get_hash_value(); auto offset = file_block->offset(); auto type = file_block->cache_type(); @@ -1405,9 +1435,24 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo key.offset = offset; key.meta.type = type; key.meta.expiration_time = expiration_time; - Status st = _storage->remove(key); - if (!st.ok()) { - LOG_WARNING("").error(st); + if (sync) { + Status st = _storage->remove(key); + if (!st.ok()) { + LOG_WARNING("").error(st); + } + } else { + // the file will be deleted in the bottom half + // so there will be a window that the file is not in the cache but still in the storage + // but it's ok, because the rowset is stale already + // in case something unexpected happen, set the _recycle_keys queue to zero to fallback + bool ret = _recycle_keys->push(key); + if (!ret) { + LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); + Status st = _storage->remove(key); + if (!st.ok()) { + LOG_WARNING("").error(st); + } + } } } _cur_cache_size -= file_block->range().size(); @@ -1422,8 +1467,18 @@ void BlockFileCache::remove(FileBlockSPtr 
file_block, T& cache_lock, U& block_lo *_num_removed_blocks << 1; } +void BlockFileCache::recycle_stale_rowset_async_bottom_half() { + FileCacheKey key; + while (_recycle_keys->pop(key)) { + Status st = _storage->remove(key); + if (!st.ok()) { + LOG_WARNING("").error(st); + } + } +} + size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); return get_used_cache_size_unlocked(cache_type, cache_lock); } @@ -1433,7 +1488,7 @@ size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type, } size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); return get_available_cache_size_unlocked(cache_type, cache_lock); } @@ -1444,7 +1499,7 @@ size_t BlockFileCache::get_available_cache_size_unlocked( } size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); return get_file_blocks_num_unlocked(cache_type, cache_lock); } @@ -1528,7 +1583,7 @@ std::string BlockFileCache::LRUQueue::to_string( } std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); return dump_structure_unlocked(hash, cache_lock); } @@ -1546,7 +1601,7 @@ std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash, } std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); return dump_single_cache_type_unlocked(hash, offset, cache_lock); } @@ -1609,7 +1664,7 @@ std::string BlockFileCache::reset_capacity(size_t new_capacity) { ss << "finish reset_capacity, path=" << _cache_base_path; auto start_time = steady_clock::time_point(); { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); if (new_capacity < _capacity && 
new_capacity < _cur_cache_size) { int64_t need_remove_size = _cur_cache_size - new_capacity; auto remove_blocks = [&](LRUQueue& queue) -> int64_t { @@ -1722,10 +1777,11 @@ void BlockFileCache::run_background_operation() { break; } } + recycle_stale_rowset_async_bottom_half(); recycle_deleted_blocks(); // gc int64_t cur_time = UnixSeconds(); - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); while (!_time_to_key.empty()) { auto begin = _time_to_key.begin(); if (cur_time < begin->first) { @@ -1771,7 +1827,7 @@ void BlockFileCache::run_background_operation() { void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, uint64_t new_expiration_time) { - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); // 1. If new_expiration_time is equal to zero if (new_expiration_time == 0) { remove_if_ttl_file_unlock(hash, false, cache_lock); @@ -1831,7 +1887,7 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const { int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>( std::chrono::steady_clock::now().time_since_epoch()) .count(); - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta; if (auto iter = _files.find(hash); iter != _files.end()) { for (auto& pair : _files.find(hash)->second) { @@ -1900,7 +1956,7 @@ std::string BlockFileCache::clear_file_cache_directly() { using namespace std::chrono; std::stringstream ss; auto start = steady_clock::now(); - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path); std::string clear_msg; @@ -1938,7 +1994,7 @@ std::string BlockFileCache::clear_file_cache_directly() { std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) { std::map<size_t, FileBlockSPtr> offset_to_block; - std::lock_guard cache_lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); if 
(_files.contains(hash)) { for (auto& [offset, cell] : _files[hash]) { if (cell.file_block->state() == FileBlock::State::DOWNLOADED) { @@ -1953,7 +2009,7 @@ std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128W } void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) { - std::lock_guard lock(_mutex); + SCOPED_CACHE_LOCK(_mutex); if (auto iter = _files.find(hash); iter != _files.end()) { for (auto& [_, cell] : iter->second) { cell.update_atime(); @@ -2027,5 +2083,5 @@ std::map<std::string, double> BlockFileCache::get_stats_unsafe() { template void BlockFileCache::remove(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock, - std::lock_guard<std::mutex>& block_lock); + std::lock_guard<std::mutex>& block_lock, bool sync); } // namespace doris::io diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index 1511899abe6..0de33dadc82 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -19,6 +19,7 @@ #include <bvar/bvar.h> +#include <boost/lockfree/spsc_queue.hpp> #include <memory> #include <mutex> #include <optional> @@ -27,15 +28,51 @@ #include "io/cache/file_block.h" #include "io/cache/file_cache_common.h" #include "io/cache/file_cache_storage.h" +#include "util/threadpool.h" namespace doris::io { +// Note: the cache_lock is scoped, so do not add do...while(0) here. +#ifdef ENABLE_CACHE_LOCK_DEBUG +#define SCOPED_CACHE_LOCK(MUTEX) \ + std::chrono::time_point<std::chrono::steady_clock> start_time = \ + std::chrono::steady_clock::now(); \ + std::lock_guard cache_lock(MUTEX); \ + std::chrono::time_point<std::chrono::steady_clock> acq_time = \ + std::chrono::steady_clock::now(); \ + auto duration = \ + std::chrono::duration_cast<std::chrono::milliseconds>(acq_time - start_time).count(); \ + if (duration > config::cache_lock_long_tail_threshold) \ + LOG(WARNING) << "Lock wait time " << std::to_string(duration) << "ms. 
" \ + << get_stack_trace_by_boost() << std::endl; \ + LockScopedTimer cache_lock_timer; +#else +#define SCOPED_CACHE_LOCK(MUTEX) std::lock_guard cache_lock(MUTEX); +#endif + template <class Lock> concept IsXLock = std::same_as<Lock, std::lock_guard<std::mutex>> || std::same_as<Lock, std::unique_lock<std::mutex>>; class FSFileCacheStorage; +class LockScopedTimer { +public: + LockScopedTimer() : start_(std::chrono::steady_clock::now()) {} + + ~LockScopedTimer() { + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start_).count(); + if (duration > 500) { + LOG(WARNING) << "Lock held time " << std::to_string(duration) << "ms. " + << get_stack_trace_by_boost(); + } + } + +private: + std::chrono::time_point<std::chrono::steady_clock> start_; +}; + // The BlockFileCache is responsible for the management of the blocks // The current strategies are lru and ttl. class BlockFileCache { @@ -119,6 +156,7 @@ public: // remove all blocks that belong to the key void remove_if_cached(const UInt128Wrapper& key); + void remove_if_cached_async(const UInt128Wrapper& key); // modify the expiration time about the key void modify_expiration_time(const UInt128Wrapper& key, uint64_t new_expiration_time); @@ -327,7 +365,7 @@ private: template <class T, class U> requires IsXLock<T> && IsXLock<U> - void remove(FileBlockSPtr file_block, T& cache_lock, U& segment_lock); + void remove(FileBlockSPtr file_block, T& cache_lock, U& segment_lock, bool sync = true); FileBlocks get_impl(const UInt128Wrapper& hash, const CacheContext& context, const FileBlock::Range& range, std::lock_guard<std::mutex>& cache_lock); @@ -411,6 +449,8 @@ private: void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&); + void remove_file_blocks_async(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&); + void remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&); @@ 
-458,6 +498,9 @@ private: LRUQueue _disposable_queue; LRUQueue _ttl_queue; + // keys for async remove + std::shared_ptr<boost::lockfree::spsc_queue<FileCacheKey>> _recycle_keys; + // metrics std::shared_ptr<bvar::Status<size_t>> _cache_capacity_metrics; std::shared_ptr<bvar::Status<size_t>> _cur_cache_size_metrics; diff --git a/be/src/io/cache/block_file_cache_profile.cpp b/be/src/io/cache/block_file_cache_profile.cpp index 68e6c1433de..1759d37f9e4 100644 --- a/be/src/io/cache/block_file_cache_profile.cpp +++ b/be/src/io/cache/block_file_cache_profile.cpp @@ -34,9 +34,9 @@ std::shared_ptr<AtomicStatistics> FileCacheProfile::report() { } void FileCacheProfile::update(FileCacheStatistics* stats) { - { - std::lock_guard lock(_mtx); - if (!_profile) { + if (_profile == nullptr) { + std::lock_guard<std::mutex> lock(_mtx); + if (_profile == nullptr) { _profile = std::make_shared<AtomicStatistics>(); _file_cache_metric = std::make_shared<FileCacheMetric>(this); _file_cache_metric->register_entity(); diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index b015cbd6111..4576b9dbba8 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -144,7 +144,7 @@ Status FileBlock::append(Slice data) { Status FileBlock::finalize() { if (_downloaded_size != 0 && _downloaded_size != _block_range.size()) { - std::lock_guard cache_lock(_mgr->_mutex); + SCOPED_CACHE_LOCK(_mgr->_mutex); size_t old_size = _block_range.size(); _block_range.right = _block_range.left + _downloaded_size - 1; size_t new_size = _block_range.size(); @@ -179,7 +179,7 @@ Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_typ } Status FileBlock::change_cache_type_between_normal_and_index(FileCacheType new_type) { - std::lock_guard cache_lock(_mgr->_mutex); + SCOPED_CACHE_LOCK(_mgr->_mutex); std::lock_guard block_lock(_mutex); bool expr = (new_type != FileCacheType::TTL && _key.meta.type != FileCacheType::TTL); if (!expr) { @@ -223,7 +223,7 
@@ FileBlock::State FileBlock::wait() { if (_download_state == State::DOWNLOADING) { DCHECK(_downloader_id != 0 && _downloader_id != get_caller_id()); - _cv.wait_for(block_lock, std::chrono::seconds(1)); + _cv.wait_for(block_lock, std::chrono::milliseconds(config::block_cache_wait_timeout_ms)); } return _download_state; @@ -278,14 +278,24 @@ FileBlocksHolder::~FileBlocksHolder() { auto& file_block = *current_file_block_it; BlockFileCache* _mgr = file_block->_mgr; { - std::lock_guard cache_lock(_mgr->_mutex); - std::lock_guard block_lock(file_block->_mutex); - file_block->complete_unlocked(block_lock); - if (file_block.use_count() == 2) { - DCHECK(file_block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING); - // one in cache, one in here - if (file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) { - _mgr->remove(file_block, cache_lock, block_lock); + bool should_remove = false; + { + std::lock_guard block_lock(file_block->_mutex); + file_block->complete_unlocked(block_lock); + if (file_block.use_count() == 2 && + file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) { + should_remove = true; + } + } + if (should_remove) { + SCOPED_CACHE_LOCK(_mgr->_mutex); + std::lock_guard block_lock(file_block->_mutex); + if (file_block.use_count() == 2) { + DCHECK(file_block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING); + // one in cache, one in here + if (file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) { + _mgr->remove(file_block, cache_lock, block_lock); + } } } } diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index ecdf04c8830..bacd0820c66 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -471,7 +471,8 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const std::vector<BatchLoadArgs> batch_load_buffer; batch_load_buffer.reserve(scan_length); auto add_cell_batch_func = [&]() { - 
std::lock_guard cache_lock(_mgr->_mutex); + SCOPED_CACHE_LOCK(_mgr->_mutex); + auto f = [&](const BatchLoadArgs& args) { // in async load mode, a cell may be added twice. if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) { diff --git a/build.sh b/build.sh index db9a2c8d4c0..6f3ddfa236f 100755 --- a/build.sh +++ b/build.sh @@ -442,6 +442,10 @@ if [[ -z "${ENABLE_INJECTION_POINT}" ]]; then ENABLE_INJECTION_POINT='OFF' fi +if [[ -z "${ENABLE_CACHE_LOCK_DEBUG}" ]]; then + ENABLE_CACHE_LOCK_DEBUG='OFF' +fi + if [[ -z "${RECORD_COMPILER_SWITCHES}" ]]; then RECORD_COMPILER_SWITCHES='OFF' fi @@ -488,6 +492,7 @@ echo "Get params: USE_JEMALLOC -- ${USE_JEMALLOC} USE_BTHREAD_SCANNER -- ${USE_BTHREAD_SCANNER} ENABLE_INJECTION_POINT -- ${ENABLE_INJECTION_POINT} + ENABLE_CACHE_LOCK_DEBUG -- ${ENABLE_CACHE_LOCK_DEBUG} DENABLE_CLANG_COVERAGE -- ${DENABLE_CLANG_COVERAGE} DISPLAY_BUILD_TIME -- ${DISPLAY_BUILD_TIME} ENABLE_PCH -- ${ENABLE_PCH} @@ -574,6 +579,7 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then -DCMAKE_EXPORT_COMPILE_COMMANDS=ON \ -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \ -DENABLE_INJECTION_POINT="${ENABLE_INJECTION_POINT}" \ + -DENABLE_CACHE_LOCK_DEBUG="${ENABLE_CACHE_LOCK_DEBUG}" \ -DMAKE_TEST=OFF \ -DBUILD_FS_BENCHMARK="${BUILD_FS_BENCHMARK}" \ ${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org