This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8fc26d510c3 branch-3.0: [opt](compaction) Don't check missed rows in 
cumu compaction if input rowsets are not in tablet #45279 (#45303)
8fc26d510c3 is described below

commit 8fc26d510c3f80114933db623143fdd767848b6f
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 20 00:37:24 2024 +0800

    branch-3.0: [opt](compaction) Don't check missed rows in cumu compaction if 
input rowsets are not in tablet #45279 (#45303)
    
    Cherry-picked from #45279
    
    Co-authored-by: bobhan1 <bao...@selectdb.com>
---
 be/src/olap/compaction.cpp                         |  25 +++-
 be/src/olap/cumulative_compaction.cpp              |  14 ++
 be/src/olap/cumulative_compaction_policy.cpp       |  16 +++
 be/src/olap/schema_change.cpp                      |   2 +
 be/src/olap/tablet.cpp                             |   9 ++
 be/src/olap/tablet.h                               |   1 +
 .../test_compaction_on_sc_new_tablet.out           |  25 ++++
 .../test_compaction_on_sc_new_tablet.groovy        | 149 +++++++++++++++++++++
 8 files changed, 240 insertions(+), 1 deletion(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 34e52ccb5b0..5f490e99034 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1014,8 +1014,31 @@ Status CompactionMixin::modify_rowsets() {
             if 
(!_tablet->tablet_meta()->tablet_schema()->cluster_key_idxes().empty()) {
                 merged_missed_rows_size += _stats.filtered_rows;
             }
+
+            // Suppose a heavy schema change process on BE converting tablet A 
to tablet B.
+            // 1. during schema change double write, new loads write [X-Y] on 
tablet B.
+            // 2. rowsets with version [a],[a+1],...,[b-1],[b] on tablet B are 
picked for cumu compaction(X<=a<b<=Y).(cumu compaction
+            //    on new tablet during schema change double write is allowed 
after https://github.com/apache/doris/pull/16470)
+            // 3. schema change remove all rowsets on tablet B before version 
Z(b<=Z<=Y) before it begins to convert historical rowsets.
+            // 4. schema change finishes.
+            // 5. cumu compaction begins on new tablet with version 
[a],...,[b]. If there are duplicate keys between these rowsets,
+            //    the compaction check will fail because these rowsets 
skipped delete bitmap calculation in commit phase and
+            //    publish phase because tablet B is in NOT_READY state when 
writing.
+
+            // Considering that the cumu compaction will fail finally in this 
situation because `Tablet::modify_rowsets` will check if rowsets in
+            // `to_delete`(_input_rowsets) still exist in tablet's 
`_rs_version_map`, we can just skip to check missed rows here.
+            bool need_to_check_missed_rows = true;
+            {
+                std::shared_lock rlock(_tablet->get_header_lock());
+                need_to_check_missed_rows =
+                        std::all_of(_input_rowsets.begin(), 
_input_rowsets.end(),
+                                    [&](const RowsetSharedPtr& rowset) {
+                                        return 
tablet()->rowset_exists_unlocked(rowset);
+                                    });
+            }
+
             if (_tablet->tablet_state() == TABLET_RUNNING &&
-                merged_missed_rows_size != missed_rows_size) {
+                merged_missed_rows_size != missed_rows_size && 
need_to_check_missed_rows) {
                 std::stringstream ss;
                 ss << "cumulative compaction: the merged rows(" << 
_stats.merged_rows
                    << "), filtered rows(" << _stats.filtered_rows
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index b961c694ede..fe9e5204f4c 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -100,6 +100,20 @@ Status CumulativeCompaction::prepare_compact() {
 }
 
 Status CumulativeCompaction::execute_compact() {
+    DBUG_EXECUTE_IF("CumulativeCompaction::execute_compact.block", {
+        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+        if (target_tablet_id == _tablet->tablet_id()) {
+            LOG(INFO) << "start debug block "
+                      << "CumulativeCompaction::execute_compact.block";
+            while (DebugPoints::instance()->is_enable(
+                    "CumulativeCompaction::execute_compact.block")) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(200));
+            }
+            LOG(INFO) << "end debug block "
+                      << "CumulativeCompaction::execute_compact.block";
+        }
+    })
+
     std::unique_lock<std::mutex> 
lock(tablet()->get_cumulative_compaction_lock(), std::try_to_lock);
     if (!lock.owns_lock()) {
         return Status::Error<TRY_LOCK_FAILED, false>(
diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
index ee7a2b1812a..c812a12b656 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -28,6 +28,7 @@
 #include "olap/olap_common.h"
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
+#include "util/debug_points.h"
 
 namespace doris {
 
@@ -246,6 +247,21 @@ int 
SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         const int64_t max_compaction_score, const int64_t min_compaction_score,
         std::vector<RowsetSharedPtr>* input_rowsets, Version* 
last_delete_version,
         size_t* compaction_score, bool allow_delete) {
+    
DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
 {
+        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+        if (target_tablet_id == tablet->tablet_id()) {
+            auto start_version = dp->param<int64_t>("start_version", -1);
+            auto end_version = dp->param<int64_t>("end_version", -1);
+            for (auto& rowset : candidate_rowsets) {
+                if (rowset->start_version() >= start_version &&
+                    rowset->end_version() <= end_version) {
+                    input_rowsets->push_back(rowset);
+                }
+            }
+        }
+        return input_rowsets->size();
+    })
+
     size_t promotion_size = tablet->cumulative_promotion_size();
     auto max_version = tablet->max_version().first;
     int transient_size = 0;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 46191443506..ae449532182 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -838,6 +838,8 @@ Status SchemaChangeJob::_do_process_alter_tablet(const 
TAlterTabletReqV2& reques
         return_columns[i] = i;
     }
 
+    DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block", 
DBUG_BLOCK);
+
     // begin to find deltas to convert from base tablet to new tablet so that
     // obtain base tablet and new tablet's push lock and header write lock to 
prevent loading data
     {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5cf09398972..d1d48b91ff3 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -512,6 +512,15 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
     return Status::OK();
 }
 
+bool Tablet::rowset_exists_unlocked(const RowsetSharedPtr& rowset) {
+    if (auto it = _rs_version_map.find(rowset->version()); it == 
_rs_version_map.end()) {
+        return false;
+    } else if (rowset->rowset_id() != it->second->rowset_id()) {
+        return false;
+    }
+    return true;
+}
+
 Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
                               std::vector<RowsetSharedPtr>& to_delete, bool 
check_delete) {
     // the compaction process allow to compact the single version, eg: 
version[4-4].
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index a38b5806174..ca774a618fc 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -173,6 +173,7 @@ public:
     // MUST hold EXCLUSIVE `_meta_lock`.
     Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
                           std::vector<RowsetSharedPtr>& to_delete, bool 
check_delete = false);
+    bool rowset_exists_unlocked(const RowsetSharedPtr& rowset);
 
     Status add_inc_rowset(const RowsetSharedPtr& rowset);
     /// Delete stale rowset by timing. This delete policy uses now() minutes
diff --git 
a/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out 
b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
new file mode 100644
index 00000000000..e7188943a10
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1      1       1       1
+2      2       2       2
+3      3       3       3
+4      4       4       4
+5      5       5       5
+6      6       6       6
+7      7       7       7
+8      8       8       8
+9      9       9       9
+10     10      10      10
+
+-- !sql --
+1      9       9       9
+2      2       2       2
+3      3       3       3
+4      4       4       4
+5      5       5       5
+6      6       6       6
+7      7       7       7
+8      8       8       8
+9      9       9       9
+10     10      10      10
+
diff --git 
a/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
 
b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
new file mode 100644
index 00000000000..2f3c44ef2dd
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_compaction_on_sc_new_tablet", "docker") {
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.cloudMode = false
+    options.beConfigs += [
+        'enable_java_support=false',
+        'enable_mow_compaction_correctness_check_core=true'
+    ]
+    docker(options) {
+        try {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            def table1 = "test_compaction_on_sc_new_tablet"
+            sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+            sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                    `k`  int,
+                    `c1` int,
+                    `c2` int,
+                    `c3` int
+                    ) UNIQUE KEY(k)
+                DISTRIBUTED BY HASH(k) BUCKETS 1
+                PROPERTIES (
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true"); """
+
+            // [2-11]
+            for (int i = 1; i <= 10; i++) {
+                sql "insert into ${table1} values($i,$i,$i,$i);"
+            }
+            qt_sql "select * from ${table1} order by k;"
+
+
+            def beNodes = sql_return_maparray("show backends;")
+            def tabletStats = sql_return_maparray("show tablets from 
${table1};")
+            logger.info("tabletStats: \n${tabletStats}")
+            def tabletStat = tabletStats.get(0)
+            def tabletBackendId = tabletStat.BackendId
+            def tabletId = tabletStat.TabletId
+            def version = tabletStat.Version
+            def tabletBackend;
+            for (def be : beNodes) {
+                if (be.BackendId == tabletBackendId) {
+                    tabletBackend = be
+                    break;
+                }
+            }
+            logger.info("tablet ${tabletId} is on backend 
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}, 
version=${version}");
+
+            // blocking the schema change process before it gains max version
+            
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+            Thread.sleep(2000)
+
+            sql "alter table ${table1} modify column c1 varchar(100);"
+
+            Thread.sleep(4000)
+
+            // double write [11-22]
+            for (int i = 20; i <= 30; i++) {
+                sql "insert into ${table1} values(1,9,9,9);"
+            }
+
+            tabletStats = sql_return_maparray("show tablets from ${table1};")
+            logger.info("tabletStats: \n${tabletStats}")
+            assertEquals(2, tabletStats.size())
+
+            def oldTabletStat
+            def newTabletStat
+            for (def stat: tabletStats) {
+                if (!stat.TabletId.equals(tabletId)) {
+                    newTabletStat = stat
+                } else {
+                    oldTabletStat = stat
+                }
+            }
+            logger.info("old tablet=[tablet_id=${oldTabletStat.TabletId}, 
version=${oldTabletStat.Version}]")
+            logger.info("new tablet=[tablet_id=${newTabletStat.TabletId}, 
version=${newTabletStat.Version}]")
+
+            
+            // trigger cumu compaction on new tablet
+            int start_version = 15
+            int end_version = 17
+            // block compaction process on new tablet
+            
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block",
 [tablet_id: "${newTabletStat.TabletId}"])
+            // manually set cumu compaction's input rowsets on new tablet
+            
GetDebugPoint().enableDebugPointForAllBEs("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id:"${newTabletStat.TabletId}", 
start_version:"${start_version}", end_version:"${end_version}"])
+
+            Thread.sleep(2000)
+
+            logger.info("trigger compaction [15-17] on new tablet 
${newTabletStat.TabletId}")
+            def (code, out, err) = 
be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, 
newTabletStat.TabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
err=" + err)
+            Assert.assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            Assert.assertEquals("success", compactJson.status.toLowerCase())
+
+            // make the schema change run to complete and wait for it
+            
GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' 
ORDER BY createtime DESC LIMIT 1 """
+                time 2000
+            }
+
+            Thread.sleep(2000)
+
+            // make the cumu compaction run to complete and wait for it
+            
GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block")
+
+
+            // BE should skip the merged-rows check in cumu compaction, 
otherwise it will cause a coredump
+            // because [11-22] in the new tablet will skip delete bitmap 
calculation because the tablet is in NOT_READY state
+            Thread.sleep(7000)
+
+            qt_sql "select * from ${table1} order by k;"
+
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to