freemandealer commented on code in PR #53824:
URL: https://github.com/apache/doris/pull/53824#discussion_r2275677477


##########
be/src/cloud/cloud_meta_mgr.cpp:
##########
@@ -807,8 +807,40 @@ Status 
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
                 //   after doing EMPTY_CUMULATIVE compaction, MS cp is 13, 
get_rowset will return [2-11][12-12].
                 bool version_overlap =
                         tablet->max_version_unlocked() >= 
rowsets.front()->start_version();
-                tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
-                                    options.warmup_delta_data);
+                if (config::enable_query_driven_warmup && 
options.warmup_delta_data &&
+                    options.query_version > 0 && 
!tablet->enable_unique_key_merge_on_write()) {
+                    VLOG_DEBUG << "warmup rowset";
+                    std::vector<RowsetSharedPtr> new_rowsets;
+                    std::vector<RowsetSharedPtr> overlapping_rowsets;
+                    if (tablet->split_rowsets_by_version_overlap(rowsets, 
&new_rowsets,
+                                                                 
&overlapping_rowsets)) {
+                        VLOG_DEBUG << "warmup rowset, split into 2 sets, 
new_rowsets: "
+                                   << new_rowsets.size()
+                                   << ", overlapping_rowsets: " << 
overlapping_rowsets.size();
+                        // add all new rowsets directly, warmup async
+                        tablet->add_rowsets(std::move(new_rowsets), 
version_overlap, wlock, true);
+                        for (auto rs : overlapping_rowsets) {
+                            if (rs->version().second == 1) {
+                                // [0-1] rowset is empty for each tablet, skip 
it
+                                continue;
+                            }
+                            // the rowset will be added to tablet meta in the 
callback method after warm up
+                            tablet->warm_up_rowset_unlocked(rs, true, true);
+                        }
+                    } else {
+                        VLOG_DEBUG << "warmup rowset, can't split into 2 sets";
+                        // add all rowsets directly, warmup async
+                        tablet->add_rowsets(std::move(rowsets), 
version_overlap, wlock, true);
+                    }
+                } else {
+                    VLOG_DEBUG << "add rowset without warmup, the config is: "
+                               << config::enable_query_driven_warmup
+                               << ", is mow: " << 
tablet->enable_unique_key_merge_on_write()
+                               << ", version_overlap: " << version_overlap
+                               << ", warmup_delta_data: " << 
options.warmup_delta_data;
+                    tablet->add_rowsets(std::move(rowsets), version_overlap, 
wlock,
+                                        version_overlap || 
options.warmup_delta_data);

Review Comment:
   Why add the `version_overlap` condition here?



##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema() 
const {
     return _merged_tablet_schema;
 }
 
+bool CloudTablet::split_rowsets_by_version_overlap(
+        const std::vector<RowsetSharedPtr>& input_rowsets,
+        std::vector<RowsetSharedPtr>* new_rowsets,
+        std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+    auto max_version = max_version_unlocked();
+    for (auto rs : input_rowsets) {
+        if (rs->version().first > max_version) {
+            new_rowsets->push_back(rs);
+        } else if (rs->version().second <= max_version) {
+            overlapping_rowsets->push_back(rs);
+        } else {
+            new_rowsets->clear();
+            overlapping_rowsets->clear();
+            return false;
+        }
+    }
+    return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+    std::shared_lock rlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    std::lock_guard wlock(_meta_lock);
+    return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return false;
+    }
+    if (state == WarmUpState::TRIGGERED_BY_JOB) {
+        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+    } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+    }
+    _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state, 
rowset->num_segments());
+    return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, 
Status status) {
+    std::lock_guard wlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << 
", " << status;
+    if (status.ok()) {
+        g_file_cache_warm_up_segment_complete_num << 1;
+        _rowset_warm_up_states[rowset_id].second--;
+        if (_rowset_warm_up_states[rowset_id].second == 0) {
+            g_file_cache_warm_up_rowset_complete_num << 1;
+            _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+        }
+        return _rowset_warm_up_states[rowset_id].first;
+    }
+    // !status.ok()
+    g_file_cache_warm_up_segment_complete_num << 1;
+    g_file_cache_warm_up_rowset_complete_num << 1;
+    _rowset_warm_up_states.erase(rowset_id);
+    return WarmUpState::NONE;
+}
+
+void CloudTablet::warm_up_rowset_unlocked(RowsetSharedPtr rowset, bool 
version_overlap,
+                                          bool delay_add_rowset) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return;
+    }
+    if (delay_add_rowset) {
+        g_file_cache_query_driven_warmup_delayed_rowset_num << 1;
+        LOG(INFO) << "triggered a warm up for overlapping rowset " << 
rowset->version()
+                  << ", will add it to tablet meta latter";
+    }
+    // warmup rowset data in background
+    bool download_task_submitted = false;
+    for (int seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
+        const auto& rowset_meta = rowset->rowset_meta();
+        constexpr int64_t interval = 600; // 10 mins
+        // When BE restart and receive the `load_sync` rpc, it will sync all 
historical rowsets first time.
+        // So we need to filter out the old rowsets avoid to download the 
whole table.
+        if (!version_overlap &&
+            ::time(nullptr) - rowset_meta->newest_write_timestamp() >= 
interval) {
+            continue;
+        }
+
+        auto storage_resource = rowset_meta->remote_storage_resource();
+        if (!storage_resource) {
+            LOG(WARNING) << storage_resource.error();
+            continue;
+        }
+
+        int64_t expiration_time =
+                _tablet_meta->ttl_seconds() == 0 || 
rowset_meta->newest_write_timestamp() <= 0
+                        ? 0
+                        : rowset_meta->newest_write_timestamp() + 
_tablet_meta->ttl_seconds();
+        g_file_cache_cloud_tablet_submitted_segment_num << 1;
+        if (rowset->rowset_meta()->segment_file_size(seg_id) > 0) {
+            g_file_cache_cloud_tablet_submitted_segment_size
+                    << rowset->rowset_meta()->segment_file_size(seg_id);
+        }
+        auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
+        // clang-format off
+        
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta 
{
+                .path = 
storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+                .file_size = rowset->rowset_meta()->segment_file_size(seg_id),
+                .file_system = storage_resource.value()->fs,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done {[self, rowset, delay_add_rowset](Status st) {
+                    self->warm_up_done_cb(rowset, st, delay_add_rowset);
+                    if (!st) {
+                        LOG_WARNING("add rowset warm up error ").error(st);
+                    }
+                }},
+        });
+        download_task_submitted = true;
+
+        auto download_idx_file = [&](const io::Path& idx_path, int64_t 
idx_size) {
+            io::DownloadFileMeta meta {
+                    .path = idx_path,
+                    .file_size = idx_size,
+                    .file_system = storage_resource.value()->fs,
+                    .ctx =
+                            {
+                                    .expiration_time = expiration_time,
+                                    .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                            },
+                     // TODO: consider index file for warm up state management

Review Comment:
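   This TODO is critical.
   The easiest improvement: submit the index file downloads before the segment files?
That may reduce the chance of remote IO for the inverted index alone. Currently the warm-up state is
completed by segment downloads only, so a rowset can be added before its index files are cached.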



##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema() 
const {
     return _merged_tablet_schema;
 }
 
+bool CloudTablet::split_rowsets_by_version_overlap(
+        const std::vector<RowsetSharedPtr>& input_rowsets,
+        std::vector<RowsetSharedPtr>* new_rowsets,
+        std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+    auto max_version = max_version_unlocked();
+    for (auto rs : input_rowsets) {
+        if (rs->version().first > max_version) {
+            new_rowsets->push_back(rs);
+        } else if (rs->version().second <= max_version) {
+            overlapping_rowsets->push_back(rs);
+        } else {
+            new_rowsets->clear();
+            overlapping_rowsets->clear();
+            return false;
+        }
+    }
+    return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+    std::shared_lock rlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    std::lock_guard wlock(_meta_lock);
+    return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return false;
+    }
+    if (state == WarmUpState::TRIGGERED_BY_JOB) {
+        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+    } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+    }
+    _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state, 
rowset->num_segments());
+    return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, 
Status status) {
+    std::lock_guard wlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << 
", " << status;
+    if (status.ok()) {
+        g_file_cache_warm_up_segment_complete_num << 1;
+        _rowset_warm_up_states[rowset_id].second--;
+        if (_rowset_warm_up_states[rowset_id].second == 0) {
+            g_file_cache_warm_up_rowset_complete_num << 1;
+            _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+        }
+        return _rowset_warm_up_states[rowset_id].first;
+    }
+    // !status.ok()
+    g_file_cache_warm_up_segment_complete_num << 1;
+    g_file_cache_warm_up_rowset_complete_num << 1;
+    _rowset_warm_up_states.erase(rowset_id);
+    return WarmUpState::NONE;

Review Comment:
   WarmUpState::PARTIAL_FAILURE?
   If one segment of the rowset fails, its state entry is erased. Every later segment completion
of the same rowset then returns WarmUpState::NONE early, and its counters are never updated.



##########
be/src/cloud/cloud_cumulative_compaction.cpp:
##########
@@ -375,7 +380,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
         if (_input_rowsets.size() == 1) {
             DCHECK_EQ(_output_rowset->version(), _input_rowsets[0]->version());
             // MUST NOT move input rowset to stale path
-            cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock);
+            cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock, true);

Review Comment:
   Why does this branch in particular need warm-up?



##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema() 
const {
     return _merged_tablet_schema;
 }
 
+bool CloudTablet::split_rowsets_by_version_overlap(
+        const std::vector<RowsetSharedPtr>& input_rowsets,
+        std::vector<RowsetSharedPtr>* new_rowsets,
+        std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+    auto max_version = max_version_unlocked();
+    for (auto rs : input_rowsets) {
+        if (rs->version().first > max_version) {
+            new_rowsets->push_back(rs);
+        } else if (rs->version().second <= max_version) {
+            overlapping_rowsets->push_back(rs);
+        } else {
+            new_rowsets->clear();
+            overlapping_rowsets->clear();
+            return false;
+        }
+    }
+    return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+    std::shared_lock rlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    std::lock_guard wlock(_meta_lock);
+    return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return false;
+    }
+    if (state == WarmUpState::TRIGGERED_BY_JOB) {
+        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+    } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+    }
+    _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state, 
rowset->num_segments());
+    return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, 
Status status) {
+    std::lock_guard wlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << 
", " << status;
+    if (status.ok()) {
+        g_file_cache_warm_up_segment_complete_num << 1;
+        _rowset_warm_up_states[rowset_id].second--;
+        if (_rowset_warm_up_states[rowset_id].second == 0) {
+            g_file_cache_warm_up_rowset_complete_num << 1;
+            _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+        }
+        return _rowset_warm_up_states[rowset_id].first;
+    }
+    // !status.ok()
+    g_file_cache_warm_up_segment_complete_num << 1;
+    g_file_cache_warm_up_rowset_complete_num << 1;
+    _rowset_warm_up_states.erase(rowset_id);
+    return WarmUpState::NONE;
+}
+
+void CloudTablet::warm_up_rowset_unlocked(RowsetSharedPtr rowset, bool 
version_overlap,
+                                          bool delay_add_rowset) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return;
+    }
+    if (delay_add_rowset) {
+        g_file_cache_query_driven_warmup_delayed_rowset_num << 1;
+        LOG(INFO) << "triggered a warm up for overlapping rowset " << 
rowset->version()
+                  << ", will add it to tablet meta latter";
+    }
+    // warmup rowset data in background
+    bool download_task_submitted = false;
+    for (int seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
+        const auto& rowset_meta = rowset->rowset_meta();
+        constexpr int64_t interval = 600; // 10 mins
+        // When BE restart and receive the `load_sync` rpc, it will sync all 
historical rowsets first time.
+        // So we need to filter out the old rowsets avoid to download the 
whole table.
+        if (!version_overlap &&
+            ::time(nullptr) - rowset_meta->newest_write_timestamp() >= 
interval) {
+            continue;
+        }
+
+        auto storage_resource = rowset_meta->remote_storage_resource();
+        if (!storage_resource) {
+            LOG(WARNING) << storage_resource.error();
+            continue;
+        }
+
+        int64_t expiration_time =
+                _tablet_meta->ttl_seconds() == 0 || 
rowset_meta->newest_write_timestamp() <= 0
+                        ? 0
+                        : rowset_meta->newest_write_timestamp() + 
_tablet_meta->ttl_seconds();
+        g_file_cache_cloud_tablet_submitted_segment_num << 1;
+        if (rowset->rowset_meta()->segment_file_size(seg_id) > 0) {
+            g_file_cache_cloud_tablet_submitted_segment_size
+                    << rowset->rowset_meta()->segment_file_size(seg_id);
+        }
+        auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
+        // clang-format off
+        
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta 
{
+                .path = 
storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+                .file_size = rowset->rowset_meta()->segment_file_size(seg_id),
+                .file_system = storage_resource.value()->fs,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done {[self, rowset, delay_add_rowset](Status st) {
+                    self->warm_up_done_cb(rowset, st, delay_add_rowset);
+                    if (!st) {
+                        LOG_WARNING("add rowset warm up error ").error(st);
+                    }
+                }},
+        });
+        download_task_submitted = true;
+
+        auto download_idx_file = [&](const io::Path& idx_path, int64_t 
idx_size) {
+            io::DownloadFileMeta meta {
+                    .path = idx_path,
+                    .file_size = idx_size,
+                    .file_system = storage_resource.value()->fs,
+                    .ctx =
+                            {
+                                    .expiration_time = expiration_time,
+                                    .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                            },
+                     // TODO: consider index file for warm up state management
+                    .download_done {[](Status st) {
+                        if (!st) {
+                            LOG_WARNING("add rowset warm up error ").error(st);
+                        }
+                    }},
+            };
+            
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
+            g_file_cache_cloud_tablet_submitted_index_num << 1;
+            g_file_cache_cloud_tablet_submitted_index_size << idx_size;
+        };
+        // clang-format on
+        auto schema_ptr = rowset_meta->tablet_schema();
+        auto idx_version = schema_ptr->get_inverted_index_storage_format();
+        if (idx_version == InvertedIndexStorageFormatPB::V1) {
+            std::unordered_map<int64_t, int64_t> index_size_map;
+            auto&& inverted_index_info = 
rowset_meta->inverted_index_file_info(seg_id);
+            for (const auto& info : inverted_index_info.index_info()) {
+                if (info.index_file_size() != -1) {
+                    index_size_map[info.index_id()] = info.index_file_size();
+                } else {
+                    VLOG_DEBUG << "Invalid index_file_size for segment_id " << 
seg_id
+                               << ", index_id " << info.index_id();
+                }
+            }
+            for (const auto& index : schema_ptr->inverted_indexes()) {
+                auto idx_path = storage_resource.value()->remote_idx_v1_path(
+                        *rowset_meta, seg_id, index->index_id(), 
index->get_index_suffix());
+                download_idx_file(idx_path, index_size_map[index->index_id()]);
+            }
+        } else {
+            if (schema_ptr->has_inverted_index()) {
+                auto&& inverted_index_info = 
rowset_meta->inverted_index_file_info(seg_id);
+                int64_t idx_size = 0;
+                if (inverted_index_info.has_index_size()) {
+                    idx_size = inverted_index_info.index_size();
+                } else {
+                    VLOG_DEBUG << "index_size is not set for segment " << 
seg_id;
+                }
+                auto idx_path = 
storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
+                download_idx_file(idx_path, idx_size);
+            }
+        }
+    }
+    if (download_task_submitted) {

Review Comment:
   Is this always true?



##########
be/src/cloud/cloud_warm_up_manager.h:
##########
@@ -37,6 +37,13 @@ enum class DownloadType {
     S3,
 };
 
+enum class WarmUpState : int {
+    NONE,
+    TRIGGERED_BY_SYNC_ROWSET,
+    TRIGGERED_BY_JOB,
+    DONE,

Review Comment:
   It would be clearer to add another state: PARTIAL_FAILURE.



##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema() 
const {
     return _merged_tablet_schema;
 }
 
+bool CloudTablet::split_rowsets_by_version_overlap(
+        const std::vector<RowsetSharedPtr>& input_rowsets,
+        std::vector<RowsetSharedPtr>* new_rowsets,
+        std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+    auto max_version = max_version_unlocked();
+    for (auto rs : input_rowsets) {
+        if (rs->version().first > max_version) {
+            new_rowsets->push_back(rs);
+        } else if (rs->version().second <= max_version) {
+            overlapping_rowsets->push_back(rs);
+        } else {
+            new_rowsets->clear();
+            overlapping_rowsets->clear();
+            return false;
+        }
+    }
+    return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+    std::shared_lock rlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    std::lock_guard wlock(_meta_lock);
+    return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return false;
+    }
+    if (state == WarmUpState::TRIGGERED_BY_JOB) {
+        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+    } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+    }
+    _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state, 
rowset->num_segments());
+    return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, 
Status status) {
+    std::lock_guard wlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << 
", " << status;
+    if (status.ok()) {
+        g_file_cache_warm_up_segment_complete_num << 1;
+        _rowset_warm_up_states[rowset_id].second--;
+        if (_rowset_warm_up_states[rowset_id].second == 0) {
+            g_file_cache_warm_up_rowset_complete_num << 1;
+            _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+        }
+        return _rowset_warm_up_states[rowset_id].first;
+    }
+    // !status.ok()
+    g_file_cache_warm_up_segment_complete_num << 1;
+    g_file_cache_warm_up_rowset_complete_num << 1;
+    _rowset_warm_up_states.erase(rowset_id);
+    return WarmUpState::NONE;
+}
+
+void CloudTablet::warm_up_rowset_unlocked(RowsetSharedPtr rowset, bool 
version_overlap,
+                                          bool delay_add_rowset) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return;
+    }
+    if (delay_add_rowset) {
+        g_file_cache_query_driven_warmup_delayed_rowset_num << 1;
+        LOG(INFO) << "triggered a warm up for overlapping rowset " << 
rowset->version()
+                  << ", will add it to tablet meta latter";
+    }
+    // warmup rowset data in background
+    bool download_task_submitted = false;
+    for (int seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
+        const auto& rowset_meta = rowset->rowset_meta();
+        constexpr int64_t interval = 600; // 10 mins
+        // When BE restart and receive the `load_sync` rpc, it will sync all 
historical rowsets first time.
+        // So we need to filter out the old rowsets avoid to download the 
whole table.
+        if (!version_overlap &&
+            ::time(nullptr) - rowset_meta->newest_write_timestamp() >= 
interval) {
+            continue;
+        }
+
+        auto storage_resource = rowset_meta->remote_storage_resource();
+        if (!storage_resource) {
+            LOG(WARNING) << storage_resource.error();
+            continue;
+        }
+
+        int64_t expiration_time =
+                _tablet_meta->ttl_seconds() == 0 || 
rowset_meta->newest_write_timestamp() <= 0
+                        ? 0
+                        : rowset_meta->newest_write_timestamp() + 
_tablet_meta->ttl_seconds();
+        g_file_cache_cloud_tablet_submitted_segment_num << 1;
+        if (rowset->rowset_meta()->segment_file_size(seg_id) > 0) {
+            g_file_cache_cloud_tablet_submitted_segment_size
+                    << rowset->rowset_meta()->segment_file_size(seg_id);
+        }
+        auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
+        // clang-format off
+        
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta 
{
+                .path = 
storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+                .file_size = rowset->rowset_meta()->segment_file_size(seg_id),
+                .file_system = storage_resource.value()->fs,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done {[self, rowset, delay_add_rowset](Status st) {
+                    self->warm_up_done_cb(rowset, st, delay_add_rowset);
+                    if (!st) {
+                        LOG_WARNING("add rowset warm up error ").error(st);
+                    }
+                }},
+        });
+        download_task_submitted = true;
+
+        auto download_idx_file = [&](const io::Path& idx_path, int64_t 
idx_size) {
+            io::DownloadFileMeta meta {
+                    .path = idx_path,
+                    .file_size = idx_size,
+                    .file_system = storage_resource.value()->fs,
+                    .ctx =
+                            {
+                                    .expiration_time = expiration_time,
+                                    .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                            },
+                     // TODO: consider index file for warm up state management
+                    .download_done {[](Status st) {
+                        if (!st) {
+                            LOG_WARNING("add rowset warm up error ").error(st);
+                        }
+                    }},
+            };
+            
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
+            g_file_cache_cloud_tablet_submitted_index_num << 1;
+            g_file_cache_cloud_tablet_submitted_index_size << idx_size;
+        };
+        // clang-format on
+        auto schema_ptr = rowset_meta->tablet_schema();
+        auto idx_version = schema_ptr->get_inverted_index_storage_format();
+        if (idx_version == InvertedIndexStorageFormatPB::V1) {
+            std::unordered_map<int64_t, int64_t> index_size_map;
+            auto&& inverted_index_info = 
rowset_meta->inverted_index_file_info(seg_id);
+            for (const auto& info : inverted_index_info.index_info()) {
+                if (info.index_file_size() != -1) {
+                    index_size_map[info.index_id()] = info.index_file_size();
+                } else {
+                    VLOG_DEBUG << "Invalid index_file_size for segment_id " << 
seg_id
+                               << ", index_id " << info.index_id();
+                }
+            }
+            for (const auto& index : schema_ptr->inverted_indexes()) {
+                auto idx_path = storage_resource.value()->remote_idx_v1_path(
+                        *rowset_meta, seg_id, index->index_id(), 
index->get_index_suffix());
+                download_idx_file(idx_path, index_size_map[index->index_id()]);
+            }
+        } else {
+            if (schema_ptr->has_inverted_index()) {
+                auto&& inverted_index_info = 
rowset_meta->inverted_index_file_info(seg_id);
+                int64_t idx_size = 0;
+                if (inverted_index_info.has_index_size()) {
+                    idx_size = inverted_index_info.index_size();
+                } else {
+                    VLOG_DEBUG << "index_size is not set for segment " << 
seg_id;
+                }
+                auto idx_path = 
storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
+                download_idx_file(idx_path, idx_size);
+            }
+        }
+    }
+    if (download_task_submitted) {
+        VLOG_DEBUG << "warm up rowset " << rowset->version() << " triggerd by 
sync rowset";
+        add_rowset_warmup_state_unlocked(rowset->rowset_meta(),
+                                         
WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    }
+}
+
+bool CloudTablet::is_warm_up_conflict_with_compaction() {
+    std::shared_lock rdlock(_meta_lock);
+    for (auto& [rowset_id, state] : _rowset_warm_up_states) {
+        if (state.first != WarmUpState::DONE) {
+            return true;
+        }
+    }
+    return false;
+}
+
+void CloudTablet::warm_up_done_cb(RowsetSharedPtr rowset, Status status, bool 
delay_add_rowset) {
+    if (delay_add_rowset) {
+        DBUG_EXECUTE_IF("CloudTablet.warm_up_done_cb.inject_sleep_s", {
+            auto sleep_time = dp->param("sleep", 3);
+            LOG_INFO("CloudTablet.warm_up_done_cb.inject_sleep {} s", 
sleep_time)
+                    .tag("tablet_id", tablet_id());
+            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+        });
+    }
+    VLOG_DEBUG << "warm up rowset " << rowset->version() << " done";
+    auto res = complete_rowset_segment_warmup(rowset->rowset_id(), status);
+    if (res != WarmUpState::DONE && res != WarmUpState::NONE) {
+        if (res == WarmUpState::TRIGGERED_BY_JOB) {
+            LOG(WARNING) << "should not happen, rowset: " << rowset->version()
+                         << " warm up is triggered by warm up job but use "
+                            "CloudTablet::warm_up_done_cb as done callback";
+        }
+        // none success or failure
+        return;
+    }
+    if (config::enable_query_driven_warmup && delay_add_rowset) {
+        g_file_cache_query_driven_warmup_delayed_rowset_add_num << 1;
+        LOG(INFO) << "warm up completed, rowset: " << rowset->rowset_id()
+                  << ", version: " << rowset->version();
+        std::unique_lock<std::shared_mutex> meta_lock(_meta_lock);
+        for (auto [ver, rs] : _rs_version_map) {
+            // e.g. ver = [5-10]
+            // if rowset->version() is [5-8], it should not be added
+            // if rowset->version() is [4-8] or [6-15], it should not be added
+            // if rowset->version() is [5-10], it should be added
+            // if rowset->version() is [5-15], it should be added
+            if (ver != rowset->version() && !rowset->version().contains(ver) &&
+                !(ver.second < rowset->version().first || ver.first > 
rowset->version().second)) {
+                
g_file_cache_query_driven_warmup_delayed_rowset_add_failure_num << 1;
+                delete_rowsets({rowset}, meta_lock);
+                LOG(WARNING)
+                        << "rowset " << rowset->version()
+                        << " is not added to _rs_version_map due to version 
overlap with rowset "
+                        << rs->rowset_id() << ", version: " << ver << ", add 
it to stale rowsets";
+                return;
+            }
+        }
+        // no matter warmup success or failed, we should add rowset to tablet 
meta

Review Comment:
   A bvar is needed to expose the failure count, so we know what to blame when there is a cache
miss.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to