This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new f194a0baf1d (cloud-merge) Fix the coredump because of change_cache_type to origin_type (#38518) f194a0baf1d is described below commit f194a0baf1d04e8bb1517f59827314eb7d696577 Author: Lightman <31928846+lchangli...@users.noreply.github.com> AuthorDate: Wed Jul 31 20:16:30 2024 +0800 (cloud-merge) Fix the coredump because of change_cache_type to origin_type (#38518) When calling the method `change_cache_type_by_mgr`, if the cache_type is the same as change_type, it returns true directly, so the caller goes on to execute the subsequent code and breaks the old assumptions. --- be/src/io/cache/block_file_cache.cpp | 66 +++++--- be/src/io/cache/file_block.cpp | 14 +- be/test/io/cache/block_file_cache_test.cpp | 256 ++++++++++++++++++++++++++++- 3 files changed, 306 insertions(+), 30 deletions(-) diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 33858e9ac53..2b96c59fc8f 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -280,6 +280,12 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte DCHECK(!file_blocks.empty()); // change to ttl if the blocks aren't ttl if (context.cache_type == FileCacheType::TTL && _key_to_time.find(hash) == _key_to_time.end()) { + for (auto& [_, cell] : file_blocks) { + Status st = cell.file_block->update_expiration_time(context.expiration_time); + if (!st.ok()) { + LOG_WARNING("Failed to change key meta").error(st); + } + } for (auto& [_, cell] : file_blocks) { FileCacheType origin_type = cell.file_block->cache_type(); if (origin_type == FileCacheType::TTL) continue; @@ -295,9 +301,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte } else { cell.queue_iterator.reset(); } - st = cell.file_block->update_expiration_time(context.expiration_time); - } - if (!st.ok()) { + } else { LOG_WARNING("Failed to change key meta").error(st); } } @@ -324,7 +328,10 @@ 
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte } if (context.expiration_time == 0) { for (auto& [_, cell] : file_blocks) { - if (cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL)) { + auto cache_type = cell.file_block->cache_type(); + if (cache_type != FileCacheType::TTL) continue; + auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL); + if (st.ok()) { if (config::enable_ttl_cache_evict_using_lru) { auto& ttl_queue = get_queue(FileCacheType::TTL); ttl_queue.remove(cell.queue_iterator.value(), cache_lock); @@ -333,6 +340,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte cell.queue_iterator = queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(), cell.file_block->range().size(), cache_lock); + } else { + LOG_WARNING("Failed to change key meta").error(st); } } _key_to_time.erase(iter); @@ -681,10 +690,6 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock); auto& offsets = _files[hash]; - DCHECK((context.expiration_time == 0 && context.cache_type != FileCacheType::TTL) || - (context.cache_type == FileCacheType::TTL && context.expiration_time != 0)) - << fmt::format("expiration time {}, cache type {}", context.expiration_time, - context.cache_type); FileCacheKey key; key.hash = hash; @@ -692,11 +697,23 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha key.meta.type = context.cache_type; key.meta.expiration_time = context.expiration_time; FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock); - if (context.cache_type != FileCacheType::TTL || config::enable_ttl_cache_evict_using_lru) { - auto& queue = get_queue(context.cache_type); + Status st; + if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) { + st = 
cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL); + } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) { + st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL); + } + if (!st.ok()) { + LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time + << " cache_type=" << cache_type_to_string(context.cache_type) + << " error=" << st.msg(); + } + if (cell.file_block->cache_type() != FileCacheType::TTL || + config::enable_ttl_cache_evict_using_lru) { + auto& queue = get_queue(cell.file_block->cache_type()); cell.queue_iterator = queue.add(hash, offset, size, cache_lock); } - if (context.cache_type == FileCacheType::TTL) { + if (cell.file_block->cache_type() == FileCacheType::TTL) { if (_key_to_time.find(hash) == _key_to_time.end()) { _key_to_time[hash] = context.expiration_time; _time_to_key.insert(std::make_pair(context.expiration_time, hash)); @@ -1005,19 +1022,18 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b } } for (auto& [_, cell] : _files[file_key]) { - if (cell.file_block->cache_type() == FileCacheType::TTL) { - auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL); - if (st.ok()) { - if (config::enable_ttl_cache_evict_using_lru) { - ttl_queue.remove(cell.queue_iterator.value(), cache_lock); - } - auto& queue = get_queue(FileCacheType::NORMAL); - cell.queue_iterator = queue.add( - cell.file_block->get_hash_value(), cell.file_block->offset(), - cell.file_block->range().size(), cache_lock); - } else { - LOG_WARNING("Failed to change cache type to normal").error(st); + if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue; + auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL); + if (st.ok()) { + if (config::enable_ttl_cache_evict_using_lru) { + ttl_queue.remove(cell.queue_iterator.value(), cache_lock); } + auto& queue = get_queue(FileCacheType::NORMAL); + cell.queue_iterator = + 
queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(), + cell.file_block->range().size(), cache_lock); + } else { + LOG_WARNING("Failed to change cache type to normal").error(st); } } } else { @@ -1579,6 +1595,7 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, for (auto& [_, cell] : _files[hash]) { Status st = cell.file_block->update_expiration_time(new_expiration_time); if (!st.ok()) { + LOG_WARNING("Failed to modify expiration time").error(st); } } @@ -1588,12 +1605,13 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, if (auto iter = _files.find(hash); iter != _files.end()) { for (auto& [_, cell] : iter->second) { Status st = cell.file_block->update_expiration_time(new_expiration_time); - if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) { + if (!st.ok()) { LOG_WARNING("").error(st); } } for (auto& [_, cell] : iter->second) { FileCacheType origin_type = cell.file_block->cache_type(); + if (origin_type == FileCacheType::TTL) continue; auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL); if (st.ok()) { auto& queue = get_queue(origin_type); diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index 5985aa95f7a..6586dcf589b 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -25,6 +25,7 @@ #include <thread> #include "common/status.h" +#include "cpp/sync_point.h" #include "io/cache/block_file_cache.h" namespace doris { @@ -162,14 +163,14 @@ Status FileBlock::read(Slice buffer, size_t read_offset) { Status FileBlock::change_cache_type_by_mgr(FileCacheType new_type) { std::lock_guard block_lock(_mutex); - if (new_type == _key.meta.type) { - return Status::OK(); - } + DCHECK(new_type != _key.meta.type); if (_download_state == State::DOWNLOADED) { KeyMeta new_meta; new_meta.expiration_time = _key.meta.expiration_time; new_meta.type = new_type; - RETURN_IF_ERROR(_mgr->_storage->change_key_meta(_key, new_meta)); + auto st = 
_mgr->_storage->change_key_meta(_key, new_meta); + TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st); + if (!st.ok()) return st; } _key.meta.type = new_type; return Status::OK(); @@ -198,7 +199,10 @@ Status FileBlock::update_expiration_time(uint64_t expiration_time) { KeyMeta new_meta; new_meta.expiration_time = expiration_time; new_meta.type = _key.meta.type; - RETURN_IF_ERROR(_mgr->_storage->change_key_meta(_key, new_meta)); + auto st = _mgr->_storage->change_key_meta(_key, new_meta); + if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) { + return st; + } } _key.meta.expiration_time = expiration_time; return Status::OK(); diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index e4a52c02589..180a0c8f209 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -2676,7 +2676,6 @@ TEST_F(BlockFileCacheTest, append_many_time) { auto holder = cache.get_or_set(key, 0, 5, context); auto blocks = fromHolder(holder); assert_range(1, blocks[0], io::FileBlock::Range(0, 4), io::FileBlock::State::DOWNLOADED); - ASSERT_TRUE(blocks[0]->change_cache_type_by_mgr(FileCacheType::NORMAL).ok()); ASSERT_TRUE(blocks[0]->change_cache_type_self(FileCacheType::INDEX).ok()); if (auto storage = dynamic_cast<FSFileCacheStorage*>(cache._storage.get()); storage != nullptr) { @@ -4319,4 +4318,259 @@ TEST_F(BlockFileCacheTest, reset_capacity) { } } +TEST_F(BlockFileCacheTest, change_cache_type1) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + auto sp = SyncPoint::get_instance(); + sp->set_call_back("FileBlock::change_cache_type", [](auto&& args) { + *try_any_cast<Status*>(args[0]) = Status::IOError("inject io error"); + }); + sp->enable_processing(); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + 
settings.capacity = 30; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + io::CacheContext context; + context.cache_type = io::FileCacheType::TTL; + context.query_id = query_id; + int64_t cur_time = UnixSeconds(); + context.expiration_time = cur_time + 120; + int64_t modify_time = cur_time + 5; + auto key1 = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + for (int i = 0; i < 100; i++) { + if (cache.get_lazy_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + { + auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(50, 59), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(segments[0]); + assert_range(1, segments[0], io::FileBlock::Range(50, 59), + io::FileBlock::State::DOWNLOADED); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL); + EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time); + } + context.cache_type = io::FileCacheType::NORMAL; + context.expiration_time = 0; + { + auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(50, 59), + io::FileBlock::State::DOWNLOADED); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL); + EXPECT_EQ(segments[0]->expiration_time(), 0); + } + sp->clear_call_back("FileBlock::change_cache_type"); + context.cache_type = io::FileCacheType::TTL; + context.expiration_time = modify_time; + { + auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], 
io::FileBlock::Range(50, 59), + io::FileBlock::State::DOWNLOADED); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL); + EXPECT_EQ(segments[0]->expiration_time(), modify_time); + } + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, change_cache_type2) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + auto sp = SyncPoint::get_instance(); + sp->set_call_back("FileBlock::change_cache_type", [](auto&& args) { + *try_any_cast<Status*>(args[0]) = Status::IOError("inject io error"); + }); + sp->enable_processing(); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.capacity = 30; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + io::CacheContext context; + context.query_id = query_id; + int64_t cur_time = UnixSeconds(); + context.cache_type = io::FileCacheType::NORMAL; + context.expiration_time = 0; + auto key1 = io::BlockFileCache::hash("key1"); + auto key2 = io::BlockFileCache::hash("key2"); + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + for (int i = 0; i < 100; i++) { + if (cache.get_lazy_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + { + auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(50, 59), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(segments[0]); + assert_range(1, segments[0], io::FileBlock::Range(50, 59), + io::FileBlock::State::DOWNLOADED); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL); + 
EXPECT_EQ(segments[0]->expiration_time(), 0); + } + context.cache_type = io::FileCacheType::TTL; + context.expiration_time = cur_time + 120; + { + auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(50, 59), + io::FileBlock::State::DOWNLOADED); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL); + EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time); + } + sp->clear_call_back("FileBlock::change_cache_type"); + context.cache_type = io::FileCacheType::NORMAL; + context.expiration_time = 0; + { + auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(50, 59), + io::FileBlock::State::DOWNLOADED); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL); + EXPECT_EQ(segments[0]->expiration_time(), 0); + } + EXPECT_EQ(cache._normal_queue.queue.size(), 1); + for (int64_t offset = 0; offset < 40; offset += 5) { + auto holder = cache.get_or_set(key2, offset, 5, context); + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(segments[0]); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::DOWNLOADED); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL); + EXPECT_EQ(segments[0]->expiration_time(), 0); + } + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, load_cache1) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + 
test_file_cache(FileCacheType::NORMAL); + int64_t cur_time = UnixSeconds(); + int64_t expiration_time = cur_time + 120; + auto key1 = io::BlockFileCache::hash("key1"); + + ASSERT_TRUE(global_local_filesystem() + ->rename(cache_base_path + "/" + key1.to_string().substr(0, 3) + "/" + + key1.to_string() + "_0", + cache_base_path + "/" + key1.to_string().substr(0, 3) + "/" + + key1.to_string() + "_" + std::to_string(expiration_time)) + .ok()); + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.capacity = 30; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + for (int i = 0; i < 100; i++) { + if (cache.get_lazy_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + EXPECT_EQ(cache._normal_queue.cache_size, 0); + EXPECT_TRUE(cache._key_to_time.contains(key1)); + auto& offset = cache._files[key1]; + for (auto& [offset, cell] : offset) { + EXPECT_EQ(cell.file_block->cache_type(), FileCacheType::TTL); + std::string cur_path; + if (auto storage = dynamic_cast<FSFileCacheStorage*>(cache._storage.get()); + storage != nullptr) { + std::string dir = + storage->get_path_in_local_cache(key1, cell.file_block->expiration_time()); + cur_path = storage->get_path_in_local_cache(dir, cell.file_block->offset(), + cell.file_block->cache_type()); + } + EXPECT_EQ(cur_path, cache_base_path + key1.to_string().substr(0, 3) + "/" + + key1.to_string() + "_" + std::to_string(expiration_time) + "/" + + std::to_string(offset) + "_ttl"); + } +} + +TEST_F(BlockFileCacheTest, load_cache2) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + test_file_cache(FileCacheType::NORMAL); + auto key1 = io::BlockFileCache::hash("key1"); + + ASSERT_TRUE(global_local_filesystem() + ->rename(cache_base_path + "/" + 
key1.to_string().substr(0, 3) + "/" + + key1.to_string() + "_0/0", + cache_base_path + "/" + key1.to_string().substr(0, 3) + "/" + + key1.to_string() + "_0/0_ttl") + .ok()); + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.capacity = 30; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + for (int i = 0; i < 100; i++) { + if (cache.get_lazy_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + auto& offset = cache._files[key1]; + for (auto& [offset, cell] : offset) { + EXPECT_EQ(cell.file_block->cache_type(), FileCacheType::NORMAL); + std::string cur_path; + if (auto storage = dynamic_cast<FSFileCacheStorage*>(cache._storage.get()); + storage != nullptr) { + std::string dir = + storage->get_path_in_local_cache(key1, cell.file_block->expiration_time()); + cur_path = storage->get_path_in_local_cache(dir, cell.file_block->offset(), + cell.file_block->cache_type()); + } + EXPECT_EQ(cur_path, cache_base_path + key1.to_string().substr(0, 3) + "/" + + key1.to_string() + "_0/" + std::to_string(offset)); + } +} + } // namespace doris::io --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org