freemandealer commented on code in PR #54295:
URL: https://github.com/apache/doris/pull/54295#discussion_r2269293868
##########
be/src/io/cache/block_file_cache.cpp:
##########
@@ -2359,4 +2360,84 @@ template void BlockFileCache::remove(FileBlockSPtr
file_block,
#include "common/compile_check_end.h"
+
+Status
BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>&
results) {
+ InconsistencyContext inconsistency_context;
+ RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
+ auto n = inconsistency_context.types.size();
+ results.reserve(n);
+ for (size_t i = 0; i < n; i++) {
+ std::string result;
+ result += "File cahce info in manager:\n";
Review Comment:
typo: "cahce" should be "cache"
##########
be/test/io/cache/block_file_cache_test.cpp:
##########
@@ -5322,6 +5323,155 @@ TEST_F(BlockFileCacheTest,
file_cache_path_storage_parse) {
}
}
+TEST_F(BlockFileCacheTest, check_file_cache_consistency) {
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 30;
+ settings.query_queue_elements = 5;
+ settings.index_queue_size = 30;
+ settings.index_queue_elements = 5;
+ settings.disposable_queue_size = 30;
+ settings.disposable_queue_elements = 5;
+ settings.capacity = 90;
+ settings.max_file_block_size = 30;
+ settings.max_query_cache_size = 30;
+ auto key1 = io::BlockFileCache::hash("key1");
+ auto key2 = io::BlockFileCache::hash("key2");
+
+ io::BlockFileCache mgr(cache_base_path, settings);
+ ASSERT_TRUE(mgr.initialize());
+ for (int i = 0; i < 100; i++) {
+ if (mgr.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ io::CacheContext cache_context;
+ cache_context.cache_type = io::FileCacheType::TTL;
+ cache_context.query_id = query_id;
+ cache_context.expiration_time = 0;
+ {
+ cache_context.cache_type = io::FileCacheType::NORMAL;
+ auto holder = mgr.get_or_set(key1, 0, 9, cache_context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(0, 8),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ assert_range(2, blocks[0], io::FileBlock::Range(0, 8),
io::FileBlock::State::DOWNLOADING);
+ download(blocks[0]);
+ std::vector<std::string> result;
+ Status status = mgr.report_file_cache_inconsistency(result);
+ ASSERT_TRUE(result.empty());
+ }
+
+ {
+ auto holder = mgr.get_or_set(key1, 10, 9, cache_context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(10, 18),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ assert_range(2, blocks[0], io::FileBlock::Range(10, 18),
io::FileBlock::State::DOWNLOADING);
+ download(blocks[0]);
+ mgr._files[key1].erase(10);
+ }
+
+ {
+ auto holder = mgr.get_or_set(key1, 20, 9, cache_context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(20, 28),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ assert_range(2, blocks[0], io::FileBlock::Range(20, 28),
io::FileBlock::State::DOWNLOADING);
+ download(blocks[0]);
+ auto* fs_file_cache_storage =
dynamic_cast<FSFileCacheStorage*>(mgr._storage.get());
+ std::string dir_path =
fs_file_cache_storage->get_path_in_local_cache(key1, 0);
+ fs::path block_file_path = std::filesystem::path(dir_path) / "20";
+ fs::remove(block_file_path);
+ }
+
+ {
+ auto holder = mgr.get_or_set(key1, 30, 9, cache_context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(30, 38),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ assert_range(2, blocks[0], io::FileBlock::Range(30, 38),
io::FileBlock::State::DOWNLOADING);
+ download(blocks[0]);
+ auto* fs_file_cache_storage =
dynamic_cast<FSFileCacheStorage*>(mgr._storage.get());
+ std::string dir_path =
fs_file_cache_storage->get_path_in_local_cache(key1, 0);
+ fs::path block_file_path = std::filesystem::path(dir_path) / "30";
+ std::string data = "This is a test message.";
+ std::ofstream out_file(block_file_path, std::ios::out | std::ios::app);
+ out_file << data;
+ out_file.close();
+ }
+
+ {
+ auto holder = mgr.get_or_set(key1, 40, 9, cache_context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(40, 48),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ assert_range(2, blocks[0], io::FileBlock::Range(40, 48),
io::FileBlock::State::DOWNLOADING);
+ download(blocks[0]);
+ blocks[0]->_key.meta.type = io::FileCacheType::INDEX;
+ }
+
+ int64_t expiration_time = UnixSeconds() + 120;
+ {
+ cache_context.cache_type = FileCacheType::TTL;
+ cache_context.expiration_time = expiration_time;
+ auto holder = mgr.get_or_set(key2, 0, 9, cache_context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(0, 8),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ assert_range(2, blocks[0], io::FileBlock::Range(0, 8),
io::FileBlock::State::DOWNLOADING);
+ download(blocks[0]);
+ blocks[0]->_key.meta.expiration_time = 0;
+ }
+ std::vector<std::string> results;
+ Status status = mgr.report_file_cache_inconsistency(results);
+ std::unordered_set<std::string> expected_results = {
+ "File cahce info in manager:\nHash: "
Review Comment:
multiple typos below: "cahce" should be "cache" in these expected strings
##########
regression-test/suites/cloud_p2/cache/http/test_check_file_cache_consistency.groovy:
##########
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_check_file_cache_consistency", "docker") {
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.beConfigs.add("enable_file_cache=true")
+
options.beConfigs.add('file_cache_path=[{"path":"/data/doris_cloud/file_cache","total_size":104857600,"query_limit":104857600}]')
+
+ docker(options) {
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort)
+
+ def beId = backendId_to_backendIP.keySet()[0]
+ def beIp = backendId_to_backendIP.get(beId)
+ def beHttpPort = backendId_to_backendHttpPort.get(beId)
+ def socket = "${beIp}:${beHttpPort}"
+
+ // wait for be start
+ Thread.sleep(5000)
+
+ sql """
+ CREATE DATABASE IF NOT EXISTS test_file_cache_db
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS test_file_cache_db.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ ) DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ sql """
+ INSERT INTO test_file_cache_db.test_table VALUES
+ (1, 'test1', 1.1),
+ (2, 'test2', 2.2),
+ (3, 'test3', 3.3),
+ (4, 'test4', 4.4),
+ (5, 'test5', 5.5)
+ """
+
+ // query to trigger cache
+ sql """
+ SELECT * FROM test_file_cache_db.test_table WHERE id > 0
+ """
+
+ sql """
+ SELECT COUNT(*) FROM test_file_cache_db.test_table
+ """
+
+ sql """
+ SELECT AVG(value) FROM test_file_cache_db.test_table WHERE id > 2
+ """
+
+ // wait for cache to be created
+ Thread.sleep(5000)
+
+ // Test 1: list_base_paths operation
+ httpTest {
+ endpoint ""
+ uri "${socket}/api/file_cache?op=list_base_paths"
+ op "post"
+ body ""
+ check {respCode, body ->
+ assertEquals(respCode, 200)
+ println("List base paths response: ${body}")
+
+ assertNotNull(body, "Response body should not be null")
+ assertNotEquals(body, "null", "Response body should not be
'null'")
+
+ def respJson = parseJson(body)
+ assertTrue(respJson instanceof List, "Response should be a
JSON array")
+ assertEquals(1, respJson.size(), "Should return exactly one
base path")
+
+ def basePath = respJson[0]
+ assertEquals("/data/doris_cloud/file_cache", basePath, "Base
path should match configured path")
+
+ println("Verified base path: ${basePath}")
+ }
+ }
+
+ // Test 2: check_consistency operation
+ httpTest {
+ endpoint ""
+ uri
"${socket}/api/file_cache?op=check_consistency&base_path=/data/doris_cloud/file_cache"
+ op "post"
+ body ""
+ check {respCode, body ->
+ assertEquals(respCode, 200)
+
+ if (body == null || body == "null") {
+ println("No inconsistencies found in file cache - this is
expected for a clean cache")
+ } else {
+ // should't be any inconsistency
+ try {
+ def respJson = parseJson(body)
+
+ if (respJson instanceof List) {
+ println("Found ${respJson.size()} inconsistencies
in file cache")
+ respJson.each { inconsistency ->
+ assertTrue(inconsistency instanceof String,
"Each inconsistency should be a string")
+ assertTrue(inconsistency.contains("Hash:") ||
+ inconsistency.contains("Inconsistency
Reason") ||
+ inconsistency.contains("File cache
info") ||
+ inconsistency.contains("File cahce
info"),
Review Comment:
typo: "cahce" should be "cache"
##########
be/src/io/cache/block_file_cache.cpp:
##########
@@ -2359,4 +2360,84 @@ template void BlockFileCache::remove(FileBlockSPtr
file_block,
#include "common/compile_check_end.h"
+
+Status
BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>&
results) {
+ InconsistencyContext inconsistency_context;
+ RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
+ auto n = inconsistency_context.types.size();
+ results.reserve(n);
+ for (size_t i = 0; i < n; i++) {
+ std::string result;
+ result += "File cahce info in manager:\n";
+ result += inconsistency_context.infos_in_manager[i].to_string();
+ result += "File cahce info in storage:\n";
Review Comment:
typo: "cahce" should be "cache"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]