This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8fc26d510c3 branch-3.0: [opt](compaction) Don't check missed rows in cumu compaction if input rowsets are not in tablet #45279 (#45303) 8fc26d510c3 is described below commit 8fc26d510c3f80114933db623143fdd767848b6f Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Dec 20 00:37:24 2024 +0800 branch-3.0: [opt](compaction) Don't check missed rows in cumu compaction if input rowsets are not in tablet #45279 (#45303) Cherry-picked from #45279 Co-authored-by: bobhan1 <bao...@selectdb.com> --- be/src/olap/compaction.cpp | 25 +++- be/src/olap/cumulative_compaction.cpp | 14 ++ be/src/olap/cumulative_compaction_policy.cpp | 16 +++ be/src/olap/schema_change.cpp | 2 + be/src/olap/tablet.cpp | 9 ++ be/src/olap/tablet.h | 1 + .../test_compaction_on_sc_new_tablet.out | 25 ++++ .../test_compaction_on_sc_new_tablet.groovy | 149 +++++++++++++++++++++ 8 files changed, 240 insertions(+), 1 deletion(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 34e52ccb5b0..5f490e99034 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1014,8 +1014,31 @@ Status CompactionMixin::modify_rowsets() { if (!_tablet->tablet_meta()->tablet_schema()->cluster_key_idxes().empty()) { merged_missed_rows_size += _stats.filtered_rows; } + + // Suppose a heavy schema change process on BE converting tablet A to tablet B. + // 1. during schema change double write, new loads write [X-Y] on tablet B. + // 2. rowsets with version [a],[a+1],...,[b-1],[b] on tablet B are picked for cumu compaction(X<=a<b<=Y).(cumu compaction + // on new tablet during schema change double write is allowed after https://github.com/apache/doris/pull/16470) + // 3. schema change remove all rowsets on tablet B before version Z(b<=Z<=Y) before it begins to convert historical rowsets. + // 4. schema change finishes. + // 5. 
cumu compaction begins on new tablet with version [a],...,[b]. If there are duplicate keys between these rowsets, + // the compaction check will fail because these rowsets have skipped to calculate delete bitmap in commit phase and + // publish phase because tablet B is in NOT_READY state when writing. + + // Considering that the cumu compaction will fail finally in this situation because `Tablet::modify_rowsets` will check if rowsets in + // `to_delete`(_input_rowsets) still exist in tablet's `_rs_version_map`, we can just skip to check missed rows here. + bool need_to_check_missed_rows = true; + { + std::shared_lock rlock(_tablet->get_header_lock()); + need_to_check_missed_rows = + std::all_of(_input_rowsets.begin(), _input_rowsets.end(), + [&](const RowsetSharedPtr& rowset) { + return tablet()->rowset_exists_unlocked(rowset); + }); + } + if (_tablet->tablet_state() == TABLET_RUNNING && - merged_missed_rows_size != missed_rows_size) { + merged_missed_rows_size != missed_rows_size && need_to_check_missed_rows) { std::stringstream ss; ss << "cumulative compaction: the merged rows(" << _stats.merged_rows << "), filtered rows(" << _stats.filtered_rows diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index b961c694ede..fe9e5204f4c 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -100,6 +100,20 @@ Status CumulativeCompaction::prepare_compact() { } Status CumulativeCompaction::execute_compact() { + DBUG_EXECUTE_IF("CumulativeCompaction::execute_compact.block", { + auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); + if (target_tablet_id == _tablet->tablet_id()) { + LOG(INFO) << "start debug block " + << "CumulativeCompaction::execute_compact.block"; + while (DebugPoints::instance()->is_enable( + "CumulativeCompaction::execute_compact.block")) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + LOG(INFO) << "end debug block " + <<
"CumulativeCompaction::execute_compact.block"; + } + }) + std::unique_lock<std::mutex> lock(tablet()->get_cumulative_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { return Status::Error<TRY_LOCK_FAILED, false>( diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index ee7a2b1812a..c812a12b656 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -28,6 +28,7 @@ #include "olap/olap_common.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" +#include "util/debug_points.h" namespace doris { @@ -246,6 +247,21 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version, size_t* compaction_score, bool allow_delete) { + DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", { + auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); + if (target_tablet_id == tablet->tablet_id()) { + auto start_version = dp->param<int64_t>("start_version", -1); + auto end_version = dp->param<int64_t>("end_version", -1); + for (auto& rowset : candidate_rowsets) { + if (rowset->start_version() >= start_version && + rowset->end_version() <= end_version) { + input_rowsets->push_back(rowset); + } + } + } + return input_rowsets->size(); + }) + size_t promotion_size = tablet->cumulative_promotion_size(); auto max_version = tablet->max_version().first; int transient_size = 0; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 46191443506..ae449532182 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -838,6 +838,8 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques return_columns[i] = i; } + DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block", DBUG_BLOCK); + // begin to find 
deltas to convert from base tablet to new tablet so that // obtain base tablet and new tablet's push lock and header write lock to prevent loading data { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 5cf09398972..d1d48b91ff3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -512,6 +512,15 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) { return Status::OK(); } +bool Tablet::rowset_exists_unlocked(const RowsetSharedPtr& rowset) { + if (auto it = _rs_version_map.find(rowset->version()); it == _rs_version_map.end()) { + return false; + } else if (rowset->rowset_id() != it->second->rowset_id()) { + return false; + } + return true; +} + Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add, std::vector<RowsetSharedPtr>& to_delete, bool check_delete) { // the compaction process allow to compact the single version, eg: version[4-4]. diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index a38b5806174..ca774a618fc 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -173,6 +173,7 @@ public: // MUST hold EXCLUSIVE `_meta_lock`. Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add, std::vector<RowsetSharedPtr>& to_delete, bool check_delete = false); + bool rowset_exists_unlocked(const RowsetSharedPtr& rowset); Status add_inc_rowset(const RowsetSharedPtr& rowset); /// Delete stale rowset by timing. This delete policy uses now() minutes diff --git a/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out new file mode 100644 index 00000000000..e7188943a10 --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +6 6 6 6 +7 7 7 7 +8 8 8 8 +9 9 9 9 +10 10 10 10 + +-- !sql -- +1 9 9 9 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +6 6 6 6 +7 7 7 7 +8 8 8 8 +9 9 9 9 +10 10 10 10 + diff --git a/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy new file mode 100644 index 00000000000..2f3c44ef2dd --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_compaction_on_sc_new_tablet", "docker") { + def options = new ClusterOptions() + options.setFeNum(1) + options.setBeNum(1) + options.enableDebugPoints() + options.cloudMode = false + options.beConfigs += [ + 'enable_java_support=false', + 'enable_mow_compaction_correctness_check_core=true' + ] + docker(options) { + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + def table1 = "test_compaction_on_sc_new_tablet" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k` int, + `c1` int, + `c2` int, + `c3` int + ) UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true"); """ + + // [2-11] + for (int i = 1; i <= 10; i++) { + sql "insert into ${table1} values($i,$i,$i,$i);" + } + qt_sql "select * from ${table1} order by k;" + + + def beNodes = sql_return_maparray("show backends;") + def tabletStats = sql_return_maparray("show tablets from ${table1};") + logger.info("tabletStats: \n${tabletStats}") + def tabletStat = tabletStats.get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def version = tabletStat.Version + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} is on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}, version=${version}"); + + // blocking the schema change process before it gains max version + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block") + Thread.sleep(2000) + + sql "alter table ${table1} modify column c1 varchar(100);" + + Thread.sleep(4000) + + // double write 
[11-22] + for (int i = 20; i <= 30; i++) { + sql "insert into ${table1} values(1,9,9,9);" + } + + tabletStats = sql_return_maparray("show tablets from ${table1};") + logger.info("tabletStats: \n${tabletStats}") + assertEquals(2, tabletStats.size()) + + def oldTabletStat + def newTabletStat + for (def stat: tabletStats) { + if (!stat.TabletId.equals(tabletId)) { + newTabletStat = stat + } else { + oldTabletStat = stat + } + } + logger.info("old tablet=[tablet_id=${oldTabletStat.TabletId}, version=${oldTabletStat.Version}]") + logger.info("new tablet=[tablet_id=${newTabletStat.TabletId}, version=${newTabletStat.Version}]") + + + // trigger cumu compaction on new tablet + int start_version = 15 + int end_version = 17 + // block compaction process on new tablet + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block", [tablet_id: "${newTabletStat.TabletId}"]) + // manually set cumu compaction's input rowsets on new tablet + GetDebugPoint().enableDebugPointForAllBEs("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", + [tablet_id:"${newTabletStat.TabletId}", start_version:"${start_version}", end_version:"${end_version}"]) + + Thread.sleep(2000) + + logger.info("trigger compaction [15-17] on new tablet ${newTabletStat.TabletId}") + def (code, out, err) = be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, newTabletStat.TabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // make the schema change run to complete and wait for it + GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block") + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 2000 + } + + Thread.sleep(2000) + + // make the cumu compaction
run to complete and wait for it + GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block") + + + // BE should skip to check merged rows in cumu compaction, otherwise it will cause coredump + // because [11-22] in new tablet will skip to calc delete bitmap because tablet is in NOT_READY state + Thread.sleep(7000) + + qt_sql "select * from ${table1} order by k;" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org