This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 4c540f5e0e9 [fix](cloud) fix file cache potential leakage (#46561)
4c540f5e0e9 is described below

commit 4c540f5e0e9c6cf78c3b6ada59b95e7b8563a928
Author: zhengyu <zhangzhen...@selectdb.com>
AuthorDate: Thu Jan 23 19:18:53 2025 +0800

    [fix](cloud) fix file cache potential leakage (#46561)

    将原来的 同步/异步删除 cache meta + 同步/异步删除 cache data file 多维度的删除策略降维简化:
    所有 cache meta 都是同步删除(除正在使用,此case处理方式见下文),data file在 critical 场景同步删除、gc 场景下异步删除

    异步清理调度的优化:
    - 之前的调度逻辑会提前中断,导致清理效率低下
    - 甚至调度会有概率进入某些状态导致清理无法继续进行
    - 优化 CPU 使用,避免额外无效队列遍历
    - 增加窗口算法对异步删除 data file 进行 qps 限制

    优化标记删除:
    - 之前的标记删除机制对 TTL data file 有两个方面的空间泄漏问题
    - 扩展应用场景:从原来只能用于 clear_cache、reset_capacity缩容,扩展任意异步删除的场景
    - 将新的标记删除机制 除应用在 正在引用的数据之外,还解决了 DOWNLOADING 状态数据的删除泄漏问题

    fix 删除正在引用的数据过程的多处泄漏:
    - 之前没有机制对于正在引用的数据进行标记删除,只能放任赦免
    - 现在配合优化后的标记删除机制,使用析构函数在释放引用后自动删除

    发现并修复队列操作中存在的内存写飞隐患
    - reset_capacity 在迭代内部 erase容器条目,可能会导致指针悬空

    其它小优化:
    - 使用 concurrentqueue 代替之前的静态无锁队列:保持性能的同时减少队列满、进入同步删文件带来的 IO burst 及伴随的 cache lock 开销
    - 清理弃用的 file_cache_ttl_valid_check_interval_second 配置:现在 ttl 支持 LRU 了,不用额外定时清理
    - 多线程拆分:避免 metrics、resource limit、data file 清理、ttl 超时清理 相互影响

    Signed-off-by: zhengyu <zhangzhen...@selectdb.com>
---
 be/src/cloud/cloud_storage_engine.cpp      |  31 ---
 be/src/cloud/cloud_storage_engine.h        |   1 -
 be/src/cloud/cloud_tablet_mgr.cpp          |   4 +-
 be/src/common/config.cpp                   |   3 +-
 be/src/common/config.h                     |   4 +-
 be/src/io/cache/block_file_cache.cpp       | 317 +++++++++++++----------------
 be/src/io/cache/block_file_cache.h         |  40 ++--
 be/src/io/cache/file_block.cpp             |   8 +-
 be/src/io/cache/file_block.h               |   5 +
 be/src/olap/compaction.cpp                 |   2 +-
 be/test/io/cache/block_file_cache_test.cpp | 108 ++--------
 11 files changed, 196 insertions(+), 327 deletions(-)

diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 650909a2915..766f83563f7 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -300,42 +300,11 @@ Status
CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sp "StorageEngine", "lease_compaction_thread", [this]() { this->_lease_compaction_thread_callback(); }, &_bg_threads.emplace_back())); - if (config::file_cache_ttl_valid_check_interval_second != 0) { - RETURN_IF_ERROR(Thread::create( - "StorageEngine", "check_file_cache_ttl_block_valid_thread", - [this]() { this->_check_file_cache_ttl_block_valid(); }, - &_bg_threads.emplace_back())); - LOG(INFO) << "check file cache ttl block valid thread started"; - } - LOG(INFO) << "lease compaction thread started"; return Status::OK(); } -void CloudStorageEngine::_check_file_cache_ttl_block_valid() { - int64_t interval_seconds = config::file_cache_ttl_valid_check_interval_second / 2; - auto check_ttl = [](const std::weak_ptr<CloudTablet>& tablet_wk) { - auto tablet = tablet_wk.lock(); - if (!tablet) return; - if (tablet->tablet_meta()->ttl_seconds() == 0) return; - auto rowsets = tablet->get_snapshot_rowset(); - for (const auto& rowset : rowsets) { - int64_t ttl_seconds = tablet->tablet_meta()->ttl_seconds(); - if (rowset->newest_write_timestamp() + ttl_seconds <= UnixSeconds()) continue; - for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) { - auto hash = Segment::file_cache_key(rowset->rowset_id().to_string(), seg_id); - auto* file_cache = io::FileCacheFactory::instance()->get_by_path(hash); - file_cache->update_ttl_atime(hash); - } - } - }; - while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval_seconds))) { - auto weak_tablets = tablet_mgr().get_weak_tablets(); - std::for_each(weak_tablets.begin(), weak_tablets.end(), check_ttl); - } -} - void CloudStorageEngine::sync_storage_vault() { cloud::StorageVaultInfos vault_infos; bool enable_storage_vault = false; diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 2cd47c52dbe..34bde2e75f7 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h 
@@ -72,7 +72,6 @@ public: ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const { return *_calc_tablet_delete_bitmap_task_thread_pool; } - void _check_file_cache_ttl_block_valid(); std::optional<StorageResource> get_storage_resource(const std::string& vault_id) { VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id; diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index f60d0eeb5ba..d597ccc42a1 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -238,7 +238,9 @@ void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) { num_vacuumed += t->delete_expired_stale_rowsets(); } - LOG_INFO("finish vacuum stale rowsets").tag("num_vacuumed", num_vacuumed); + LOG_INFO("finish vacuum stale rowsets") + .tag("num_vacuumed", num_vacuumed) + .tag("num_tablets", tablets_to_vacuum.size()); } std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() { diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2ffd1085345..39549d18986 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1050,7 +1050,6 @@ DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88"); DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80"); DEFINE_mBool(enable_read_cache_file_directly, "false"); DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true"); -DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not checking // If true, evict the ttl cache using LRU when full. // Otherwise, only expiration can evict ttl and new data won't add to cache when full. 
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true"); @@ -1058,8 +1057,8 @@ DEFINE_mBool(enbale_dump_error_file, "false"); // limit the max size of error log on disk DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB DEFINE_mInt64(cache_lock_long_tail_threshold, "1000"); -DEFINE_Int64(file_cache_recycle_keys_size, "1000000"); DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false"); +DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000"); DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800"); DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 6c84d89faba..7e177bb7236 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1090,7 +1090,6 @@ DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent); DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent); DECLARE_mBool(enable_read_cache_file_directly); DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size); -DECLARE_mInt64(file_cache_ttl_valid_check_interval_second); // If true, evict the ttl cache using LRU when full. // Otherwise, only expiration can evict ttl and new data won't add to cache when full. DECLARE_Bool(enable_ttl_cache_evict_using_lru); @@ -1098,7 +1097,6 @@ DECLARE_mBool(enbale_dump_error_file); // limit the max size of error log on disk DECLARE_mInt64(file_cache_error_log_limit_bytes); DECLARE_mInt64(cache_lock_long_tail_threshold); -DECLARE_Int64(file_cache_recycle_keys_size); // Base compaction may retrieve and produce some less frequently accessed data, // potentially affecting the file cache hit rate. // This configuration determines whether to retain the output within the file cache. @@ -1106,7 +1104,7 @@ DECLARE_Int64(file_cache_recycle_keys_size); // If your file cache is ample enough to accommodate all the data in your database, // enable this option; otherwise, it is recommended to leave it disabled. 
DECLARE_mBool(enable_file_cache_keep_base_compaction_output); - +DECLARE_mInt64(file_cache_remove_block_qps_limit); // inverted index searcher cache // cache entry stay time after lookup DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s); diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index b5f48d09648..59bb1becb5d 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -202,6 +202,11 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>( _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0); + _storage_sync_remove_latency = std::make_shared<bvar::LatencyRecorder>( + _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_ns"); + _storage_async_remove_latency = std::make_shared<bvar::LatencyRecorder>( + _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_ns"); + _disposable_queue = LRUQueue(cache_settings.disposable_queue_size, cache_settings.disposable_queue_elements, 60 * 60); _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements, @@ -210,9 +215,6 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, 24 * 60 * 60); _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, std::numeric_limits<int>::max()); - - _recycle_keys = std::make_shared<boost::lockfree::spsc_queue<FileCacheKey>>( - config::file_cache_recycle_keys_size); if (cache_settings.storage == "memory") { _storage = std::make_unique<MemFileCacheStorage>(); _cache_base_path = "memory"; @@ -328,7 +330,9 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo DCHECK(!_is_initialized); _is_initialized = true; RETURN_IF_ERROR(_storage->init(this)); - _cache_background_thread = std::thread(&BlockFileCache::run_background_operation, this); + _cache_background_monitor_thread = 
std::thread(&BlockFileCache::run_background_monitor, this); + _cache_background_ttl_gc_thread = std::thread(&BlockFileCache::run_background_ttl_gc, this); + _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this); return Status::OK(); } @@ -346,7 +350,6 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, boo } cell.update_atime(); - cell.is_deleted = false; } template <class T> @@ -534,130 +537,48 @@ std::string BlockFileCache::clear_file_cache_async() { LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path; int64_t num_cells_all = 0; int64_t num_cells_to_delete = 0; + int64_t num_cells_wait_recycle = 0; int64_t num_files_all = 0; + TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async"); { SCOPED_CACHE_LOCK(_mutex); - if (!_async_clear_file_cache) { - for (auto& [_, offset_to_cell] : _files) { - ++num_files_all; - for (auto& [_, cell] : offset_to_cell) { - ++num_cells_all; - if (cell.releasable()) { - cell.is_deleted = true; - ++num_cells_to_delete; - } - } + + std::vector<FileBlockCell*> deleting_cells; + for (auto& [_, offset_to_cell] : _files) { + ++num_files_all; + for (auto& [_, cell] : offset_to_cell) { + ++num_cells_all; + deleting_cells.push_back(&cell); + } + } + + // we cannot delete the element in the loop above, because it will break the iterator + for (auto& cell : deleting_cells) { + if (!cell->releasable()) { + LOG(INFO) << "cell is not releasable, hash=" + << " offset=" << cell->file_block->offset(); + cell->file_block->set_deleting(); + ++num_cells_wait_recycle; + continue; + } + FileBlockSPtr file_block = cell->file_block; + if (file_block) { + std::lock_guard block_lock(file_block->_mutex); + remove(file_block, cache_lock, block_lock, false); + ++num_cells_to_delete; } - _async_clear_file_cache = true; } } std::stringstream ss; ss << "finish clear_file_cache_async, path=" << _cache_base_path << " num_files_all=" << num_files_all << " num_cells_all=" << 
num_cells_all - << " num_cells_to_delete=" << num_cells_to_delete; + << " num_cells_to_delete=" << num_cells_to_delete + << " num_cells_wait_recycle=" << num_cells_wait_recycle; auto msg = ss.str(); LOG(INFO) << msg; return msg; } -void BlockFileCache::recycle_deleted_blocks() { - using namespace std::chrono; - static int remove_batch = 100; - TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", &remove_batch); - TEST_SYNC_POINT_CALLBACK("BlockFileCache::recycle_deleted_blocks"); - std::unique_lock cache_lock(_mutex); - auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) { - std::lock_guard segment_lock(cell->file_block->_mutex); - remove(cell->file_block, cache_lock, segment_lock); - }; - int i = 0; - std::condition_variable cond; - auto start_time = steady_clock::time_point(); - if (_async_clear_file_cache) { - LOG_INFO("Start clear file cache async").tag("path", _cache_base_path); - auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) { - std::lock_guard segment_lock(cell->file_block->_mutex); - remove(cell->file_block, cache_lock, segment_lock); - }; - static int remove_batch = 100; - TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", &remove_batch); - int i = 0; - std::condition_variable cond; - auto iter_queue = [&](LRUQueue& queue) { - bool end = false; - while (queue.get_capacity(cache_lock) != 0 && !end) { - std::vector<FileBlockCell*> cells; - for (const auto& [entry_key, entry_offset, _] : queue) { - if (i == remove_batch) { - i = 0; - break; - } - auto* cell = get_cell(entry_key, entry_offset, cache_lock); - if (!cell) continue; - if (!cell->is_deleted) { - end = true; - break; - } else if (cell->releasable()) { - i++; - cells.push_back(cell); - } - } - std::ranges::for_each(cells, remove_file_block); - // just for sleep - cond.wait_for(cache_lock, std::chrono::microseconds(100)); - } - }; - iter_queue(get_queue(FileCacheType::DISPOSABLE)); - iter_queue(get_queue(FileCacheType::NORMAL)); - 
iter_queue(get_queue(FileCacheType::INDEX)); - } - if (_async_clear_file_cache || config::file_cache_ttl_valid_check_interval_second != 0) { - std::vector<UInt128Wrapper> ttl_keys; - ttl_keys.reserve(_key_to_time.size()); - for (auto& [key, _] : _key_to_time) { - ttl_keys.push_back(key); - } - for (UInt128Wrapper& hash : ttl_keys) { - if (i >= remove_batch) { - // just for sleep - cond.wait_for(cache_lock, std::chrono::microseconds(100)); - i = 0; - } - if (auto iter = _files.find(hash); iter != _files.end()) { - std::vector<FileBlockCell*> cells; - cells.reserve(iter->second.size()); - for (auto& [_, cell] : iter->second) { - cell.is_deleted = - cell.is_deleted - ? true - : (config::file_cache_ttl_valid_check_interval_second == 0 - ? false - : std::chrono::duration_cast<std::chrono::seconds>( - std::chrono::steady_clock::now() - .time_since_epoch()) - .count() - - cell.atime > - config::file_cache_ttl_valid_check_interval_second); - if (!cell.is_deleted) { - continue; - } else if (cell.releasable()) { - cells.emplace_back(&cell); - i++; - } - } - std::ranges::for_each(cells, remove_file_block); - } - } - if (_async_clear_file_cache) { - _async_clear_file_cache = false; - auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - start_time); - LOG_INFO("End clear file cache async") - .tag("path", _cache_base_path) - .tag("use_time", static_cast<int64_t>(use_time.count())); - } - } -} - FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash, const CacheContext& context, size_t offset, size_t size, FileBlock::State state, @@ -866,6 +787,8 @@ size_t BlockFileCache::try_release() { for (auto& [offset, cell] : blocks) { if (cell.releasable()) { trash.emplace_back(&cell); + } else { + cell.file_block->set_deleting(); } } } @@ -914,24 +837,12 @@ const BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) co } void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict, - std::lock_guard<std::mutex>& 
cache_lock) { + std::lock_guard<std::mutex>& cache_lock, bool sync) { auto remove_file_block_if = [&](FileBlockCell* cell) { FileBlockSPtr file_block = cell->file_block; if (file_block) { std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); -} - -void BlockFileCache::remove_file_blocks_async(std::vector<FileBlockCell*>& to_evict, - std::lock_guard<std::mutex>& cache_lock) { - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock, /*sync*/ false); + remove(file_block, cache_lock, block_lock, sync); } }; std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); @@ -1100,8 +1011,8 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& return true; } -bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, bool remove_directly, - std::lock_guard<std::mutex>& cache_lock) { +bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, bool remove_directly, + std::lock_guard<std::mutex>& cache_lock, bool sync) { auto& ttl_queue = get_queue(FileCacheType::TTL); if (auto iter = _key_to_time.find(file_key); _key_to_time.find(file_key) != _key_to_time.end()) { @@ -1136,13 +1047,13 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b if (cell.releasable()) { to_remove.push_back(&cell); } else { - cell.is_deleted = true; + cell.file_block->set_deleting(); } } std::for_each(to_remove.begin(), to_remove.end(), [&](FileBlockCell* cell) { FileBlockSPtr file_block = cell->file_block; std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); + remove(file_block, cache_lock, block_lock, sync); }); } // remove from _time_to_key @@ -1161,9 +1072,11 @@ bool 
BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b return false; } +// remove specific cache synchronously, for critical operations +// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { SCOPED_CACHE_LOCK(_mutex); - bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock); + bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, true); if (!is_ttl_file) { auto iter = _files.find(file_key); std::vector<FileBlockCell*> to_remove; @@ -1171,16 +1084,21 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { for (auto& [_, cell] : iter->second) { if (cell.releasable()) { to_remove.push_back(&cell); + } else { + cell.file_block->set_deleting(); } } } - remove_file_blocks(to_remove, cache_lock); + remove_file_blocks(to_remove, cache_lock, true); } } +// the async version of remove_if_cached, for background operations +// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously +// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) { SCOPED_CACHE_LOCK(_mutex); - bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock); + bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, /*sync*/ false); if (!is_ttl_file) { auto iter = _files.find(file_key); std::vector<FileBlockCell*> to_remove; @@ -1188,10 +1106,12 @@ void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) { for (auto& [_, cell] : iter->second) { if (cell.releasable()) { to_remove.push_back(&cell); + } else { + cell.file_block->set_deleting(); } } } - remove_file_blocks_async(to_remove, cache_lock); + remove_file_blocks(to_remove, cache_lock, false); } } @@ -1281,7 +1201,7 @@ bool 
BlockFileCache::try_reserve_from_other_queue_by_time_interval( } *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type; } - remove_file_blocks(to_evict, cache_lock); + remove_file_blocks(to_evict, cache_lock, true); return !is_overflow(removed_size, size, cur_cache_size); } @@ -1319,7 +1239,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size( cur_removed_size); *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size; } - remove_file_blocks(to_evict, cache_lock); + remove_file_blocks(to_evict, cache_lock, true); return !is_overflow(removed_size, size, cur_cache_size); } @@ -1362,7 +1282,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, size_t cur_removed_size = 0; find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock, cur_removed_size); - remove_file_blocks(to_evict, cache_lock); + remove_file_blocks(to_evict, cache_lock, true); *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size; if (is_overflow(removed_size, size, cur_cache_size)) { @@ -1385,6 +1305,7 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo auto expiration_time = file_block->expiration_time(); auto* cell = get_cell(hash, offset, cache_lock); DCHECK(cell); + DCHECK(cell->queue_iterator); if (cell->queue_iterator) { auto& queue = get_queue(file_block->cache_type()); queue.remove(*cell->queue_iterator, cache_lock); @@ -1392,14 +1313,17 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())] << file_block->range().size(); *_total_evict_size_metrics << file_block->range().size(); - if (cell->file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) { + if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) { FileCacheKey key; key.hash = hash; key.offset = offset; key.meta.type = type; key.meta.expiration_time = 
expiration_time; if (sync) { + int64_t duration_ns = 0; + SCOPED_RAW_TIMER(&duration_ns); Status st = _storage->remove(key); + *_storage_sync_remove_latency << duration_ns; if (!st.ok()) { LOG_WARNING("").error(st); } @@ -1407,16 +1331,21 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo // the file will be deleted in the bottom half // so there will be a window that the file is not in the cache but still in the storage // but it's ok, because the rowset is stale already - // in case something unexpected happen, set the _recycle_keys queue to zero to fallback - bool ret = _recycle_keys->push(key); + bool ret = _recycle_keys.enqueue(key); if (!ret) { LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); + int64_t duration_ns = 0; + SCOPED_RAW_TIMER(&duration_ns); Status st = _storage->remove(key); + *_storage_sync_remove_latency << duration_ns; if (!st.ok()) { LOG_WARNING("").error(st); } } } + } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) { + file_block->set_deleting(); + return; } _cur_cache_size -= file_block->range().size(); if (FileCacheType::TTL == type) { @@ -1430,16 +1359,6 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo *_num_removed_blocks << 1; } -void BlockFileCache::recycle_stale_rowset_async_bottom_half() { - FileCacheKey key; - while (_recycle_keys->pop(key)) { - Status st = _storage->remove(key); - if (!st.ok()) { - LOG_WARNING("").error(st); - } - } -} - size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const { SCOPED_CACHE_LOCK(_mutex); return get_used_cache_size_unlocked(cache_type, cache_lock); @@ -1632,14 +1551,25 @@ std::string BlockFileCache::reset_capacity(size_t new_capacity) { int64_t need_remove_size = _cur_cache_size - new_capacity; auto remove_blocks = [&](LRUQueue& queue) -> int64_t { int64_t queue_released = 0; + std::vector<FileBlockCell*> to_evict; for (const auto& [entry_key, entry_offset, 
entry_size] : queue) { - if (need_remove_size <= 0) return queue_released; - auto* cell = get_cell(entry_key, entry_offset, cache_lock); - if (!cell->releasable()) continue; - cell->is_deleted = true; + if (need_remove_size <= 0) { + break; + } need_remove_size -= entry_size; space_released += entry_size; queue_released += entry_size; + auto* cell = get_cell(entry_key, entry_offset, cache_lock); + if (!cell->releasable()) { + cell->file_block->set_deleting(); + continue; + } + to_evict.push_back(cell); + } + for (auto& cell : to_evict) { + FileBlockSPtr file_block = cell->file_block; + std::lock_guard block_lock(file_block->_mutex); + remove(file_block, cache_lock, block_lock); } return queue_released; }; @@ -1649,22 +1579,11 @@ std::string BlockFileCache::reset_capacity(size_t new_capacity) { ss << " normal_queue released " << queue_released; queue_released = remove_blocks(_index_queue); ss << " index_queue released " << queue_released; - if (need_remove_size >= 0) { - queue_released = 0; - for (auto& [_, key] : _time_to_key) { - for (auto& [_, cell] : _files[key]) { - if (need_remove_size <= 0) break; - cell.is_deleted = true; - need_remove_size -= cell.file_block->range().size(); - space_released += cell.file_block->range().size(); - queue_released += cell.file_block->range().size(); - } - } - ss << " ttl_queue released " << queue_released; - } + queue_released = remove_blocks(_ttl_queue); + ss << " ttl_queue released " << queue_released; + _disk_resource_limit_mode = true; _disk_limit_mode_metrics->set_value(1); - _async_clear_file_cache = true; ss << " total_space_released=" << space_released; } old_capacity = _capacity; @@ -1729,7 +1648,7 @@ void BlockFileCache::check_disk_resource_limit() { } } -void BlockFileCache::run_background_operation() { +void BlockFileCache::run_background_monitor() { int64_t interval_time_seconds = 20; while (!_close) { TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds); @@ -1777,10 +1696,20 @@ void 
BlockFileCache::run_background_operation() { _num_read_blocks_1h->get_value()); } } + } +} - recycle_stale_rowset_async_bottom_half(); - recycle_deleted_blocks(); - // gc +void BlockFileCache::run_background_ttl_gc() { // TODO(zhengyu): fix! + int64_t interval_time_seconds = 20; + while (!_close) { + TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds); + { + std::unique_lock close_lock(_close_mtx); + _close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds)); + if (_close) { + break; + } + } { int64_t cur_time = UnixSeconds(); SCOPED_CACHE_LOCK(_mutex); @@ -1789,9 +1718,41 @@ void BlockFileCache::run_background_operation() { if (cur_time < begin->first) { break; } - remove_if_ttl_file_unlock(begin->second, false, cache_lock); + remove_if_ttl_file_blocks(begin->second, false, cache_lock, false); + } + } + } +} + +void BlockFileCache::run_background_gc() { + FileCacheKey key; + static const size_t interval_ms = 100; + const size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000; + size_t batch_count = 0; + while (!_close) { + { + std::unique_lock close_lock(_close_mtx); + _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); + if (_close) { + break; + } + } + while (_recycle_keys.try_dequeue(key)) { + if (batch_count >= batch_limit) { + break; + } + + int64_t duration_ns = 0; + SCOPED_RAW_TIMER(&duration_ns); + Status st = _storage->remove(key); + *_storage_async_remove_latency << duration_ns; + + if (!st.ok()) { + LOG_WARNING("").error(st); } + batch_count++; } + batch_count = 0; } } @@ -1800,7 +1761,7 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, SCOPED_CACHE_LOCK(_mutex); // 1. If new_expiration_time is equal to zero if (new_expiration_time == 0) { - remove_if_ttl_file_unlock(hash, false, cache_lock); + remove_if_ttl_file_blocks(hash, false, cache_lock, false); return; } // 2. If the hash in ttl cache, modify its expiration time. 
@@ -1917,7 +1878,7 @@ bool BlockFileCache::try_reserve_during_async_load(size_t size, if (index_queue_size != 0) { collect_eliminate_fragments(get_queue(FileCacheType::INDEX)); } - remove_file_blocks(to_evict, cache_lock); + remove_file_blocks(to_evict, cache_lock, true); return !_disk_resource_limit_mode || removed_size >= size; } diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index f23d5a3799e..f93a72cbc62 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -18,6 +18,7 @@ #pragma once #include <bvar/bvar.h> +#include <concurrentqueue.h> #include <boost/lockfree/spsc_queue.hpp> #include <memory> @@ -31,7 +32,7 @@ #include "util/threadpool.h" namespace doris::io { - +using RecycleFileCacheKeys = moodycamel::ConcurrentQueue<FileCacheKey>; // Note: the cache_lock is scoped, so do not add do...while(0) here. #ifdef ENABLE_CACHE_LOCK_DEBUG #define SCOPED_CACHE_LOCK(MUTEX) \ @@ -95,8 +96,14 @@ public: _close = true; } _close_cv.notify_all(); - if (_cache_background_thread.joinable()) { - _cache_background_thread.join(); + if (_cache_background_monitor_thread.joinable()) { + _cache_background_monitor_thread.join(); + } + if (_cache_background_ttl_gc_thread.joinable()) { + _cache_background_ttl_gc_thread.join(); + } + if (_cache_background_gc_thread.joinable()) { + _cache_background_gc_thread.join(); } } @@ -336,7 +343,6 @@ private: std::optional<LRUQueue::Iterator> queue_iterator; mutable int64_t atime {0}; - mutable bool is_deleted {false}; void update_atime() const { atime = std::chrono::duration_cast<std::chrono::seconds>( std::chrono::steady_clock::now().time_since_epoch()) @@ -425,12 +431,12 @@ private: bool need_to_move(FileCacheType cell_type, FileCacheType query_type) const; - bool remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, bool remove_directly, - std::lock_guard<std::mutex>&); - - void run_background_operation(); + bool remove_if_ttl_file_blocks(const UInt128Wrapper& 
file_key, bool remove_directly, + std::lock_guard<std::mutex>&, bool sync); - void recycle_deleted_blocks(); + void run_background_monitor(); + void run_background_ttl_gc(); + void run_background_gc(); bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, @@ -443,9 +449,7 @@ private: bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size) const; - void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&); - - void remove_file_blocks_async(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&); + void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&, bool sync); void remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&); @@ -454,8 +458,6 @@ private: size_t& removed_size, std::vector<FileBlockCell*>& to_evict, std::lock_guard<std::mutex>& cache_lock, size_t& cur_removed_size); - void recycle_stale_rowset_async_bottom_half(); - // info std::string _cache_base_path; size_t _capacity = 0; @@ -467,9 +469,10 @@ private: bool _close {false}; std::mutex _close_mtx; std::condition_variable _close_cv; - std::thread _cache_background_thread; + std::thread _cache_background_monitor_thread; + std::thread _cache_background_ttl_gc_thread; + std::thread _cache_background_gc_thread; std::atomic_bool _async_open_done {false}; - bool _async_clear_file_cache {false}; // disk space or inode is less than the specified value bool _disk_resource_limit_mode {false}; bool _is_initialized {false}; @@ -495,7 +498,7 @@ private: LRUQueue _ttl_queue; // keys for async remove - std::shared_ptr<boost::lockfree::spsc_queue<FileCacheKey>> _recycle_keys; + RecycleFileCacheKeys _recycle_keys; // metrics std::shared_ptr<bvar::Status<size_t>> _cache_capacity_metrics; @@ -529,6 +532,9 @@ private: std::shared_ptr<bvar::Status<double>> _hit_ratio_5m; std::shared_ptr<bvar::Status<double>> _hit_ratio_1h; 
std::shared_ptr<bvar::Status<size_t>> _disk_limit_mode_metrics; + + std::shared_ptr<bvar::LatencyRecorder> _storage_sync_remove_latency; + std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency; }; } // namespace doris::io diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index 44cad5520ea..8c911ab8f24 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -287,7 +287,8 @@ FileBlocksHolder::~FileBlocksHolder() { std::lock_guard block_lock(file_block->_mutex); file_block->complete_unlocked(block_lock); if (file_block.use_count() == 2 && - file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) { + (file_block->is_deleting() || + file_block->state_unlock(block_lock) == FileBlock::State::EMPTY)) { should_remove = true; } } @@ -297,8 +298,9 @@ FileBlocksHolder::~FileBlocksHolder() { if (file_block.use_count() == 2) { DCHECK(file_block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING); // one in cache, one in here - if (file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) { - _mgr->remove(file_block, cache_lock, block_lock); + if (file_block->is_deleting() || + file_block->state_unlock(block_lock) == FileBlock::State::EMPTY) { + _mgr->remove(file_block, cache_lock, block_lock, false); } } } diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h index 3a4490d67a3..93c3841693d 100644 --- a/be/src/io/cache/file_block.h +++ b/be/src/io/cache/file_block.h @@ -130,6 +130,10 @@ public: FileBlock& operator=(const FileBlock&) = delete; FileBlock(const FileBlock&) = delete; + // block is being used by another thread when deleting, so tag it is_deleting and delete later on + void set_deleting() { _is_deleting = true; } + bool is_deleting() const { return _is_deleting; }; + private: std::string get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) const; @@ -155,6 +159,7 @@ private: std::condition_variable _cv; FileCacheKey _key; size_t _downloaded_size
{0}; + bool _is_deleting {false}; }; extern std::ostream& operator<<(std::ostream& os, const FileBlock::State& value); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 8ef296607eb..ade099f0f2d 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1493,7 +1493,7 @@ void CloudCompactionMixin::garbage_collection() { for (const auto& [_, file_writer] : beta_rowset_writer->get_file_writers()) { auto file_key = io::BlockFileCache::hash(file_writer->path().filename().native()); auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); - file_cache->remove_if_cached(file_key); + file_cache->remove_if_cached_async(file_key); } } } diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index d8c2dbe384c..117f01d63e3 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -2919,22 +2919,18 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) { auto key = io::BlockFileCache::hash("key1"); io::BlockFileCache cache(cache_base_path, settings); auto sp = SyncPoint::get_instance(); + FileBlocksHolder* holder; SyncPoint::CallbackGuard guard1; + // use first block before clean cache sp->set_call_back( - "BlockFileCache::set_sleep_time", - [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; }, &guard1); - SyncPoint::CallbackGuard guard2; - sp->set_call_back( - "BlockFileCache::set_remove_batch", - [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; }, &guard2); - SyncPoint::CallbackGuard guard3; - sp->set_call_back( - "BlockFileCache::recycle_deleted_blocks", + "BlockFileCache::clear_file_cache_async", [&](auto&&) { context.cache_type = io::FileCacheType::NORMAL; - cache.get_or_set(key, 0, 5, context); + FileBlocksHolder h = cache.get_or_set(key, 0, 5, context); + holder = new FileBlocksHolder(std::move(h)); }, - &guard3); + &guard1); + sp->enable_processing(); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; 
i++) { @@ -2956,13 +2952,12 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) { io::FileBlock::State::DOWNLOADED); } cache.clear_file_cache_async(); - while (cache._async_clear_file_cache) - ; - EXPECT_EQ(cache._cur_cache_size, 20); // 0-4 is used again, so all the cache data in DISPOSABLE - // remain unremoved + + EXPECT_EQ(cache._cur_cache_size, 5); // only one block is used, other is cleared if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } + delete holder; } TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) { @@ -2992,6 +2987,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) { io::BlockFileCache cache(cache_base_path, settings); context.cache_type = io::FileCacheType::TTL; context.expiration_time = UnixSeconds() + 3600; + FileBlocksHolder* holder; auto sp = SyncPoint::get_instance(); SyncPoint::CallbackGuard guard1; sp->set_call_back( @@ -3003,10 +2999,11 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) { [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; }, &guard2); SyncPoint::CallbackGuard guard3; sp->set_call_back( - "BlockFileCache::recycle_deleted_blocks", + "BlockFileCache::clear_file_cache_async", [&](auto&&) { context.cache_type = io::FileCacheType::NORMAL; - cache.get_or_set(key, 0, 5, context); + FileBlocksHolder h = cache.get_or_set(key, 0, 5, context); + holder = new FileBlocksHolder(std::move(h)); }, &guard3); sp->enable_processing(); @@ -3040,12 +3037,12 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) { io::FileBlock::State::DOWNLOADED); } cache.clear_file_cache_async(); - while (cache._async_clear_file_cache) - ; + EXPECT_EQ(cache._cur_cache_size, 5); if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } + delete holder; } TEST_F(BlockFileCacheTest, remove_directly) { @@ -3175,8 +3172,7 @@ TEST_F(BlockFileCacheTest, test_factory_1) { io::FileBlock::State::DOWNLOADED); } FileCacheFactory::instance()->clear_file_caches(false); - while (cache->_async_clear_file_cache) - ; + 
EXPECT_EQ(cache->_cur_cache_size, 0); for (int64_t offset = 0; offset < 60; offset += 5) { @@ -4868,73 +4864,6 @@ TEST_F(BlockFileCacheTest, remove_from_other_queue_2) { } } -TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) { - config::file_cache_ttl_valid_check_interval_second = 4; - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } - fs::create_directories(cache_base_path); - TUniqueId query_id; - query_id.hi = 1; - query_id.lo = 1; - io::FileCacheSettings settings; - settings.query_queue_size = 30; - settings.query_queue_elements = 5; - settings.index_queue_size = 30; - settings.index_queue_elements = 5; - settings.disposable_queue_size = 30; - settings.disposable_queue_elements = 5; - settings.capacity = 90; - settings.max_file_block_size = 30; - settings.max_query_cache_size = 30; - io::CacheContext context; - ReadStatistics rstats; - context.stats = &rstats; - context.query_id = query_id; - auto key = io::BlockFileCache::hash("key1"); - io::BlockFileCache cache(cache_base_path, settings); - context.cache_type = io::FileCacheType::TTL; - context.expiration_time = UnixSeconds() + 3600; - auto sp = SyncPoint::get_instance(); - Defer defer {[sp] { - sp->clear_call_back("BlockFileCache::set_remove_batch"); - sp->clear_call_back("BlockFileCache::recycle_deleted_blocks"); - sp->clear_call_back("BlockFileCache::set_sleep_time"); - }}; - sp->set_call_back("BlockFileCache::set_sleep_time", - [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; }); - sp->set_call_back("BlockFileCache::set_remove_batch", - [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; }); - sp->set_call_back("BlockFileCache::recycle_deleted_blocks", - [&](auto&&) { cache.get_or_set(key, 0, 5, context); }); - sp->enable_processing(); - ASSERT_TRUE(cache.initialize()); - for (int i = 0; i < 100; i++) { - if (cache.get_async_open_success()) { - break; - }; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - for (int64_t offset = 0; offset < 60; offset += 
5) { - auto holder = cache.get_or_set(key, offset, 5, context); - auto segments = fromHolder(holder); - ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::EMPTY); - ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download(segments[0]); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::DOWNLOADED); - } - std::this_thread::sleep_for( - std::chrono::seconds(config::file_cache_ttl_valid_check_interval_second + 2)); - config::file_cache_ttl_valid_check_interval_second = 0; - EXPECT_EQ(cache._cur_cache_size, 5); - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } -} - TEST_F(BlockFileCacheTest, reset_capacity) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); @@ -5004,8 +4933,7 @@ TEST_F(BlockFileCacheTest, reset_capacity) { io::FileBlock::State::DOWNLOADED); } std::cout << cache.reset_capacity(30) << std::endl; - while (cache._async_clear_file_cache) - ; + EXPECT_EQ(cache._cur_cache_size, 30); if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org