freemandealer commented on code in PR #53824:
URL: https://github.com/apache/doris/pull/53824#discussion_r2275677477
##########
be/src/cloud/cloud_meta_mgr.cpp:
##########
@@ -807,8 +807,40 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
// after doing EMPTY_CUMULATIVE compaction, MS cp is 13,
get_rowset will return [2-11][12-12].
bool version_overlap =
tablet->max_version_unlocked() >=
rowsets.front()->start_version();
- tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
- options.warmup_delta_data);
+ if (config::enable_query_driven_warmup &&
options.warmup_delta_data &&
+ options.query_version > 0 &&
!tablet->enable_unique_key_merge_on_write()) {
+ VLOG_DEBUG << "warmup rowset";
+ std::vector<RowsetSharedPtr> new_rowsets;
+ std::vector<RowsetSharedPtr> overlapping_rowsets;
+ if (tablet->split_rowsets_by_version_overlap(rowsets,
&new_rowsets,
+
&overlapping_rowsets)) {
+ VLOG_DEBUG << "warmup rowset, split into 2 sets,
new_rowsets: "
+ << new_rowsets.size()
+ << ", overlapping_rowsets: " <<
overlapping_rowsets.size();
+ // add all new rowsets directly, warmup async
+ tablet->add_rowsets(std::move(new_rowsets),
version_overlap, wlock, true);
+ for (auto rs : overlapping_rowsets) {
+ if (rs->version().second == 1) {
+ // [0-1] rowset is empty for each tablet, skip
it
+ continue;
+ }
+ // the rowset will be added to tablet meta in the
callback method after warm up
+ tablet->warm_up_rowset_unlocked(rs, true, true);
+ }
+ } else {
+ VLOG_DEBUG << "warmup rowset, can't split into 2 sets";
+ // add all rowsets directly, warmup async
+ tablet->add_rowsets(std::move(rowsets),
version_overlap, wlock, true);
+ }
+ } else {
+ VLOG_DEBUG << "add rowset without warmup, the config is: "
+ << config::enable_query_driven_warmup
+ << ", is mow: " <<
tablet->enable_unique_key_merge_on_write()
+ << ", version_overlap: " << version_overlap
+ << ", warmup_delta_data: " <<
options.warmup_delta_data;
+ tablet->add_rowsets(std::move(rowsets), version_overlap,
wlock,
+ version_overlap ||
options.warmup_delta_data);
Review Comment:
Why add the `version_overlap` condition here?
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema()
const {
return _merged_tablet_schema;
}
+bool CloudTablet::split_rowsets_by_version_overlap(
+ const std::vector<RowsetSharedPtr>& input_rowsets,
+ std::vector<RowsetSharedPtr>* new_rowsets,
+ std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+ auto max_version = max_version_unlocked();
+ for (auto rs : input_rowsets) {
+ if (rs->version().first > max_version) {
+ new_rowsets->push_back(rs);
+ } else if (rs->version().second <= max_version) {
+ overlapping_rowsets->push_back(rs);
+ } else {
+ new_rowsets->clear();
+ overlapping_rowsets->clear();
+ return false;
+ }
+ }
+ return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+ std::shared_lock rlock(_meta_lock);
+ if (_rowset_warm_up_states.find(rowset_id) ==
_rowset_warm_up_states.end()) {
+ return WarmUpState::NONE;
+ }
+ return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset,
WarmUpState state) {
+ std::lock_guard wlock(_meta_lock);
+ return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset,
WarmUpState state) {
+ if (_rowset_warm_up_states.find(rowset->rowset_id()) !=
_rowset_warm_up_states.end()) {
+ return false;
+ }
+ if (state == WarmUpState::TRIGGERED_BY_JOB) {
+ g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+ } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+ g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+ }
+ _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state,
rowset->num_segments());
+ return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id,
Status status) {
+ std::lock_guard wlock(_meta_lock);
+ if (_rowset_warm_up_states.find(rowset_id) ==
_rowset_warm_up_states.end()) {
+ return WarmUpState::NONE;
+ }
+ VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id <<
", " << status;
+ if (status.ok()) {
+ g_file_cache_warm_up_segment_complete_num << 1;
+ _rowset_warm_up_states[rowset_id].second--;
+ if (_rowset_warm_up_states[rowset_id].second == 0) {
+ g_file_cache_warm_up_rowset_complete_num << 1;
+ _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+ }
+ return _rowset_warm_up_states[rowset_id].first;
+ }
+ // !status.ok()
+ g_file_cache_warm_up_segment_complete_num << 1;
+ g_file_cache_warm_up_rowset_complete_num << 1;
+ _rowset_warm_up_states.erase(rowset_id);
+ return WarmUpState::NONE;
+}
+
+void CloudTablet::warm_up_rowset_unlocked(RowsetSharedPtr rowset, bool
version_overlap,
+ bool delay_add_rowset) {
+ if (_rowset_warm_up_states.find(rowset->rowset_id()) !=
_rowset_warm_up_states.end()) {
+ return;
+ }
+ if (delay_add_rowset) {
+ g_file_cache_query_driven_warmup_delayed_rowset_num << 1;
+ LOG(INFO) << "triggered a warm up for overlapping rowset " <<
rowset->version()
+ << ", will add it to tablet meta latter";
+ }
+ // warmup rowset data in background
+ bool download_task_submitted = false;
+ for (int seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
+ const auto& rowset_meta = rowset->rowset_meta();
+ constexpr int64_t interval = 600; // 10 mins
+ // When BE restart and receive the `load_sync` rpc, it will sync all
historical rowsets first time.
+ // So we need to filter out the old rowsets avoid to download the
whole table.
+ if (!version_overlap &&
+ ::time(nullptr) - rowset_meta->newest_write_timestamp() >=
interval) {
+ continue;
+ }
+
+ auto storage_resource = rowset_meta->remote_storage_resource();
+ if (!storage_resource) {
+ LOG(WARNING) << storage_resource.error();
+ continue;
+ }
+
+ int64_t expiration_time =
+ _tablet_meta->ttl_seconds() == 0 ||
rowset_meta->newest_write_timestamp() <= 0
+ ? 0
+ : rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
+ g_file_cache_cloud_tablet_submitted_segment_num << 1;
+ if (rowset->rowset_meta()->segment_file_size(seg_id) > 0) {
+ g_file_cache_cloud_tablet_submitted_segment_size
+ << rowset->rowset_meta()->segment_file_size(seg_id);
+ }
+ auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
+ // clang-format off
+
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
+ .path =
storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+ .file_size = rowset->rowset_meta()->segment_file_size(seg_id),
+ .file_system = storage_resource.value()->fs,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ .download_done {[self, rowset, delay_add_rowset](Status st) {
+ self->warm_up_done_cb(rowset, st, delay_add_rowset);
+ if (!st) {
+ LOG_WARNING("add rowset warm up error ").error(st);
+ }
+ }},
+ });
+ download_task_submitted = true;
+
+ auto download_idx_file = [&](const io::Path& idx_path, int64_t
idx_size) {
+ io::DownloadFileMeta meta {
+ .path = idx_path,
+ .file_size = idx_size,
+ .file_system = storage_resource.value()->fs,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ // TODO: consider index file for warm up state management
Review Comment:
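This TODO is critical.
The easiest improvement: submit the index downloads before the segment files?
The rowset is marked DONE once its segment downloads finish, while index
downloads are not tracked, so submitting indexes first may reduce the chance
that only the inverted index still has to be read from remote storage.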
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema()
const {
return _merged_tablet_schema;
}
+bool CloudTablet::split_rowsets_by_version_overlap(
+ const std::vector<RowsetSharedPtr>& input_rowsets,
+ std::vector<RowsetSharedPtr>* new_rowsets,
+ std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+ auto max_version = max_version_unlocked();
+ for (auto rs : input_rowsets) {
+ if (rs->version().first > max_version) {
+ new_rowsets->push_back(rs);
+ } else if (rs->version().second <= max_version) {
+ overlapping_rowsets->push_back(rs);
+ } else {
+ new_rowsets->clear();
+ overlapping_rowsets->clear();
+ return false;
+ }
+ }
+ return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+ std::shared_lock rlock(_meta_lock);
+ if (_rowset_warm_up_states.find(rowset_id) ==
_rowset_warm_up_states.end()) {
+ return WarmUpState::NONE;
+ }
+ return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset,
WarmUpState state) {
+ std::lock_guard wlock(_meta_lock);
+ return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset,
WarmUpState state) {
+ if (_rowset_warm_up_states.find(rowset->rowset_id()) !=
_rowset_warm_up_states.end()) {
+ return false;
+ }
+ if (state == WarmUpState::TRIGGERED_BY_JOB) {
+ g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+ } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+ g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+ }
+ _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state,
rowset->num_segments());
+ return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id,
Status status) {
+ std::lock_guard wlock(_meta_lock);
+ if (_rowset_warm_up_states.find(rowset_id) ==
_rowset_warm_up_states.end()) {
+ return WarmUpState::NONE;
+ }
+ VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id <<
", " << status;
+ if (status.ok()) {
+ g_file_cache_warm_up_segment_complete_num << 1;
+ _rowset_warm_up_states[rowset_id].second--;
+ if (_rowset_warm_up_states[rowset_id].second == 0) {
+ g_file_cache_warm_up_rowset_complete_num << 1;
+ _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+ }
+ return _rowset_warm_up_states[rowset_id].first;
+ }
+ // !status.ok()
+ g_file_cache_warm_up_segment_complete_num << 1;
+ g_file_cache_warm_up_rowset_complete_num << 1;
+ _rowset_warm_up_states.erase(rowset_id);
+ return WarmUpState::NONE;
Review Comment:
Should this return WarmUpState::PARTIAL_FAILURE?
If one segment of the rowset fails, the state entry is erased, so every later
segment completion of the same rowset returns WarmUpState::NONE and the
completion counters are never updated for those segments.
##########
be/src/cloud/cloud_cumulative_compaction.cpp:
##########
@@ -375,7 +380,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
if (_input_rowsets.size() == 1) {
DCHECK_EQ(_output_rowset->version(), _input_rowsets[0]->version());
// MUST NOT move input rowset to stale path
- cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock);
+ cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock, true);
Review Comment:
Why does this branch in particular need warm-up?
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema()
const {
return _merged_tablet_schema;
}
+bool CloudTablet::split_rowsets_by_version_overlap(
+ const std::vector<RowsetSharedPtr>& input_rowsets,
+ std::vector<RowsetSharedPtr>* new_rowsets,
+ std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+ auto max_version = max_version_unlocked();
+ for (auto rs : input_rowsets) {
+ if (rs->version().first > max_version) {
+ new_rowsets->push_back(rs);
+ } else if (rs->version().second <= max_version) {
+ overlapping_rowsets->push_back(rs);
+ } else {
+ new_rowsets->clear();
+ overlapping_rowsets->clear();
+ return false;
+ }
+ }
+ return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+ std::shared_lock rlock(_meta_lock);
+ if (_rowset_warm_up_states.find(rowset_id) ==
_rowset_warm_up_states.end()) {
+ return WarmUpState::NONE;
+ }
+ return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset,
WarmUpState state) {
+ std::lock_guard wlock(_meta_lock);
+ return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset,
WarmUpState state) {
+ if (_rowset_warm_up_states.find(rowset->rowset_id()) !=
_rowset_warm_up_states.end()) {
+ return false;
+ }
+ if (state == WarmUpState::TRIGGERED_BY_JOB) {
+ g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+ } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+ g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+ }
+ _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state,
rowset->num_segments());
+ return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id,
Status status) {
+ std::lock_guard wlock(_meta_lock);
+ if (_rowset_warm_up_states.find(rowset_id) ==
_rowset_warm_up_states.end()) {
+ return WarmUpState::NONE;
+ }
+ VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id <<
", " << status;
+ if (status.ok()) {
+ g_file_cache_warm_up_segment_complete_num << 1;
+ _rowset_warm_up_states[rowset_id].second--;
+ if (_rowset_warm_up_states[rowset_id].second == 0) {
+ g_file_cache_warm_up_rowset_complete_num << 1;
+ _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+ }
+ return _rowset_warm_up_states[rowset_id].first;
+ }
+ // !status.ok()
+ g_file_cache_warm_up_segment_complete_num << 1;
+ g_file_cache_warm_up_rowset_complete_num << 1;
+ _rowset_warm_up_states.erase(rowset_id);
+ return WarmUpState::NONE;
+}
+
+void CloudTablet::warm_up_rowset_unlocked(RowsetSharedPtr rowset, bool
version_overlap,
+ bool delay_add_rowset) {
+ if (_rowset_warm_up_states.find(rowset->rowset_id()) !=
_rowset_warm_up_states.end()) {
+ return;
+ }
+ if (delay_add_rowset) {
+ g_file_cache_query_driven_warmup_delayed_rowset_num << 1;
+ LOG(INFO) << "triggered a warm up for overlapping rowset " <<
rowset->version()
+ << ", will add it to tablet meta latter";
+ }
+ // warmup rowset data in background
+ bool download_task_submitted = false;
+ for (int seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
+ const auto& rowset_meta = rowset->rowset_meta();
+ constexpr int64_t interval = 600; // 10 mins
+ // When BE restart and receive the `load_sync` rpc, it will sync all
historical rowsets first time.
+ // So we need to filter out the old rowsets avoid to download the
whole table.
+ if (!version_overlap &&
+ ::time(nullptr) - rowset_meta->newest_write_timestamp() >=
interval) {
+ continue;
+ }
+
+ auto storage_resource = rowset_meta->remote_storage_resource();
+ if (!storage_resource) {
+ LOG(WARNING) << storage_resource.error();
+ continue;
+ }
+
+ int64_t expiration_time =
+ _tablet_meta->ttl_seconds() == 0 ||
rowset_meta->newest_write_timestamp() <= 0
+ ? 0
+ : rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
+ g_file_cache_cloud_tablet_submitted_segment_num << 1;
+ if (rowset->rowset_meta()->segment_file_size(seg_id) > 0) {
+ g_file_cache_cloud_tablet_submitted_segment_size
+ << rowset->rowset_meta()->segment_file_size(seg_id);
+ }
+ auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
+ // clang-format off
+
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
+ .path =
storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+ .file_size = rowset->rowset_meta()->segment_file_size(seg_id),
+ .file_system = storage_resource.value()->fs,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ .download_done {[self, rowset, delay_add_rowset](Status st) {
+ self->warm_up_done_cb(rowset, st, delay_add_rowset);
+ if (!st) {
+ LOG_WARNING("add rowset warm up error ").error(st);
+ }
+ }},
+ });
+ download_task_submitted = true;
+
+ auto download_idx_file = [&](const io::Path& idx_path, int64_t
idx_size) {
+ io::DownloadFileMeta meta {
+ .path = idx_path,
+ .file_size = idx_size,
+ .file_system = storage_resource.value()->fs,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ // TODO: consider index file for warm up state management
+ .download_done {[](Status st) {
+ if (!st) {
+ LOG_WARNING("add rowset warm up error ").error(st);
+ }
+ }},
+ };
+
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
+ g_file_cache_cloud_tablet_submitted_index_num << 1;
+ g_file_cache_cloud_tablet_submitted_index_size << idx_size;
+ };
+ // clang-format on
+ auto schema_ptr = rowset_meta->tablet_schema();
+ auto idx_version = schema_ptr->get_inverted_index_storage_format();
+ if (idx_version == InvertedIndexStorageFormatPB::V1) {
+ std::unordered_map<int64_t, int64_t> index_size_map;
+ auto&& inverted_index_info =
rowset_meta->inverted_index_file_info(seg_id);
+ for (const auto& info : inverted_index_info.index_info()) {
+ if (info.index_file_size() != -1) {
+ index_size_map[info.index_id()] = info.index_file_size();
+ } else {
+ VLOG_DEBUG << "Invalid index_file_size for segment_id " <<
seg_id
+ << ", index_id " << info.index_id();
+ }
+ }
+ for (const auto& index : schema_ptr->inverted_indexes()) {
+ auto idx_path = storage_resource.value()->remote_idx_v1_path(
+ *rowset_meta, seg_id, index->index_id(),
index->get_index_suffix());
+ download_idx_file(idx_path, index_size_map[index->index_id()]);
+ }
+ } else {
+ if (schema_ptr->has_inverted_index()) {
+ auto&& inverted_index_info =
rowset_meta->inverted_index_file_info(seg_id);
+ int64_t idx_size = 0;
+ if (inverted_index_info.has_index_size()) {
+ idx_size = inverted_index_info.index_size();
+ } else {
+ VLOG_DEBUG << "index_size is not set for segment " <<
seg_id;
+ }
+ auto idx_path =
storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
+ download_idx_file(idx_path, idx_size);
+ }
+ }
+ }
+ if (download_task_submitted) {
Review Comment:
Is it always true?
##########
be/src/cloud/cloud_warm_up_manager.h:
##########
@@ -37,6 +37,13 @@ enum class DownloadType {
S3,
};
+enum class WarmUpState : int {
+ NONE,
+ TRIGGERED_BY_SYNC_ROWSET,
+ TRIGGERED_BY_JOB,
+ DONE,
Review Comment:
It would be clearer if we added another state: PARTIAL_FAILURE.
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema()
const {
return _merged_tablet_schema;
}
+bool CloudTablet::split_rowsets_by_version_overlap(
+ const std::vector<RowsetSharedPtr>& input_rowsets,
+ std::vector<RowsetSharedPtr>* new_rowsets,
+ std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+ auto max_version = max_version_unlocked();
+ for (auto rs : input_rowsets) {
+ if (rs->version().first > max_version) {
+ new_rowsets->push_back(rs);
+ } else if (rs->version().second <= max_version) {
+ overlapping_rowsets->push_back(rs);
+ } else {
+ new_rowsets->clear();
+ overlapping_rowsets->clear();
+ return false;
+ }
+ }
+ return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+ std::shared_lock rlock(_meta_lock);
+ if (_rowset_warm_up_states.find(rowset_id) ==
_rowset_warm_up_states.end()) {
+ return WarmUpState::NONE;
+ }
+ return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset,
WarmUpState state) {
+ std::lock_guard wlock(_meta_lock);
+ return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset,
WarmUpState state) {
+ if (_rowset_warm_up_states.find(rowset->rowset_id()) !=
_rowset_warm_up_states.end()) {
+ return false;
+ }
+ if (state == WarmUpState::TRIGGERED_BY_JOB) {
+ g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+ } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+ g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+ }
+ _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state,
rowset->num_segments());
+ return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id,
Status status) {
+ std::lock_guard wlock(_meta_lock);
+ if (_rowset_warm_up_states.find(rowset_id) ==
_rowset_warm_up_states.end()) {
+ return WarmUpState::NONE;
+ }
+ VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id <<
", " << status;
+ if (status.ok()) {
+ g_file_cache_warm_up_segment_complete_num << 1;
+ _rowset_warm_up_states[rowset_id].second--;
+ if (_rowset_warm_up_states[rowset_id].second == 0) {
+ g_file_cache_warm_up_rowset_complete_num << 1;
+ _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+ }
+ return _rowset_warm_up_states[rowset_id].first;
+ }
+ // !status.ok()
+ g_file_cache_warm_up_segment_complete_num << 1;
+ g_file_cache_warm_up_rowset_complete_num << 1;
+ _rowset_warm_up_states.erase(rowset_id);
+ return WarmUpState::NONE;
+}
+
+void CloudTablet::warm_up_rowset_unlocked(RowsetSharedPtr rowset, bool
version_overlap,
+ bool delay_add_rowset) {
+ if (_rowset_warm_up_states.find(rowset->rowset_id()) !=
_rowset_warm_up_states.end()) {
+ return;
+ }
+ if (delay_add_rowset) {
+ g_file_cache_query_driven_warmup_delayed_rowset_num << 1;
+ LOG(INFO) << "triggered a warm up for overlapping rowset " <<
rowset->version()
+ << ", will add it to tablet meta latter";
+ }
+ // warmup rowset data in background
+ bool download_task_submitted = false;
+ for (int seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
+ const auto& rowset_meta = rowset->rowset_meta();
+ constexpr int64_t interval = 600; // 10 mins
+ // When BE restart and receive the `load_sync` rpc, it will sync all
historical rowsets first time.
+ // So we need to filter out the old rowsets avoid to download the
whole table.
+ if (!version_overlap &&
+ ::time(nullptr) - rowset_meta->newest_write_timestamp() >=
interval) {
+ continue;
+ }
+
+ auto storage_resource = rowset_meta->remote_storage_resource();
+ if (!storage_resource) {
+ LOG(WARNING) << storage_resource.error();
+ continue;
+ }
+
+ int64_t expiration_time =
+ _tablet_meta->ttl_seconds() == 0 ||
rowset_meta->newest_write_timestamp() <= 0
+ ? 0
+ : rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
+ g_file_cache_cloud_tablet_submitted_segment_num << 1;
+ if (rowset->rowset_meta()->segment_file_size(seg_id) > 0) {
+ g_file_cache_cloud_tablet_submitted_segment_size
+ << rowset->rowset_meta()->segment_file_size(seg_id);
+ }
+ auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
+ // clang-format off
+
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
+ .path =
storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+ .file_size = rowset->rowset_meta()->segment_file_size(seg_id),
+ .file_system = storage_resource.value()->fs,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ .download_done {[self, rowset, delay_add_rowset](Status st) {
+ self->warm_up_done_cb(rowset, st, delay_add_rowset);
+ if (!st) {
+ LOG_WARNING("add rowset warm up error ").error(st);
+ }
+ }},
+ });
+ download_task_submitted = true;
+
+ auto download_idx_file = [&](const io::Path& idx_path, int64_t
idx_size) {
+ io::DownloadFileMeta meta {
+ .path = idx_path,
+ .file_size = idx_size,
+ .file_system = storage_resource.value()->fs,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ // TODO: consider index file for warm up state management
+ .download_done {[](Status st) {
+ if (!st) {
+ LOG_WARNING("add rowset warm up error ").error(st);
+ }
+ }},
+ };
+
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
+ g_file_cache_cloud_tablet_submitted_index_num << 1;
+ g_file_cache_cloud_tablet_submitted_index_size << idx_size;
+ };
+ // clang-format on
+ auto schema_ptr = rowset_meta->tablet_schema();
+ auto idx_version = schema_ptr->get_inverted_index_storage_format();
+ if (idx_version == InvertedIndexStorageFormatPB::V1) {
+ std::unordered_map<int64_t, int64_t> index_size_map;
+ auto&& inverted_index_info =
rowset_meta->inverted_index_file_info(seg_id);
+ for (const auto& info : inverted_index_info.index_info()) {
+ if (info.index_file_size() != -1) {
+ index_size_map[info.index_id()] = info.index_file_size();
+ } else {
+ VLOG_DEBUG << "Invalid index_file_size for segment_id " <<
seg_id
+ << ", index_id " << info.index_id();
+ }
+ }
+ for (const auto& index : schema_ptr->inverted_indexes()) {
+ auto idx_path = storage_resource.value()->remote_idx_v1_path(
+ *rowset_meta, seg_id, index->index_id(),
index->get_index_suffix());
+ download_idx_file(idx_path, index_size_map[index->index_id()]);
+ }
+ } else {
+ if (schema_ptr->has_inverted_index()) {
+ auto&& inverted_index_info =
rowset_meta->inverted_index_file_info(seg_id);
+ int64_t idx_size = 0;
+ if (inverted_index_info.has_index_size()) {
+ idx_size = inverted_index_info.index_size();
+ } else {
+ VLOG_DEBUG << "index_size is not set for segment " <<
seg_id;
+ }
+ auto idx_path =
storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
+ download_idx_file(idx_path, idx_size);
+ }
+ }
+ }
+ if (download_task_submitted) {
+ VLOG_DEBUG << "warm up rowset " << rowset->version() << " triggerd by
sync rowset";
+ add_rowset_warmup_state_unlocked(rowset->rowset_meta(),
+
WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+ }
+}
+
+bool CloudTablet::is_warm_up_conflict_with_compaction() {
+ std::shared_lock rdlock(_meta_lock);
+ for (auto& [rowset_id, state] : _rowset_warm_up_states) {
+ if (state.first != WarmUpState::DONE) {
+ return true;
+ }
+ }
+ return false;
+}
+
+void CloudTablet::warm_up_done_cb(RowsetSharedPtr rowset, Status status, bool
delay_add_rowset) {
+ if (delay_add_rowset) {
+ DBUG_EXECUTE_IF("CloudTablet.warm_up_done_cb.inject_sleep_s", {
+ auto sleep_time = dp->param("sleep", 3);
+ LOG_INFO("CloudTablet.warm_up_done_cb.inject_sleep {} s",
sleep_time)
+ .tag("tablet_id", tablet_id());
+ std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+ });
+ }
+ VLOG_DEBUG << "warm up rowset " << rowset->version() << " done";
+ auto res = complete_rowset_segment_warmup(rowset->rowset_id(), status);
+ if (res != WarmUpState::DONE && res != WarmUpState::NONE) {
+ if (res == WarmUpState::TRIGGERED_BY_JOB) {
+ LOG(WARNING) << "should not happen, rowset: " << rowset->version()
+ << " warm up is triggered by warm up job but use "
+ "CloudTablet::warm_up_done_cb as done callback";
+ }
+ // none success or failure
+ return;
+ }
+ if (config::enable_query_driven_warmup && delay_add_rowset) {
+ g_file_cache_query_driven_warmup_delayed_rowset_add_num << 1;
+ LOG(INFO) << "warm up completed, rowset: " << rowset->rowset_id()
+ << ", version: " << rowset->version();
+ std::unique_lock<std::shared_mutex> meta_lock(_meta_lock);
+ for (auto [ver, rs] : _rs_version_map) {
+ // e.g. ver = [5-10]
+ // if rowset->version() is [5-8], it should not be added
+ // if rowset->version() is [4-8] or [6-15], it should not be added
+ // if rowset->version() is [5-10], it should be added
+ // if rowset->version() is [5-15], it should be added
+ if (ver != rowset->version() && !rowset->version().contains(ver) &&
+ !(ver.second < rowset->version().first || ver.first >
rowset->version().second)) {
+
g_file_cache_query_driven_warmup_delayed_rowset_add_failure_num << 1;
+ delete_rowsets({rowset}, meta_lock);
+ LOG(WARNING)
+ << "rowset " << rowset->version()
+ << " is not added to _rs_version_map due to version
overlap with rowset "
+ << rs->rowset_id() << ", version: " << ver << ", add
it to stale rowsets";
+ return;
+ }
+ }
+ // no matter warmup success or failed, we should add rowset to tablet
meta
Review Comment:
A bvar is needed to show the failure count, so we know what to blame when a
cache miss occurs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]