github-actions[bot] commented on code in PR #64186:
URL: https://github.com/apache/doris/pull/64186#discussion_r3409175180
##########
be/src/io/cache/block_file_cache.cpp:
##########
@@ -941,6 +882,251 @@ FileBlocksHolder BlockFileCache::get_or_set(const
UInt128Wrapper& hash, size_t o
return FileBlocksHolder(std::move(file_blocks));
}
+void BlockFileCache::enqueue_hash_for_cleanup(const UInt128Wrapper& hash) {
+ if (_storage->get_type() != DISK) {
+ return;
+ }
+ bool ret = _recycle_hashes.enqueue(hash);
+ if (!ret) {
+ LOG_WARNING("Failed to push recycle hash to queue, hash={}",
hash.to_string());
+ }
+}
+
+void BlockFileCache::enqueue_key_dir_for_cleanup(const UInt128Wrapper& hash,
+ uint64_t expiration_time) {
+ if (_storage->get_type() != DISK) {
+ return;
+ }
+ bool ret = _recycle_key_dirs.enqueue(std::make_pair(hash,
expiration_time));
+ if (!ret) {
+ LOG_WARNING("Failed to push recycle key dir to queue, hash={},
expiration_time={}",
+ hash.to_string(), expiration_time);
+ }
+}
+
+std::optional<uint64_t>
BlockFileCache::get_canonical_storage_expiration_if_stable(
+ const UInt128Wrapper& hash, std::lock_guard<std::mutex>& cache_lock,
+ bool* skipped_unstable) {
+ if (skipped_unstable != nullptr) {
+ *skipped_unstable = false;
+ }
+ auto iter = _files.find(hash);
+ if (iter == _files.end() || iter->second.empty()) {
+ return std::nullopt;
+ }
+
+ std::optional<uint64_t> canonical_expiration;
+ for (const auto& [_, cell] : iter->second) {
+ auto state = cell.file_block->state_unsafe();
+ if (state != FileBlock::State::DOWNLOADED) {
+ if (skipped_unstable != nullptr) {
+ *skipped_unstable = true;
+ }
+ return std::nullopt;
+ }
+ auto storage_expiration = cell.file_block->storage_expiration_time();
+ if (!canonical_expiration.has_value()) {
+ canonical_expiration = storage_expiration;
+ } else if (*canonical_expiration != storage_expiration) {
+ if (skipped_unstable != nullptr) {
+ *skipped_unstable = true;
+ }
+ return std::nullopt;
+ }
+ }
+ return canonical_expiration;
+}
+
+size_t BlockFileCache::repair_duplicate_ttl_dirs_once() {
+ if (_storage->get_type() != DISK ||
!config::enable_file_cache_ttl_repair_checker) {
+ return 0;
+ }
+ if (_recycle_keys.size_approx() > 0 || _recycle_hashes.size_approx() > 0 ||
+ _recycle_key_dirs.size_approx() > 0) {
+ return 0;
+ }
+
+ std::vector<DuplicateKeyDirs> duplicate_key_dirs;
+ auto st = _storage->list_duplicate_key_dirs(&duplicate_key_dirs, [this]()
{ return _close; });
+ if (st.is<ErrorCode::CANCELLED>()) {
+ return 0;
+ }
+ if (!st.ok()) {
+ LOG_WARNING("failed to list duplicate file cache ttl dirs").error(st);
+ return 0;
+ }
+
+ *_ttl_repair_checker_suspect_hashes << duplicate_key_dirs.size();
+ size_t repaired_hashes = 0;
+ size_t max_repairs =
+
std::max<int64_t>(config::file_cache_ttl_repair_checker_max_repairs_per_round,
0);
+ for (const auto& duplicate_key_dir : duplicate_key_dirs) {
+ if (_close || repaired_hashes >= max_repairs) {
+ break;
+ }
+
+ bool skipped_unstable = false;
+ std::optional<uint64_t> canonical_expiration;
+ {
+ SCOPED_CACHE_LOCK(_mutex, this);
+ canonical_expiration = get_canonical_storage_expiration_if_stable(
+ duplicate_key_dir.hash, cache_lock, &skipped_unstable);
+ }
+
+ if (!canonical_expiration.has_value()) {
+ if (skipped_unstable) {
+ *_ttl_repair_checker_skipped_unstable_hashes << 1;
+ } else {
+ *_ttl_repair_checker_skipped_unbound_hashes << 1;
+ }
+ continue;
+ }
+
+ bool enqueued = false;
+ for (auto expiration_time : duplicate_key_dir.expiration_times) {
+ if (expiration_time == *canonical_expiration) {
+ continue;
+ }
+ enqueue_key_dir_for_cleanup(duplicate_key_dir.hash,
expiration_time);
+ *_ttl_repair_checker_repairs_enqueued << 1;
+ enqueued = true;
+ }
+ if (enqueued) {
+ ++repaired_hashes;
+ }
+ }
+ return repaired_hashes;
+}
+
+CacheContext BlockFileCache::make_cell_context(
+ const UInt128Wrapper& hash, const CacheContext& context,
+ std::lock_guard<std::mutex>& /* cache_lock */) const {
+ CacheContext result = context;
+ if (result.storage_expiration_time < 0) {
+ result.storage_expiration_time = result.expiration_time;
+ }
+
+ if (context.cache_type != FileCacheType::TTL) {
+ return result;
+ }
Review Comment:
This still lets a cached hash create a second storage directory once its
logical TTL is cleared. `make_cell_context()` returns here for every non-TTL
request before looking at existing cells, so after
`modify_expiration_time(hash, 0)` the loaded blocks are logically `NORMAL` but
still point at their old TTL storage directory. A later normal read that fills
a hole will use `storage_expiration_time=0` and write the new block under
`hash_0`, recreating the split-directory state this PR is trying to prevent;
the repair checker will then skip the hash as unstable because loaded blocks
have different storage expirations. This is distinct from the existing review
thread about absent hashes and `_key_to_time`, because here the hash is already cached. Please
derive the storage expiration from existing cells for all request types, not
only TTL requests.
##########
be/src/io/cache/fs_file_cache_storage.cpp:
##########
@@ -797,57 +1066,61 @@ void
FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
if (!batch_load_buffer.empty()) {
add_cell_batch_func();
}
+ drop_unbound_loaded_blocks(_mgr);
TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2");
}
+void FSFileCacheStorage::drop_unbound_loaded_blocks(BlockFileCache* mgr) const
{
+ SCOPED_CACHE_LOCK(mgr->_mutex, mgr);
+ std::vector<FileBlockCell*> to_remove;
+ for (auto& [_, offset_to_cell] : mgr->_files) {
+ for (auto& [__, cell] : offset_to_cell) {
+ if (cell.file_block->state_unsafe() !=
FileBlock::State::DOWNLOADED) {
+ continue;
+ }
+ if (!storage_file_exists(cell.file_block->storage_key())) {
+ to_remove.push_back(&cell);
+ }
+ }
+ }
+ mgr->remove_file_blocks_and_clean_time_maps(to_remove, cache_lock);
+}
+
void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr,
const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) {
// async load, can't find key, need to check exist.
auto key_path = get_path_in_local_cache(key.hash,
key.meta.expiration_time);
bool exists = false;
- auto st = fs->exists(key_path, &exists);
if (auto st = fs->exists(key_path, &exists); !exists && st.ok()) {
- // cache miss
+ auto key_dirs = list_key_dirs(key.hash);
+ auto preferred_it = std::find_if(key_dirs.begin(), key_dirs.end(),
[&](const auto& dir) {
+ return dir.expiration_time == key.meta.expiration_time;
+ });
+ if (preferred_it != key_dirs.end()) {
+ std::rotate(key_dirs.begin(), preferred_it,
std::next(preferred_it));
+ }
+ for (const auto& key_dir : key_dirs) {
+ load_blocks_from_dir_unlocked(mgr, key.hash, key_dir, &key,
cache_lock);
+ }
return;
} else if (!st.ok()) [[unlikely]] {
LOG_WARNING("failed to exists file {}", key_path).error(st);
return;
}
- CacheContext context_original;
- context_original.query_id = TUniqueId();
- context_original.expiration_time = key.meta.expiration_time;
- std::error_code ec;
- std::filesystem::directory_iterator check_it(key_path, ec);
- if (ec) [[unlikely]] {
- LOG(WARNING) << "fail to directory_iterator " << ec.message();
+ load_blocks_from_dir_unlocked(
+ mgr, key.hash, KeyDir {.expiration_time =
key.meta.expiration_time, .path = key_path},
+ &key, cache_lock);
+ if (mgr->get_cell(key.hash, key.offset, cache_lock) != nullptr) {
return;
Review Comment:
This early return can hide cached blocks in sibling `hash_*` directories.
During async startup, if the exact directory contains the requested starting
offset but another directory for the same hash contains a later offset in the
same requested range, this loads only the exact directory and returns.
`get_impl()` then sees a hole and `fill_holes_with_empty_file_blocks()` creates
a new empty block instead of using the already cached block from the sibling
directory. The new test covers the case where the exact directory is missing,
but not this partial-hit case. Please keep scanning sibling dirs when
duplicates exist, or otherwise make the fallback range-aware before returning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]