github-actions[bot] commented on code in PR #63376:
URL: https://github.com/apache/doris/pull/63376#discussion_r3412270183
##########
be/src/io/fs/file_meta_cache.cpp:
##########
@@ -17,27 +17,391 @@
#include "io/fs/file_meta_cache.h"
+#include <crc32c/crc32c.h>
+#include <gen_cpp/Types_types.h>
+
+#include <algorithm>
+#include <cstring>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/file_cache_common.h"
+#include "util/coding.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+#include "util/stopwatch.hpp"
+
namespace doris {
+namespace {
-std::string FileMetaCache::get_key(const std::string file_name, int64_t
modification_time,
- int64_t file_size) {
- std::string meta_cache_key;
- meta_cache_key.resize(file_name.size() + sizeof(int64_t));
+constexpr std::string_view FILE_META_CACHE_DISK_MAGIC = "DFMC";
+constexpr uint8_t FILE_META_CACHE_DISK_VERSION = 1;
+constexpr size_t FILE_META_CACHE_DISK_HEADER_SIZE = 4 + 1 + 1 + 2 + 8 + 8 + 8
+ 4;
- memcpy(meta_cache_key.data(), file_name.data(), file_name.size());
- if (modification_time != 0) {
- memcpy(meta_cache_key.data() + file_name.size(), &modification_time,
sizeof(int64_t));
- } else {
- memcpy(meta_cache_key.data() + file_name.size(), &file_size,
sizeof(int64_t));
+std::string_view format_name(FileMetaCacheFormat format) {
+ switch (format) {
+ case FileMetaCacheFormat::PARQUET:
+ return "parquet";
+ case FileMetaCacheFormat::ORC:
+ return "orc";
+ }
+ DCHECK(false) << "unknown file meta cache format";
+ return "unknown";
+}
+
+struct FileMetaCacheDiskHeader {
+ FileMetaCacheFormat format;
+ int64_t modification_time = 0;
+ int64_t file_size = 0;
+ uint64_t payload_size = 0;
+ uint32_t checksum = 0;
+};
+
+Status parse_disk_cache_header(std::string_view header,
FileMetaCacheDiskHeader* parsed) {
+ DCHECK(header.size() == FILE_META_CACHE_DISK_HEADER_SIZE);
+ if (std::memcmp(header.data(), FILE_META_CACHE_DISK_MAGIC.data(),
+ FILE_META_CACHE_DISK_MAGIC.size()) != 0) {
+ return Status::NotFound("file meta disk cache magic mismatch");
+ }
+
+ const auto* ptr =
+ reinterpret_cast<const uint8_t*>(header.data() +
FILE_META_CACHE_DISK_MAGIC.size());
+ const uint8_t version = *ptr++;
+ if (version != FILE_META_CACHE_DISK_VERSION) {
+ return Status::NotFound("file meta disk cache version mismatch");
+ }
+
+ parsed->format = static_cast<FileMetaCacheFormat>(*ptr++);
+ ptr += 2;
+ parsed->file_size = static_cast<int64_t>(decode_fixed64_le(ptr));
+ ptr += sizeof(uint64_t);
+ parsed->modification_time = static_cast<int64_t>(decode_fixed64_le(ptr));
+ ptr += sizeof(uint64_t);
+ parsed->payload_size = decode_fixed64_le(ptr);
+ ptr += sizeof(uint64_t);
+ parsed->checksum = decode_fixed32_le(ptr);
+ return Status::OK();
+}
+
+std::string build_disk_cache_value(FileMetaCacheFormat format, int64_t
modification_time,
+ int64_t file_size, std::string_view
payload) {
+ std::string value;
+ value.reserve(FILE_META_CACHE_DISK_HEADER_SIZE + payload.size());
+ value.append(FILE_META_CACHE_DISK_MAGIC.data(),
FILE_META_CACHE_DISK_MAGIC.size());
+ value.push_back(static_cast<char>(FILE_META_CACHE_DISK_VERSION));
+ value.push_back(static_cast<char>(format));
+ value.push_back(0);
+ value.push_back(0);
+ put_fixed64_le(&value, static_cast<uint64_t>(file_size));
+ put_fixed64_le(&value, static_cast<uint64_t>(modification_time));
+ put_fixed64_le(&value, payload.size());
+ put_fixed32_le(&value, crc32c::Crc32c(payload.data(), payload.size()));
+ value.append(payload.data(), payload.size());
+ return value;
+}
+
+io::CacheContext build_meta_cache_context() {
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::INDEX;
+ context.query_id = TUniqueId();
+ context.expiration_time = 0;
+ context.is_cold_data = false;
+ context.is_warmup = false;
+ return context;
+}
+
+void update_profile_counter(int64_t* counter, int64_t value = 1) {
+ if (counter != nullptr) {
+ *counter += value;
}
+}
+
+Status read_cached_file_cache(io::BlockFileCache* cache, const
io::UInt128Wrapper& hash,
+ size_t offset, Slice buffer,
+ std::vector<io::FileBlockSPtr>* read_blocks =
nullptr) {
+ if (buffer.size == 0) {
+ return Status::OK();
+ }
+
+ auto blocks = cache->get_blocks_by_key(hash);
+ Defer reset_owned_by_cached_reader {[&] {
+ for (auto& [_, block] : blocks) {
+ block->_owned_by_cached_reader = false;
+ }
+ }};
+ if (blocks.empty()) {
+ return Status::NotFound("file cache block not found, hash={}",
hash.to_string());
+ }
+
+ const size_t right = offset + buffer.size - 1;
+ size_t current_pos = offset;
+ size_t written_size = 0;
+ auto it = blocks.upper_bound(offset);
+ if (it != blocks.begin()) {
+ --it;
+ }
+ for (; it != blocks.end() && current_pos <= right; ++it) {
+ const auto& block = it->second;
+ const auto& range = block->range();
+ if (range.right < current_pos) {
+ continue;
+ }
+ if (range.left > current_pos) {
+ return Status::NotFound("file cache block range has holes,
hash={}", hash.to_string());
+ }
+
+ const size_t read_size = std::min(range.right, right) - current_pos +
1;
+ const size_t read_offset = current_pos - range.left;
+ RETURN_IF_ERROR(block->read(Slice(buffer.data + written_size,
read_size), read_offset));
+ if (read_blocks != nullptr) {
+ read_blocks->push_back(block);
+ }
+ written_size += read_size;
+ current_pos += read_size;
+ }
+ if (current_pos <= right) {
+ return Status::NotFound("file cache block range has holes, hash={}",
hash.to_string());
+ }
+ return Status::OK();
+}
+
+} // namespace
+
+FileMetaCache::FileMetaCache(int64_t capacity, io::BlockFileCache*
block_file_cache)
+ : _cache(capacity), _block_file_cache(block_file_cache) {}
+
+std::string FileMetaCache::get_key(const std::string& file_name, int64_t
modification_time,
+ int64_t file_size) {
+ std::string meta_cache_key;
+ meta_cache_key.reserve(sizeof(uint64_t) + file_name.size() +
sizeof(int64_t) * 2);
+ put_fixed64_le(&meta_cache_key, file_name.size());
+ meta_cache_key.append(file_name);
+ put_fixed64_le(&meta_cache_key, static_cast<uint64_t>(modification_time));
+ put_fixed64_le(&meta_cache_key, static_cast<uint64_t>(file_size));
return meta_cache_key;
}
std::string FileMetaCache::get_key(io::FileReaderSPtr file_reader,
const io::FileDescription&
_file_description) {
+ const std::string& file_path = file_reader->path().native();
+ std::string file_identity;
+ if (_file_description.fs_name.empty()) {
+ file_identity = file_path;
+ } else {
+ file_identity.reserve(_file_description.fs_name.size() + 1 +
file_path.size());
+ file_identity.append(_file_description.fs_name);
+ file_identity.push_back('\0');
+ file_identity.append(file_path);
+ }
return FileMetaCache::get_key(
- file_reader->path().native(), _file_description.mtime,
+ file_identity, _file_description.mtime,
_file_description.file_size == -1 ? file_reader->size() :
_file_description.file_size);
}
+bool FileMetaCache::is_persistent_cache_enabled() {
+ const int64_t max_entry_bytes =
config::external_file_meta_disk_cache_max_entry_bytes;
+ return config::enable_external_file_meta_disk_cache && max_entry_bytes > 0;
+}
+
+bool FileMetaCache::is_persistent_cache_payload_size_allowed(uint64_t
payload_size) {
+ const int64_t max_entry_bytes =
config::external_file_meta_disk_cache_max_entry_bytes;
+ return config::enable_external_file_meta_disk_cache && max_entry_bytes > 0
&&
+ std::cmp_less_equal(payload_size, max_entry_bytes);
+}
+
+std::string FileMetaCache::get_persistent_cache_key(FileMetaCacheFormat format,
+ std::string_view
file_meta_cache_key) {
+ std::string key;
+ key.reserve(32 + file_meta_cache_key.size());
+ key.append("file_meta_cache:v1:");
+ key.append(format_name(format));
+ key.push_back(':');
+ key.append(file_meta_cache_key.data(), file_meta_cache_key.size());
+ return key;
+}
+
+FileMetaCacheLookupResult FileMetaCache::lookup(const FileMetaCacheContext&
context,
+ ObjLRUCache::CacheHandle*
handle,
+ std::string* serialized_meta,
+ FileMetaCacheProfile* profile)
{
+ DCHECK(handle != nullptr);
+ DCHECK(serialized_meta != nullptr);
+ if (context.enable_memory_cache && lookup(context.key, handle)) {
+ serialized_meta->clear();
+ if (profile != nullptr) {
+ update_profile_counter(profile->hit_cache);
+ update_profile_counter(profile->hit_memory_cache);
+ }
+ return {.state = FileMetaCacheLookupState::MEMORY_HIT};
+ }
+
+ FileMetaCacheLookupResult result;
+ int64_t persisted_read_time = 0;
+ if (lookup_persistent_cache(context, serialized_meta,
&persisted_read_time)) {
+ result.state = FileMetaCacheLookupState::PERSISTED_HIT;
+ if (profile != nullptr) {
+ update_profile_counter(profile->hit_cache);
+ update_profile_counter(profile->hit_disk_cache);
+ update_profile_counter(profile->read_disk_cache_time,
persisted_read_time);
+ }
+ } else if (is_persistent_cache_enabled() && profile != nullptr) {
+ update_profile_counter(profile->miss_disk_cache);
+ }
+ return result;
+}
+
+bool FileMetaCache::lookup_persistent_cache(const FileMetaCacheContext&
context,
+ std::string* payload, int64_t*
read_time) {
+ DCHECK(payload != nullptr);
+ DCHECK(read_time != nullptr);
+ payload->clear();
+ *read_time = 0;
+ if (!is_persistent_cache_enabled()) {
+ return false;
+ }
+
+ MonotonicStopWatch watch;
+ watch.start();
+ auto stop_watch = [&]() { *read_time = watch.elapsed_time(); };
+
+ const std::string disk_cache_key =
get_persistent_cache_key(context.format, context.key);
+ const auto hash = io::BlockFileCache::hash(disk_cache_key);
+ io::BlockFileCache* cache = get_block_file_cache(hash);
+ if (cache == nullptr) {
+ stop_watch();
+ return false;
+ }
+
+ std::vector<io::FileBlockSPtr> read_blocks;
+ auto invalidate_entry = [&](const Status& status) {
+ payload->clear();
+ read_blocks.clear();
+ cache->remove_if_cached(hash);
+ VLOG_DEBUG << "lookup file meta disk cache failed: " << status;
+ stop_watch();
+ return false;
+ };
+
+ std::string header(FILE_META_CACHE_DISK_HEADER_SIZE, '\0');
+ Status status = read_cached_file_cache(cache, hash, 0,
Slice(header.data(), header.size()),
+ &read_blocks);
+ if (!status.ok()) {
+ VLOG_DEBUG << "lookup file meta disk cache failed: " << status;
+ stop_watch();
+ return false;
+ }
+
+ FileMetaCacheDiskHeader parsed;
+ status = parse_disk_cache_header(header, &parsed);
+ if (!status.ok()) {
+ return invalidate_entry(status);
+ }
+ if (parsed.format != context.format || parsed.modification_time !=
context.modification_time ||
+ parsed.file_size != context.file_size ||
+ !is_persistent_cache_payload_size_allowed(parsed.payload_size)) {
+ return invalidate_entry(Status::NotFound("file meta disk cache header
mismatch"));
+ }
+
+ payload->resize(parsed.payload_size);
+ if (parsed.payload_size > 0) {
+ status = read_cached_file_cache(cache, hash,
FILE_META_CACHE_DISK_HEADER_SIZE,
+ Slice(payload->data(),
payload->size()), &read_blocks);
+ if (!status.ok()) {
+ payload->clear();
+ VLOG_DEBUG << "lookup file meta disk cache failed: " << status;
+ stop_watch();
+ return false;
+ }
+ }
+ const uint32_t checksum = crc32c::Crc32c(payload->data(), payload->size());
+ if (checksum != parsed.checksum) {
+ return invalidate_entry(Status::NotFound("file meta disk cache
checksum mismatch"));
+ }
+
+ for (auto& block : read_blocks) {
+ cache->add_need_update_lru_block(std::move(block));
+ }
+ stop_watch();
+ return true;
+}
+
+bool FileMetaCache::insert_persistent_cache(const FileMetaCacheContext&
context,
+ std::string_view payload, int64_t*
write_time) {
+ DCHECK(write_time != nullptr);
+ *write_time = 0;
+ if
(!is_persistent_cache_payload_size_allowed(static_cast<uint64_t>(payload.size())))
{
+ return false;
+ }
+
+ MonotonicStopWatch watch;
+ watch.start();
+ auto stop_watch = [&]() { *write_time = watch.elapsed_time(); };
+
+ const std::string disk_cache_key =
get_persistent_cache_key(context.format, context.key);
+ const auto hash = io::BlockFileCache::hash(disk_cache_key);
+ io::BlockFileCache* cache = get_block_file_cache(hash);
+ if (cache == nullptr) {
+ stop_watch();
+ return false;
+ }
+
+ const std::string value = build_disk_cache_value(context.format,
context.modification_time,
+ context.file_size,
payload);
+ io::ReadStatistics stats;
+ io::CacheContext cache_context = build_meta_cache_context();
+ cache_context.stats = &stats;
+ auto holder = cache->get_or_set(hash, 0, value.size(), cache_context);
+ for (const auto& block : holder.file_blocks) {
+ auto state = block->state();
+ if (state == io::FileBlock::State::DOWNLOADING &&
!block->is_downloader()) {
+ state = block->wait();
+ }
+ if (state == io::FileBlock::State::DOWNLOADED) {
+ continue;
+ }
+ if (state != io::FileBlock::State::EMPTY) {
+ VLOG_DEBUG << "insert file meta disk cache failed: file block is
not writable";
+ stop_watch();
+ return false;
+ }
+
+ if (block->get_or_set_downloader() != io::FileBlock::get_caller_id()) {
+ VLOG_DEBUG << "insert file meta disk cache failed: file block has
another downloader";
+ stop_watch();
+ return false;
+ }
+ const auto& range = block->range();
+ DCHECK_LT(range.right, value.size());
+ Status status = block->append(Slice(value.data() + range.left,
range.size()));
+ if (!status.ok()) {
+ VLOG_DEBUG << "insert file meta disk cache failed: " << status;
+ stop_watch();
+ return false;
+ }
+ status = block->finalize();
Review Comment:
`get_or_set()` can return a mixed holder when the INDEX queue cannot reserve
space for the full footer value: earlier blocks may be `EMPTY` and get written
and finalized, while a later block is `SKIP_CACHE` or otherwise not writable.
In that case `insert_persistent_cache()` returns `false` but leaves the
already-finalized earlier blocks cached under the metadata `hash`. A later
`lookup_persistent_cache()` can then read the header from the finalized prefix
and fail the payload read because the block range has holes; that miss path
only returns `false` and does not call `invalidate_entry`, so the partial value
keeps consuming INDEX cache and every subsequent lookup for this footer misses
until eviction. Please remove the whole `hash` on any insert failure after
`get_or_set()` has returned blocks, or pre-reserve/write the value atomically
so failed inserts cannot leave a poisoned persistent metadata entry.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]