This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9d8c32d5af4 [Improve](Variant) only `merge_schema` when `sync_tablets` or scan in… (#48570) 9d8c32d5af4 is described below commit 9d8c32d5af451bfcf6c4bef443fcf5e8bb62e80f Author: lihangyu <lihan...@selectdb.com> AuthorDate: Thu Mar 27 15:56:16 2025 +0800 [Improve](Variant) only `merge_schema` when `sync_tablets` or scan in… (#48570) … cloud mode 1. refactor some options 2. set merge_schema only when `sync_tablets` or scan This will reduce cost of `merge_schema` typically in MOW model with variant type with large number of subcolumns --- be/src/cloud/cloud_backend_service.cpp | 4 +++- be/src/cloud/cloud_meta_mgr.cpp | 17 ++++++++++------- be/src/cloud/cloud_meta_mgr.h | 4 ++-- be/src/cloud/cloud_schema_change_job.cpp | 4 +++- be/src/cloud/cloud_tablet.cpp | 12 ++++++------ be/src/cloud/cloud_tablet.h | 10 +++++++++- be/src/cloud/cloud_tablet_mgr.cpp | 12 ++++++++---- be/src/http/action/calc_file_crc_action.cpp | 2 +- be/src/http/action/delete_bitmap_action.cpp | 6 +++++- be/src/pipeline/exec/olap_scan_operator.cpp | 5 ++++- be/src/service/point_query_executor.cpp | 4 +++- 11 files changed, 54 insertions(+), 26 deletions(-) diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index 265e6c44aac..63f76632d79 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -68,7 +68,9 @@ void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&, if (!result.has_value()) { return; } - Status st = result.value()->sync_rowsets(-1, true); + SyncOptions options; + options.warmup_delta_data = true; + Status st = result.value()->sync_rowsets(options); if (!st.ok()) { LOG_WARNING("failed to sync load for tablet").error(st); } diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index d6ac44e5d08..2ab18b9ad9d 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -481,8 +481,7 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab return Status::OK(); } -Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data, - bool sync_delete_bitmap, bool full_sync) { +Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options) { using namespace std::chrono; TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); @@ -509,7 +508,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ idx->set_partition_id(tablet->partition_id()); { std::shared_lock rlock(tablet->get_header_lock()); - if (full_sync) { + if (options.full_sync) { req.set_start_version(0); } else { req.set_start_version(tablet->max_version_unlocked() + 1); @@ -569,12 +568,13 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ // If is mow, the tablet has no delete bitmap in base rowsets. // So dont need to sync it. - if (sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() && + if (options.sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() && tablet->tablet_state() == TABLET_RUNNING) { DeleteBitmap delete_bitmap(tablet_id); int64_t old_max_version = req.start_version() - 1; auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), - resp.stats(), req.idx(), &delete_bitmap, full_sync); + resp.stats(), req.idx(), &delete_bitmap, + options.full_sync); if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) { LOG_WARNING("rowset meta is expired, need to retry") .tag("tablet", tablet->tablet_id()) @@ -679,8 +679,11 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ // after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12]. bool version_overlap = tablet->max_version_unlocked() >= rowsets.front()->start_version(); - tablet->add_rowsets(std::move(rowsets), version_overlap, wlock, warmup_delta_data); - RETURN_IF_ERROR(tablet->merge_rowsets_schema()); + tablet->add_rowsets(std::move(rowsets), version_overlap, wlock, + options.warmup_delta_data); + if (options.merge_schema) { + RETURN_IF_ERROR(tablet->merge_rowsets_schema()); + } } tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms(); tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms(); diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index da82e025da2..1dd09de3705 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -24,6 +24,7 @@ #include <variant> #include <vector> +#include "cloud/cloud_tablet.h" #include "common/status.h" #include "olap/rowset/rowset_meta.h" #include "util/s3_util.h" @@ -62,8 +63,7 @@ public: Status get_schema_dict(int64_t index_id, std::shared_ptr<SchemaCloudDictionary>* schema_dict); - Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false, - bool sync_delete_bitmap = true, bool full_sync = false); + Status sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options = {}); Status prepare_rowset(const RowsetMeta& rs_meta, std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 5571ee166ba..514abed08d4 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -90,7 +90,9 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque request.base_tablet_id); } // MUST sync rowsets before capturing rowset readers and building DeleteHandler - RETURN_IF_ERROR(_base_tablet->sync_rowsets(request.alter_version)); + SyncOptions options; + options.query_version = request.alter_version; + RETURN_IF_ERROR(_base_tablet->sync_rowsets(options)); // ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1] _output_cumulative_point = _base_tablet->cumulative_layer_point(); std::vector<RowSetSplits> rs_splits; diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index f3203afd76d..692bf0a84c5 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -144,26 +144,26 @@ Status CloudTablet::merge_rowsets_schema() { // There are only two tablet_states RUNNING and NOT_READY in cloud mode // This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS. -Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) { +Status CloudTablet::sync_rowsets(const SyncOptions& options) { RETURN_IF_ERROR(sync_if_not_running()); - if (query_version > 0) { + if (options.query_version > 0) { std::shared_lock rlock(_meta_lock); - if (_max_version >= query_version) { + if (_max_version >= options.query_version) { return Status::OK(); } } // serially execute sync to reduce unnecessary network overhead std::lock_guard lock(_sync_meta_lock); - if (query_version > 0) { + if (options.query_version > 0) { std::shared_lock rlock(_meta_lock); - if (_max_version >= query_version) { + if (_max_version >= options.query_version) { return Status::OK(); } } - auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data); + auto st = _engine.meta_mgr().sync_tablet_rowsets(this, options); if (st.is<ErrorCode::NOT_FOUND>()) { clear_cache(); } diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 84664bb37da..03ba47b27a9 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -26,6 +26,14 @@ namespace doris { class CloudStorageEngine; +struct SyncOptions { + bool warmup_delta_data = false; + bool sync_delete_bitmap = true; + bool full_sync = false; + bool merge_schema = false; + int64_t query_version = -1; +}; + class CloudTablet final : public BaseTablet { public: CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta); @@ -68,7 +76,7 @@ public: // If `query_version` > 0 and local max_version of the tablet >= `query_version`, do nothing. // If 'need_download_data_async' is true, it means that we need to download the new version // rowsets datum async. - Status sync_rowsets(int64_t query_version = -1, bool warmup_delta_data = false); + Status sync_rowsets(const SyncOptions& options = {}); // Synchronize the tablet meta from meta service. Status sync_meta(); diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 04a1c33d5c3..9744626af6f 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -180,8 +180,10 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta)); auto value = std::make_unique<Value>(tablet, *_tablet_map); // MUST sync stats to let compaction scheduler work correctly - st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data, - sync_delete_bitmap); + SyncOptions options; + options.warmup_delta_data = warmup_data; + options.sync_delete_bitmap = sync_delete_bitmap; + st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), options); if (!st.ok()) { LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st; return nullptr; @@ -289,8 +291,10 @@ void CloudTabletMgr::sync_tablets(const CountDownLatch& stop_latch) { continue; } } - - st = tablet->sync_rowsets(-1); + SyncOptions options; + options.query_version = -1; + options.merge_schema = true; + st = tablet->sync_rowsets(options); if (!st) { LOG_WARNING("failed to sync tablet rowsets {}", tablet->tablet_id()).error(st); } diff --git a/be/src/http/action/calc_file_crc_action.cpp b/be/src/http/action/calc_file_crc_action.cpp index 123f55dd7fd..64433a4aa54 100644 --- a/be/src/http/action/calc_file_crc_action.cpp +++ b/be/src/http/action/calc_file_crc_action.cpp @@ -64,7 +64,7 @@ Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value if (auto cloudEngine = dynamic_cast<CloudStorageEngine*>(&_engine)) { tablet = DORIS_TRY(cloudEngine->get_tablet(tablet_id)); // sync all rowsets - RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(-1)); + RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets()); } else if (auto storageEngine = dynamic_cast<StorageEngine*>(&_engine)) { auto tabletPtr = storageEngine->tablet_manager()->get_tablet(tablet_id); tablet = std::dynamic_pointer_cast<Tablet>(tabletPtr); diff --git a/be/src/http/action/delete_bitmap_action.cpp b/be/src/http/action/delete_bitmap_action.cpp index 2fa0a73c2f3..b5738475710 100644 --- a/be/src/http/action/delete_bitmap_action.cpp +++ b/be/src/http/action/delete_bitmap_action.cpp @@ -135,7 +135,11 @@ Status DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req, return st; } auto tablet = std::make_shared<CloudTablet>(_engine.to_cloud(), std::move(tablet_meta)); - st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true); + SyncOptions options; + options.warmup_delta_data = false; + options.sync_delete_bitmap = true; + options.full_sync = true; + st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(), options); if (!st.ok()) { LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st; return st; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index a81cd2df436..b93c22274d9 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -445,8 +445,11 @@ Status OlapScanLocalState::hold_tablets() { tasks.reserve(_scan_ranges.size()); for (auto&& [cur_tablet, cur_version] : _tablets) { tasks.emplace_back([cur_tablet, cur_version]() { + SyncOptions options; + options.query_version = cur_version; + options.merge_schema = true; return std::dynamic_pointer_cast<CloudTablet>(cur_tablet) - ->sync_rowsets(cur_version); + ->sync_rowsets(options); }); } RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10)); diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index dc50ef9937f..12975fbf9fe 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -404,7 +404,9 @@ Status PointQueryExecutor::_lookup_row_key() { Status st; if (_version >= 0) { CHECK(config::is_cloud_mode()) << "Only cloud mode support snapshot read at present"; - RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(_version)); + SyncOptions options; + options.query_version = _version; + RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(options)); } std::vector<RowsetSharedPtr> specified_rowsets; { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org