freemandealer commented on code in PR #49456:
URL: https://github.com/apache/doris/pull/49456#discussion_r2163159383


##########
be/src/io/cache/block_file_cache.cpp:
##########
@@ -2097,6 +2160,367 @@ void BlockFileCache::update_ttl_atime(const 
UInt128Wrapper& hash) {
     };
 }
 
+// Returns the LRU log queue member that corresponds to the given cache type.
+// A type outside the four handled values hits DCHECK(false); if execution continues past
+// that check, the NORMAL queue is returned as the fallback.
+BlockFileCache::CacheLRULogQueue& BlockFileCache::get_lru_log_queue(FileCacheType type) {
+    if (type == FileCacheType::INDEX) {
+        return _index_lru_log_queue;
+    }
+    if (type == FileCacheType::DISPOSABLE) {
+        return _disposable_lru_log_queue;
+    }
+    if (type == FileCacheType::TTL) {
+        return _ttl_lru_log_queue;
+    }
+    if (type != FileCacheType::NORMAL) {
+        // unexpected cache type
+        DCHECK(false);
+    }
+    return _normal_lru_log_queue;
+}
+
+// Appends one LRU event (type, hash, offset, size) to the back of `log_queue`.
+// NOTE(review): no lock is acquired here, whereas replay_queue_event() drains its queue while
+// holding _mutex_lru_log -- confirm the callers (or the queue type) provide synchronization.
+void BlockFileCache::record_queue_event(CacheLRULogQueue& log_queue, CacheLRULogType log_type,
+                                        const UInt128Wrapper hash, const size_t offset,
+                                        const size_t size) {
+    auto event = std::make_unique<CacheLRULog>(log_type, hash, offset, size);
+    log_queue.push_back(std::move(event));
+}
+
+// Drains `log_queue` from the front and applies every recorded event to `shadow_queue`:
+//   ADD        -> add the (hash, offset, size) entry
+//   REMOVE     -> look the entry up and remove it, or warn if it is absent
+//   MOVETOBACK -> look the entry up and move it to the end, or warn if it is absent
+// An exception thrown while applying one event is logged and the drain continues.
+void BlockFileCache::replay_queue_event(CacheLRULogQueue& log_queue, LRUQueue& shadow_queue) {
+    // The shadow queue does not need the real cache lock, but a lock is still required
+    // to prevent read/write contention; it is held for the whole drain.
+    std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
+    for (; !log_queue.empty();) {
+        auto event = std::move(log_queue.front());
+        log_queue.pop_front();
+        try {
+            const auto kind = event->type;
+            if (kind == CacheLRULogType::ADD) {
+                shadow_queue.add(event->hash, event->offset, event->size, lru_log_lock);
+            } else if (kind == CacheLRULogType::REMOVE) {
+                auto pos = shadow_queue.get(event->hash, event->offset, lru_log_lock);
+                if (pos == shadow_queue.end()) {
+                    LOG(WARNING) << "REMOVE failed, doesn't exist in shadow queue";
+                } else {
+                    shadow_queue.remove(pos, lru_log_lock);
+                }
+            } else if (kind == CacheLRULogType::MOVETOBACK) {
+                auto pos = shadow_queue.get(event->hash, event->offset, lru_log_lock);
+                if (pos == shadow_queue.end()) {
+                    LOG(WARNING) << "MOVETOBACK failed, doesn't exist in shadow queue";
+                } else {
+                    shadow_queue.move_to_end(pos, lru_log_lock);
+                }
+            } else {
+                LOG(WARNING) << "Unknown CacheLRULogType: " << static_cast<int>(kind);
+            }
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "Failed to replay queue event: " << e.what();
+        }
+    }
+}
+
+// Loop body of the background LRU-log replay: until `_close` is set, wait on `_close_cv` for up
+// to config::file_cache_background_lru_log_replay_interval_ms, then replay the TTL, INDEX,
+// NORMAL and DISPOSABLE log queues (in that order) into their shadow queues.
+void BlockFileCache::run_background_lru_log_replay() {
+    while (true) {
+        if (_close) {
+            return;
+        }
+        // re-read the interval on every iteration
+        const int64_t wait_ms = config::file_cache_background_lru_log_replay_interval_ms;
+        {
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, std::chrono::milliseconds(wait_ms));
+            // whatever ended the wait, stop without replaying once close was requested
+            if (_close) {
+                return;
+            }
+        }
+
+        replay_queue_event(_ttl_lru_log_queue, _shadow_ttl_queue);
+        replay_queue_event(_index_lru_log_queue, _shadow_index_queue);
+        replay_queue_event(_normal_lru_log_queue, _shadow_normal_queue);
+        replay_queue_event(_disposable_lru_log_queue, _shadow_disposable_queue);
+
+        //TODO(zhengyu): add debug facilities to check diff between real and shadow queue
+    }
+}
+
+// Checks the state of the dump output stream.
+// Returns Status::OK() when `out` is good. Otherwise builds a message from the eof/fail/bad
+// state bits (the latter two include strerror(errno)), closes the stream, logs a warning and
+// returns Status::InternalError carrying that message together with `filename`.
+Status BlockFileCache::check_ofstream_status(std::ofstream& out, std::string& filename) {
+    if (out.good()) {
+        return Status::OK();
+    }
+
+    const std::ios::iostate state = out.rdstate();
+    std::string detail;
+    if (state & std::ios::eofbit) {
+        detail += "End of file reached.";
+    }
+    if (state & std::ios::failbit) {
+        detail += "Input/output operation failed, err_code: ";
+        detail += strerror(errno);
+    }
+    if (state & std::ios::badbit) {
+        detail += "Serious I/O error occurred, err_code: ";
+        detail += strerror(errno);
+    }
+    // errno has been read above, so closing the stream cannot affect the message
+    out.close();
+
+    std::string warn_msg = fmt::format("dump lru writing failed, file={}, {}", filename, detail);
+    LOG(WARNING) << warn_msg;
+    return Status::InternalError<false>(warn_msg);
+}
+
+// Writes a single LRU entry (hash, offset, size) to `out` and returns the resulting stream
+// status from check_ofstream_status(). The on-disk form depends on USE_PROTOBUF_DUMP:
+// a serialized LruDumpEntryPB when it is defined, otherwise the raw bytes of the three fields.
+Status BlockFileCache::dump_one_lru_entry(std::ofstream& out, std::string& filename,
+                                          const UInt128Wrapper& hash, size_t offset, size_t size) {
+#ifdef USE_PROTOBUF_DUMP
+    // Why not use protobuf for the dump as a whole?
+    // AFAIK the current protobuf version does not support a streaming mode, so every message
+    // would have to be kept in memory, which consumes loads of RAM.
+    // Instead each single entry is serialized with protobuf on its own, and the version field
+    // in the footer is there for upgrades.
+    // NOTE(review): the serialized entries are written without a length prefix -- confirm the
+    // loader is able to find the entry boundaries.
+    doris::io::cache::LruDumpEntryPB record;
+    auto* pb_hash = record.mutable_hash();
+    pb_hash->set_high(hash.high());
+    pb_hash->set_low(hash.low());
+    record.set_offset(offset);
+    record.set_size(size);
+
+    std::string payload;
+    if (!record.SerializeToString(&payload)) {
+        std::string warn_msg = fmt::format("dump lru serialize failed, file={}, hash={}, offset={}",
+                                           filename, hash.to_string(), offset);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    out.write(payload.data(), payload.size());
+#else
+    // Raw dump: per the original author it saves 18.75% of the dump size, and the version in
+    // the footer still allows an easy upgrade. Each field is written as its in-memory bytes.
+    auto put_raw = [&out](const auto& field) {
+        out.write(reinterpret_cast<const char*>(&field), sizeof(field));
+    };
+    put_raw(hash);
+    put_raw(offset);
+    put_raw(size);
+#endif
+    return check_ofstream_status(out, filename);
+}
+
+Status BlockFileCache::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // write footer: size_t entry_num, version, magic, totally 12 bytes
+    int8_t version = 1;
+    std::string magic_str = "DOR";

Review Comment:
   will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to