freemandealer commented on code in PR #43440: URL: https://github.com/apache/doris/pull/43440#discussion_r1834352909
########## be/src/io/cache/block_file_cache.cpp: ########## @@ -2084,4 +2084,28 @@ std::map<std::string, double> BlockFileCache::get_stats_unsafe() { template void BlockFileCache::remove(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock, std::lock_guard<std::mutex>& block_lock, bool sync); + +std::vector<std::string> BlockFileCache::check_file_cache_consistency() { + std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks; + std::vector<InconsistentCacheContext> inconsistent_cache_context; + std::lock_guard<std::mutex> lock(_mutex); + _storage->check_consistency(this, confirmed_blocks, inconsistent_cache_context, lock); + for (const auto& [hash, offset_to_cell] : _files) { + for (const auto& [offset, cell] : offset_to_cell) { + if (cell.is_deleted || confirmed_blocks.contains({hash, offset})) { Review Comment: is_deleted works as a flag, the file is not deleted yet ########## be/src/io/cache/fs_file_cache_storage.cpp: ########## @@ -588,6 +588,114 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2"); } +void FSFileCacheStorage::check_consistency( + BlockFileCache* _mgr, + std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash>& confirmed_blocks, + std::vector<InconsistentCacheContext>& inconsistent_cache_context, + std::lock_guard<std::mutex>& cache_lock) const { + std::error_code ec; + auto check = [_mgr, &cache_lock, &confirmed_blocks, &inconsistent_cache_context, + this](std::filesystem::directory_iterator key_it) { + for (; key_it != std::filesystem::directory_iterator(); ++key_it) { + auto key_with_suffix = key_it->path().filename().native(); + auto delim_pos = key_with_suffix.find('_'); + DCHECK(delim_pos != std::string::npos); + std::string key_str = key_with_suffix.substr(0, delim_pos); + std::string expiration_time_str = key_with_suffix.substr(delim_pos + 1); + auto hash = UInt128Wrapper(vectorized::unhex_uint<uint128_t>(key_str.c_str())); + std::error_code ec; + std::filesystem::directory_iterator offset_it(key_it->path(), ec); + if (ec) [[unlikely]] { + LOG(WARNING) << "filesystem error, failed to remove file, file=" << key_it->path() + << " error=" << ec.message(); + continue; + } + long expiration_time = std::stoul(expiration_time_str); + for (; offset_it != std::filesystem::directory_iterator(); ++offset_it) { + size_t size = offset_it->file_size(ec); + size_t offset = 0; + bool is_tmp = false; + FileCacheType cache_type = FileCacheType::NORMAL; + if (!this->parse_filename_suffix_to_cache_type( + fs, offset_it->path().filename().native(), expiration_time, size, + &offset, &is_tmp, &cache_type)) { + continue; + } + confirmed_blocks.insert({hash, offset}); + std::string offset_path = offset_it->path(); + auto* cell = _mgr->get_cell(hash, offset, cache_lock); + + if (!cell || cell->is_deleted) { + inconsistent_cache_context.emplace_back( + hash, expiration_time, offset, cache_type, + InconsistentCacheContext::InconsistentType::FILES_INCONSISTENT); + continue; + } + + size_t expected_size = + (is_tmp && cell->file_block->state() == FileBlock::State::DOWNLOADING) + ? cell->dowloading_size() + : cell->size(); + InconsistentCacheContext::InconsistentType inconsistent_type; + if (size != expected_size) { + inconsistent_type |= + InconsistentCacheContext::InconsistentType::SIZE_INCONSISTENT; + } + if (cache_type != cell->file_block->cache_type()) { + inconsistent_type |= + InconsistentCacheContext::InconsistentType::CACHE_TYPE_INCONSISTENT; + } + if (expiration_time != cell->file_block->expiration_time()) { + inconsistent_type |= InconsistentCacheContext::InconsistentType:: + EXPIRATION_TIME_INCONSISTENT; + } + + if (inconsistent_type != InconsistentCacheContext::InconsistentType::NONE) { + inconsistent_cache_context.emplace_back(hash, expiration_time, offset, + cache_type, inconsistent_type); + } + } + } + }; + if constexpr (USE_CACHE_VERSION2) { + std::filesystem::directory_iterator key_prefix_it {_cache_base_path, ec}; + if (ec) { + LOG(WARNING) << ec.message(); + return; + } + for (; key_prefix_it != std::filesystem::directory_iterator(); ++key_prefix_it) { + if (!key_prefix_it->is_directory()) { + // skip version file + continue; + } + if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) { + LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native() Review Comment: no deletion here. this is just a checking tool, don't touch anything! ########## be/test/io/cache/block_file_cache_test.cpp: ########## @@ -5284,6 +5285,145 @@ TEST_F(BlockFileCacheTest, file_cache_path_storage_parse) { } } +TEST_F(BlockFileCacheTest, check_fs_file_cache_consistency) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.disposable_queue_size = 30; + settings.disposable_queue_elements = 5; + settings.capacity = 90; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + auto key1 = io::BlockFileCache::hash("key1"); + auto key2 = io::BlockFileCache::hash("key2"); + + io::BlockFileCache mgr(cache_base_path, settings); + ASSERT_TRUE(mgr.initialize()); + for (int i = 0; i < 100; i++) { + if (mgr.get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + io::CacheContext cache_context; + cache_context.cache_type = io::FileCacheType::TTL; + cache_context.query_id = query_id; + cache_context.expiration_time = 0; + { + cache_context.cache_type = io::FileCacheType::NORMAL; + auto holder = mgr.get_or_set(key1, 0, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto result = mgr.check_file_cache_consistency(); + ASSERT_TRUE(result.empty()); + } + + { + auto holder = mgr.get_or_set(key1, 10, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + mgr._files[key1].erase(10); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 20, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(20, 28), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(20, 28), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto* fs_file_cache_storage = dynamic_cast<FSFileCacheStorage*>(mgr._storage.get()); + std::string dir_path = fs_file_cache_storage->get_path_in_local_cache(key1, 0); + fs::path block_file_path = std::filesystem::path(dir_path) / "20"; + fs::remove(block_file_path); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 30, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(30, 38), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(30, 38), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto* fs_file_cache_storage = dynamic_cast<FSFileCacheStorage*>(mgr._storage.get()); + std::string dir_path = fs_file_cache_storage->get_path_in_local_cache(key1, 0); + fs::path block_file_path = std::filesystem::path(dir_path) / "30"; + std::string data = "This is a test message."; + std::ofstream out_file(block_file_path, std::ios::out | std::ios::app); + out_file << data; + out_file.close(); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 40, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(40, 48), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(40, 48), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + blocks[0]->_key.meta.type = io::FileCacheType::INDEX; + mgr.check_file_cache_consistency(); + } + + int64_t expiration_time = UnixSeconds() + 120; + { + cache_context.cache_type = FileCacheType::TTL; + cache_context.expiration_time = expiration_time; + auto holder = mgr.get_or_set(key2, 0, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + blocks[0]->_key.meta.expiration_time = 0; + mgr.check_file_cache_consistency(); Review Comment: ditto ########## be/test/io/cache/block_file_cache_test.cpp: ########## @@ -5284,6 +5285,145 @@ TEST_F(BlockFileCacheTest, file_cache_path_storage_parse) { } } +TEST_F(BlockFileCacheTest, check_fs_file_cache_consistency) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.disposable_queue_size = 30; + settings.disposable_queue_elements = 5; + settings.capacity = 90; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + auto key1 = io::BlockFileCache::hash("key1"); + auto key2 = io::BlockFileCache::hash("key2"); + + io::BlockFileCache mgr(cache_base_path, settings); + ASSERT_TRUE(mgr.initialize()); + for (int i = 0; i < 100; i++) { + if (mgr.get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + io::CacheContext cache_context; + cache_context.cache_type = io::FileCacheType::TTL; + cache_context.query_id = query_id; + cache_context.expiration_time = 0; + { + cache_context.cache_type = io::FileCacheType::NORMAL; + auto holder = mgr.get_or_set(key1, 0, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto result = mgr.check_file_cache_consistency(); + ASSERT_TRUE(result.empty()); + } + + { + auto holder = mgr.get_or_set(key1, 10, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + mgr._files[key1].erase(10); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 20, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(20, 28), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(20, 28), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto* fs_file_cache_storage = dynamic_cast<FSFileCacheStorage*>(mgr._storage.get()); + std::string dir_path = fs_file_cache_storage->get_path_in_local_cache(key1, 0); + fs::path block_file_path = std::filesystem::path(dir_path) / "20"; + fs::remove(block_file_path); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 30, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(30, 38), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(30, 38), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto* fs_file_cache_storage = dynamic_cast<FSFileCacheStorage*>(mgr._storage.get()); + std::string dir_path = fs_file_cache_storage->get_path_in_local_cache(key1, 0); + fs::path block_file_path = std::filesystem::path(dir_path) / "30"; + std::string data = "This is a test message."; + std::ofstream out_file(block_file_path, std::ios::out | std::ios::app); + out_file << data; + out_file.close(); + mgr.check_file_cache_consistency(); Review Comment: ditto ########## be/test/io/cache/block_file_cache_test.cpp: ########## @@ -5284,6 +5285,145 @@ TEST_F(BlockFileCacheTest, file_cache_path_storage_parse) { } } +TEST_F(BlockFileCacheTest, check_fs_file_cache_consistency) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.disposable_queue_size = 30; + settings.disposable_queue_elements = 5; + settings.capacity = 90; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + auto key1 = io::BlockFileCache::hash("key1"); + auto key2 = io::BlockFileCache::hash("key2"); + + io::BlockFileCache mgr(cache_base_path, settings); + ASSERT_TRUE(mgr.initialize()); + for (int i = 0; i < 100; i++) { + if (mgr.get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + io::CacheContext cache_context; + cache_context.cache_type = io::FileCacheType::TTL; + cache_context.query_id = query_id; + cache_context.expiration_time = 0; + { + cache_context.cache_type = io::FileCacheType::NORMAL; + auto holder = mgr.get_or_set(key1, 0, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto result = mgr.check_file_cache_consistency(); + ASSERT_TRUE(result.empty()); + } + + { + auto holder = mgr.get_or_set(key1, 10, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + mgr._files[key1].erase(10); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 20, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(20, 28), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(20, 28), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto* fs_file_cache_storage = dynamic_cast<FSFileCacheStorage*>(mgr._storage.get()); + std::string dir_path = fs_file_cache_storage->get_path_in_local_cache(key1, 0); + fs::path block_file_path = std::filesystem::path(dir_path) / "20"; + fs::remove(block_file_path); + mgr.check_file_cache_consistency(); Review Comment: ditto ########## be/test/io/cache/block_file_cache_test.cpp: ########## @@ -5284,6 +5285,145 @@ TEST_F(BlockFileCacheTest, file_cache_path_storage_parse) { } } +TEST_F(BlockFileCacheTest, check_fs_file_cache_consistency) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.disposable_queue_size = 30; + settings.disposable_queue_elements = 5; + settings.capacity = 90; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + auto key1 = io::BlockFileCache::hash("key1"); + auto key2 = io::BlockFileCache::hash("key2"); + + io::BlockFileCache mgr(cache_base_path, settings); + ASSERT_TRUE(mgr.initialize()); + for (int i = 0; i < 100; i++) { + if (mgr.get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + io::CacheContext cache_context; + cache_context.cache_type = io::FileCacheType::TTL; + cache_context.query_id = query_id; + cache_context.expiration_time = 0; + { + cache_context.cache_type = io::FileCacheType::NORMAL; + auto holder = mgr.get_or_set(key1, 0, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto result = mgr.check_file_cache_consistency(); + ASSERT_TRUE(result.empty()); + } + + { + auto holder = mgr.get_or_set(key1, 10, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + mgr._files[key1].erase(10); + mgr.check_file_cache_consistency(); Review Comment: unnecessary ########## be/src/io/cache/file_cache_common.h: ########## @@ -134,4 +137,51 @@ struct CacheContext { bool is_cold_data {false}; }; +struct InconsistentCacheContext { + UInt128Wrapper hash; + int64_t expiration_time; + size_t offset; + FileCacheType cache_type; + class InconsistentType { + uint32_t type; + + public: + enum : uint32_t { + NONE = 0, + FILES_INCONSISTENT = 1 << 0, Review Comment: ditto, may be 'NOT_LOADED'? ########## be/src/io/cache/fs_file_cache_storage.cpp: ########## @@ -588,6 +588,114 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2"); } +void FSFileCacheStorage::check_consistency( + BlockFileCache* _mgr, + std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash>& confirmed_blocks, + std::vector<InconsistentCacheContext>& inconsistent_cache_context, + std::lock_guard<std::mutex>& cache_lock) const { + std::error_code ec; + auto check = [_mgr, &cache_lock, &confirmed_blocks, &inconsistent_cache_context, + this](std::filesystem::directory_iterator key_it) { + for (; key_it != std::filesystem::directory_iterator(); ++key_it) { + auto key_with_suffix = key_it->path().filename().native(); + auto delim_pos = key_with_suffix.find('_'); + DCHECK(delim_pos != std::string::npos); + std::string key_str = key_with_suffix.substr(0, delim_pos); + std::string expiration_time_str = key_with_suffix.substr(delim_pos + 1); + auto hash = UInt128Wrapper(vectorized::unhex_uint<uint128_t>(key_str.c_str())); + std::error_code ec; + std::filesystem::directory_iterator offset_it(key_it->path(), ec); + if (ec) [[unlikely]] { + LOG(WARNING) << "filesystem error, failed to remove file, file=" << key_it->path() + << " error=" << ec.message(); + continue; + } + long expiration_time = std::stoul(expiration_time_str); + for (; offset_it != std::filesystem::directory_iterator(); ++offset_it) { + size_t size = offset_it->file_size(ec); + size_t offset = 0; + bool is_tmp = false; + FileCacheType cache_type = FileCacheType::NORMAL; + if (!this->parse_filename_suffix_to_cache_type( + fs, offset_it->path().filename().native(), expiration_time, size, + &offset, &is_tmp, &cache_type)) { + continue; + } + confirmed_blocks.insert({hash, offset}); + std::string offset_path = offset_it->path(); + auto* cell = _mgr->get_cell(hash, offset, cache_lock); + + if (!cell || cell->is_deleted) { Review Comment: is_deleted works as a flag, the file is not deleted yet ########## be/src/io/cache/fs_file_cache_storage.cpp: ########## @@ -588,6 +588,114 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2"); } +void FSFileCacheStorage::check_consistency( + BlockFileCache* _mgr, + std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash>& confirmed_blocks, + std::vector<InconsistentCacheContext>& inconsistent_cache_context, + std::lock_guard<std::mutex>& cache_lock) const { + std::error_code ec; + auto check = [_mgr, &cache_lock, &confirmed_blocks, &inconsistent_cache_context, + this](std::filesystem::directory_iterator key_it) { + for (; key_it != std::filesystem::directory_iterator(); ++key_it) { + auto key_with_suffix = key_it->path().filename().native(); + auto delim_pos = key_with_suffix.find('_'); + DCHECK(delim_pos != std::string::npos); + std::string key_str = key_with_suffix.substr(0, delim_pos); + std::string expiration_time_str = key_with_suffix.substr(delim_pos + 1); + auto hash = UInt128Wrapper(vectorized::unhex_uint<uint128_t>(key_str.c_str())); + std::error_code ec; + std::filesystem::directory_iterator offset_it(key_it->path(), ec); + if (ec) [[unlikely]] { + LOG(WARNING) << "filesystem error, failed to remove file, file=" << key_it->path() + << " error=" << ec.message(); + continue; + } + long expiration_time = std::stoul(expiration_time_str); + for (; offset_it != std::filesystem::directory_iterator(); ++offset_it) { + size_t size = offset_it->file_size(ec); + size_t offset = 0; + bool is_tmp = false; + FileCacheType cache_type = FileCacheType::NORMAL; + if (!this->parse_filename_suffix_to_cache_type( + fs, offset_it->path().filename().native(), expiration_time, size, + &offset, &is_tmp, &cache_type)) { + continue; + } + confirmed_blocks.insert({hash, offset}); + std::string offset_path = offset_it->path(); + auto* cell = _mgr->get_cell(hash, offset, cache_lock); + + if (!cell || cell->is_deleted) { + inconsistent_cache_context.emplace_back( + hash, expiration_time, offset, cache_type, + InconsistentCacheContext::InconsistentType::FILES_INCONSISTENT); + continue; + } + + size_t expected_size = + (is_tmp && cell->file_block->state() == FileBlock::State::DOWNLOADING) + ? cell->dowloading_size() + : cell->size(); + InconsistentCacheContext::InconsistentType inconsistent_type; + if (size != expected_size) { + inconsistent_type |= + InconsistentCacheContext::InconsistentType::SIZE_INCONSISTENT; + } + if (cache_type != cell->file_block->cache_type()) { + inconsistent_type |= + InconsistentCacheContext::InconsistentType::CACHE_TYPE_INCONSISTENT; + } + if (expiration_time != cell->file_block->expiration_time()) { + inconsistent_type |= InconsistentCacheContext::InconsistentType:: + EXPIRATION_TIME_INCONSISTENT; + } + + if (inconsistent_type != InconsistentCacheContext::InconsistentType::NONE) { + inconsistent_cache_context.emplace_back(hash, expiration_time, offset, + cache_type, inconsistent_type); + } + } + } + }; + if constexpr (USE_CACHE_VERSION2) { + std::filesystem::directory_iterator key_prefix_it {_cache_base_path, ec}; + if (ec) { + LOG(WARNING) << ec.message(); + return; + } + for (; key_prefix_it != std::filesystem::directory_iterator(); ++key_prefix_it) { + if (!key_prefix_it->is_directory()) { + // skip version file + continue; + } + if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) { + LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native() + << ", try to remove it"; + std::error_code ec; + std::filesystem::remove(key_prefix_it->path(), ec); + if (ec) { + LOG(WARNING) << "failed to remove=" << key_prefix_it->path() + << " msg=" << ec.message(); + } + continue; + } + std::filesystem::directory_iterator key_it {key_prefix_it->path(), ec}; + if (ec) { + LOG(WARNING) << ec.message(); + continue; + } + check(key_it); + } + } else { + std::filesystem::directory_iterator key_it {_cache_base_path, ec}; Review Comment: are you sure this check works for both VERSION1 and VERSION2? if not, just warning here "only support version2" ########## be/test/io/cache/block_file_cache_test.cpp: ########## @@ -5284,6 +5285,145 @@ TEST_F(BlockFileCacheTest, file_cache_path_storage_parse) { } } +TEST_F(BlockFileCacheTest, check_fs_file_cache_consistency) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.disposable_queue_size = 30; + settings.disposable_queue_elements = 5; + settings.capacity = 90; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + auto key1 = io::BlockFileCache::hash("key1"); + auto key2 = io::BlockFileCache::hash("key2"); + + io::BlockFileCache mgr(cache_base_path, settings); + ASSERT_TRUE(mgr.initialize()); + for (int i = 0; i < 100; i++) { + if (mgr.get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + io::CacheContext cache_context; + cache_context.cache_type = io::FileCacheType::TTL; + cache_context.query_id = query_id; + cache_context.expiration_time = 0; + { + cache_context.cache_type = io::FileCacheType::NORMAL; + auto holder = mgr.get_or_set(key1, 0, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto result = mgr.check_file_cache_consistency(); + ASSERT_TRUE(result.empty()); + } + + { + auto holder = mgr.get_or_set(key1, 10, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(10, 18), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + mgr._files[key1].erase(10); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 20, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(20, 28), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(20, 28), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto* fs_file_cache_storage = dynamic_cast<FSFileCacheStorage*>(mgr._storage.get()); + std::string dir_path = fs_file_cache_storage->get_path_in_local_cache(key1, 0); + fs::path block_file_path = std::filesystem::path(dir_path) / "20"; + fs::remove(block_file_path); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 30, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(30, 38), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(30, 38), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + auto* fs_file_cache_storage = dynamic_cast<FSFileCacheStorage*>(mgr._storage.get()); + std::string dir_path = fs_file_cache_storage->get_path_in_local_cache(key1, 0); + fs::path block_file_path = std::filesystem::path(dir_path) / "30"; + std::string data = "This is a test message."; + std::ofstream out_file(block_file_path, std::ios::out | std::ios::app); + out_file << data; + out_file.close(); + mgr.check_file_cache_consistency(); + } + + { + auto holder = mgr.get_or_set(key1, 40, 9, cache_context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(40, 48), io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, blocks[0], io::FileBlock::Range(40, 48), io::FileBlock::State::DOWNLOADING); + download(blocks[0]); + blocks[0]->_key.meta.type = io::FileCacheType::INDEX; + mgr.check_file_cache_consistency(); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org