This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 866c4fdd9bb [Opt](Variant) merge schema in sync_rowsets to prevents from CPU overhead each time describe table (#42856) 866c4fdd9bb is described below commit 866c4fdd9bb038d93dacac9f3ecca096c1d637ae Author: lihangyu <15605149...@163.com> AuthorDate: Fri Nov 1 14:52:02 2024 +0800 [Opt](Variant) merge schema in sync_rowsets to prevents from CPU overhead each time describe table (#42856) Should prevent from merge schema each time calling `merged_tablet_schema`. So this pr put the merge logic in `sync_rowsets` stage. --- be/src/cloud/cloud_tablet.cpp | 45 ++++++++++++++++++++++++++++--------- be/src/cloud/cloud_tablet.h | 6 +++++ be/src/service/internal_service.cpp | 5 ++++- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index d3b131d055d..54ea450f204 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -108,6 +108,36 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version, return capture_rs_readers_unlocked(version_path, rs_splits); } +Status CloudTablet::merge_rowsets_schema() { + // Find the rowset with the max version + // NOTE(review): nothing visible in this function takes _meta_lock, yet it reads + // _rs_version_map and (below) writes _merged_tablet_schema -- confirm the caller + // holds that lock exclusively. + // NOTE(review): if _rs_version_map is empty (presumably possible right after + // clear_cache() in sync_rowsets() on NOT_FOUND -- TODO confirm), std::max_element + // returns end() and the `->second` below dereferences it (UB). Suggested guard first: + // `if (_rs_version_map.empty()) { return Status::OK(); }` + // NOTE(review): when both tablet_schema() are null this comparator returns true, so + // comp(x, x) can be true -- not the strict weak ordering std::max_element requires. + // Suggested form (nulls sort lowest): + // `return !b_schema ? false : (!a_schema || a_ver < b_ver);` + auto max_version_rowset = + std::max_element( + _rs_version_map.begin(), _rs_version_map.end(), + [](const auto& a, const auto& b) { + return !a.second->tablet_schema() + ? true + : (!b.second->tablet_schema() + ?
false : a.second->tablet_schema()->schema_version() < + b.second->tablet_schema() + ->schema_version()); + }) + ->second; + TabletSchemaSPtr max_version_schema = max_version_rowset->tablet_schema(); + // If the schema has variant columns, perform a merge to create a wide tablet schema + // NOTE(review): the comparator above treats a null tablet_schema() as possible; if + // every rowset's schema is null, max_version_schema is null and the call below + // dereferences it. When the max-version schema has no variant columns, + // _merged_tablet_schema is left as it was: nullptr if never set, otherwise the value + // from an earlier sync (it is never reset here). + if (max_version_schema->num_variant_columns() > 0) { + std::vector<TabletSchemaSPtr> schemas; + std::transform(_rs_version_map.begin(), _rs_version_map.end(), std::back_inserter(schemas), + [](const auto& rs_meta) { return rs_meta.second->tablet_schema(); }); + // Merge the collected schemas to obtain the least common schema + // NOTE(review): `schemas` may contain nullptr entries (the transform does not filter + // them) -- confirm get_least_common_schema() tolerates them. + RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr, + max_version_schema)); + VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema(); + _merged_tablet_schema = max_version_schema; + } + return Status::OK(); +} + // There are only two tablet_states RUNNING and NOT_READY in cloud mode // This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) { @@ -133,6 +163,10 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) if (st.is<ErrorCode::NOT_FOUND>()) { clear_cache(); } + + // Merge all rowset schemas within a CloudTablet + // NOTE(review): this runs even when st is not OK -- including right after + // clear_cache() above -- and if the merge fails its error is returned instead of st. + // Consider merging only when st.ok() (plus an empty-_rs_version_map guard in + // merge_rowsets_schema()). + RETURN_IF_ERROR(merge_rowsets_schema()); + return st; } @@ -188,16 +222,7 @@ Status CloudTablet::sync_if_not_running() { } TabletSchemaSPtr CloudTablet::merged_tablet_schema() const { - std::shared_lock rdlock(_meta_lock); - TabletSchemaSPtr target_schema; - std::vector<TabletSchemaSPtr> schemas; - for (const auto& [_, rowset] : _rs_version_map) { - schemas.push_back(rowset->tablet_schema()); - } - // get the max version schema and merge all schema - static_cast<void>( - vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema)); - return target_schema; + // NOTE(review): the removed code read under std::shared_lock on _meta_lock. Copying a + // shared_ptr that merge_rowsets_schema() may reassign at the same time is a data race + // unless some lock covers both sides -- consider keeping + // `std::shared_lock rdlock(_meta_lock);` here. Returns nullptr until a schema with + // variant columns has been merged; fetch_remote_tablet_schema now skips such tablets. + return _merged_tablet_schema; } void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap, diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 53747dc19e2..a79d25f7540 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -208,6 +208,9 @@ private: Status sync_if_not_running(); + // Merge all rowset schemas within a CloudTablet + Status merge_rowsets_schema(); + CloudStorageEngine& _engine; // this mutex MUST ONLY be used when sync meta @@ -246,6 +249,9 @@ private: std::mutex _base_compaction_lock; std::mutex _cumulative_compaction_lock; mutable std::mutex _rowset_update_lock; + + // Schema will be merged from all rowsets when sync_rowsets + // NOTE(review): document which lock guards this member; it is written by + // merge_rowsets_schema() and read by merged_tablet_schema(). + TabletSchemaSPtr _merged_tablet_schema; }; using CloudTabletSPtr = std::shared_ptr<CloudTablet>; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 8217bd11bb9..c23cc057584 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1159,7 +1159,10 @@ void
PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id; continue; } - tablet_schemas.push_back(res.value()->merged_tablet_schema()); + auto schema = res.value()->merged_tablet_schema(); + if (schema != nullptr) { + tablet_schemas.push_back(schema); + } } if (!tablet_schemas.empty()) { // merge all --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org