gavinchou commented on code in PR #59892:
URL: https://github.com/apache/doris/pull/59892#discussion_r2693327076
##########
be/src/io/fs/packed_file_manager.cpp:
##########
@@ -108,6 +113,91 @@ Status append_packed_info_trailer(FileWriter* writer,
const std::string& packed_
return writer->append(Slice(trailer));
}
+// write small file data to file cache
+void do_write_to_file_cache(const std::string& small_file_path, const
std::string& data,
+ int64_t tablet_id) {
+ if (data.empty()) {
+ return;
+ }
+
+ // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat")
+ Path path(small_file_path);
+ UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native());
+
+ VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
+ << " filename=" << path.filename().native() << " hash=" <<
cache_hash.to_string()
+ << " size=" << data.size() << " tablet_id=" << tablet_id;
+
+ BlockFileCache* file_cache =
FileCacheFactory::instance()->get_by_path(cache_hash);
+ if (file_cache == nullptr) {
+ return; // Cache not available, skip
+ }
+
+ // Allocate cache blocks
+ CacheContext ctx;
+ ctx.cache_type = FileCacheType::NORMAL;
+ ctx.tablet_id = tablet_id;
+ ReadStatistics stats;
+ ctx.stats = &stats;
+
+ FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0,
data.size(), ctx);
+
+ // Write data to cache blocks
+ size_t data_offset = 0;
+ for (auto& block : holder.file_blocks) {
+ if (data_offset >= data.size()) {
+ break;
+ }
+ size_t block_size = block->range().size();
+ size_t write_size = std::min(block_size, data.size() - data_offset);
+
+ if (block->state() == FileBlock::State::EMPTY) {
+ block->get_or_set_downloader();
+ if (block->is_downloader()) {
+ Slice s(data.data() + data_offset, write_size);
+ Status st = block->append(s);
+ if (st.ok()) {
+ st = block->finalize();
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "Write small file to cache failed: " <<
st.msg();
+ }
+ }
+ }
+ data_offset += write_size;
+ }
+}
+
+// Async wrapper: submit cache write task to thread pool
+// Note: data is copied to ensure lifetime beyond async execution
+void write_small_file_to_cache_async(const std::string& small_file_path, const
Slice& data,
+ int64_t tablet_id) {
+ if (!config::enable_file_cache || data.size == 0) {
+ return;
+ }
+
+ // Copy data since original buffer may be reused before async task executes
+ // For small files (< 1MB), copy overhead is acceptable
+ std::string data_copy(data.data, data.size);
+
+ auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
+ if (thread_pool == nullptr) {
+ // Fallback to sync write if thread pool not available
+ do_write_to_file_cache(small_file_path, data_copy, tablet_id);
+ return;
+ }
+
+ Status st = thread_pool->submit_func(
+ [path = small_file_path, data = std::move(data_copy), tablet_id]()
{
+ do_write_to_file_cache(path, data, tablet_id);
+ });
+
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to submit cache write task: " << st.msg();
Review Comment:
We should also log the small file path (`small_file_path`) in this warning message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]