This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0ea1879e62c branch-3.0: [Fix](cloud-mow) Full compaction should only update delete bitmap of its output rowset #50974 (#50985)
0ea1879e62c is described below

commit 0ea1879e62cd274fd34388606ecb597885a429ef
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 17 10:47:58 2025 +0800

    branch-3.0: [Fix](cloud-mow) Full compaction should only update delete bitmap of its output rowset #50974 (#50985)
    
    Cherry-picked from #50974
    
    Co-authored-by: bobhan1 <bao...@selectdb.com>
---
 be/src/cloud/cloud_full_compaction.cpp             |   3 +-
 .../test_cloud_full_compaction_multi_segments.out  | Bin 0 -> 168 bytes
 ...est_cloud_full_compaction_multi_segments.groovy | 181 +++++++++++++++++++++
 3 files changed, 183 insertions(+), 1 deletion(-)

diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp
index 1d86210225f..d2c3176bed0 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -377,7 +377,8 @@ Status CloudFullCompaction::_cloud_full_compaction_calc_delete_bitmap(
             segments.begin(), segments.end(), 0,
            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
     for (const auto& [k, v] : tmp_delete_bitmap->delete_bitmap) {
-        if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
+        if (std::get<0>(k) == _output_rowset->rowset_id() &&
+            std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
            delete_bitmap->merge({std::get<0>(k), std::get<1>(k), cur_version}, v);
         }
     }
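
For context, the delete bitmap key here is a tuple of (rowset id, segment id, version), and the temp bitmap built during recalculation can hold marks for rowsets other than the compaction's output. Below is a minimal, self-contained sketch of the filtering idea the hunk adds, using simplified stand-in types rather than Doris's actual DeleteBitmap/RowsetId classes; every name in it is illustrative:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <tuple>

// Stand-in for the delete bitmap key: (rowset_id, segment_id, version).
using BitmapKey = std::tuple<std::string, uint32_t, int64_t>;
constexpr uint32_t INVALID_SEGMENT_ID = UINT32_MAX;

int main() {
    // Temp bitmap from recalculation: it may carry marks for rowsets the
    // compaction did not produce (e.g. incremental rowsets).
    std::map<BitmapKey, int> tmp_delete_bitmap = {
            {{"output_rowset", 0, 5}, 1},
            {{"incremental_rowset", 0, 5}, 1},             // must not be re-published
            {{"output_rowset", INVALID_SEGMENT_ID, 5}, 1}, // sentinel entry, skipped
    };
    const std::string output_rowset_id = "output_rowset";
    const int64_t cur_version = 6;

    std::map<BitmapKey, int> delete_bitmap;
    for (const auto& [k, v] : tmp_delete_bitmap) {
        // The fixed condition: keep a mark only if it belongs to the output
        // rowset AND carries a valid segment id.
        if (std::get<0>(k) == output_rowset_id &&
            std::get<1>(k) != INVALID_SEGMENT_ID) {
            delete_bitmap[{std::get<0>(k), std::get<1>(k), cur_version}] += v;
        }
    }
    // Prints only: output_rowset seg=0 ver=6
    for (const auto& [k, v] : delete_bitmap) {
        std::cout << std::get<0>(k) << " seg=" << std::get<1>(k)
                  << " ver=" << std::get<2>(k) << "\n";
    }
}

Before the added rowset-id check, marks belonging to incremental rowsets would also have been merged and published under the compaction's version, which is the bug the regression test below reproduces.
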
diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_multi_segments.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_multi_segments.out
new file mode 100644
index 00000000000..72d283c5d8c
Binary files /dev/null and b/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_multi_segments.out differ
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_multi_segments.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_multi_segments.groovy
new file mode 100644
index 00000000000..1a49be36de7
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_multi_segments.groovy
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite("test_cloud_full_compaction_multi_segments","multi_cluster,docker") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'calculate_delete_bitmap_task_timeout_seconds=10',
+        'mow_calculate_delete_bitmap_retry_times=10',
+        'enable_workload_group=false',
+    ]
+    options.beConfigs += [
+        'enable_debug_points=true',
+        'doris_scanner_row_bytes=1' // to produce multiple segments
+    ]
+    docker(options) {
+        def tableName = "test_cloud_full_compaction_multi_segments"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k` int ,
+                `v` int ,
+                `c` int
+            ) engine=olap
+            UNIQUE KEY(k)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 1
+            properties(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true",
+                "function_column.sequence_col" = 'c');"""
+
+        sql """ INSERT INTO ${tableName} VALUES (-2,-2,100)"""
+        sql """ INSERT INTO ${tableName} VALUES (-1,-1,100)"""
+        qt_sql """ SELECT * FROM ${tableName} ORDER BY k; """
+
+        def tabletStats = sql_return_maparray("show tablets from ${tableName};")
+        def tabletId = tabletStats[0].TabletId
+        def tabletBackendId = tabletStats[0].BackendId
+        def tabletBackend
+        def backends = sql_return_maparray('show backends')
+        for (def be : backends) {
+            if (be.BackendId == tabletBackendId) {
+                tabletBackend = be
+                break;
+            }
+        }
+        logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+
+        def checkSegmentNum = { rowsetNum, lastRowsetSegmentNum ->
+            def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+            logger.info("tablets: ${tablets}")
+            String compactionUrl = tablets[0]["CompactionStatus"]
+            def (code, out, err) = curl("GET", compactionUrl)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            assert tabletJson.rowsets.size() == rowsetNum + 1
+            def rowset = tabletJson.rowsets.get(tabletJson.rowsets.size() - 1)
+            logger.info("rowset: ${rowset}")
+            int start_index = rowset.indexOf("]")
+            int end_index = rowset.indexOf("DATA")
+            def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+            logger.info("segmentNumStr: ${segmentNumStr}")
+            assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
+        }
+
+        def loadMultiSegmentData = { rows->
+            // load data that will produce multiple segments, with duplicate keys across segments
+            String content = ""
+            (1..rows).each {
+                content += "${it},${it},${it}\n"
+            }
+            content += content
+
+            // key=-1 goes into segment 0; its sequence column value is smaller than that of the existing row in the table
+            // full compaction will generate a delete bitmap mark on this segment when calculating for the incremental rowset
+            content = "-1,-1,1\n${content}"
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', ','
+                inputStream new ByteArrayInputStream(content.getBytes())
+                time 30000
+            }
+        }
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+
+        try {
+            // batch_size is 4164 in csv_reader.cpp
+            // _batch_size is 8192 in vtablet_writer.cpp
+            // to produce multiple segments
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
+            // block load and compaction
+            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+            GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block")
+
+            def newThreadInDocker = { Closure actionSupplier ->
+                def connInfo = context.threadLocalConn.get()
+                return Thread.start {
+                    connect(connInfo.username, connInfo.password, connInfo.conn.getMetaData().getURL(), actionSupplier)
+                }
+            }
+
+            def t1 = newThreadInDocker {
+                loadMultiSegmentData(4096)
+            }
+
+            // trigger full compaction
+            logger.info("trigger full compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+            def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assert code == 0
+            def compactJson = parseJson(out.trim())
+            assert "success" == compactJson.status.toLowerCase()
+
+
+            Thread.sleep(1000)
+
+            // let the load publish
+            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+            t1.join()
+            Thread.sleep(1000)
+
+
+            // let full compaction continue and wait for compaction to finish
+            GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block")
+            def running = true
+            do {
+                Thread.sleep(1000)
+                (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+
+
+            // use a new cluster to force it to read delete bitmaps from MS rather than the BE's cache
+            cluster.addBackend(1, "cluster1")
+            sql """ use @cluster1 """
+            qt_dup_key_count "select count() from (select k, count(*) from ${tableName} group by k having count(*) > 1) t"
+            qt_sql "select count() from ${tableName};"
+            checkSegmentNum(2, 3)
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}
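
The interesting wrinkle in the test is the sequence column: the newly loaded row with key=-1 carries a smaller sequence value than the row already in the table, so the new row is the one that receives a delete-bitmap mark, on a segment of an incremental rowset the full compaction did not produce. A hypothetical sketch of that conflict resolution, simplified and not Doris code, with all names illustrative:

#include <cstdint>
#include <iostream>
#include <string>

// Stand-in row: key, sequence-column value, and where the row physically lives.
struct Row {
    int64_t key;
    int64_t seq;
    std::string location;
};

int main() {
    // Existing row: key=-1 loaded earlier with sequence value 100.
    Row existing{-1, 100, "base_rowset_segment"};
    // New row: same key, smaller sequence value (the test loads "-1,-1,1").
    Row incoming{-1, 1, "new_load_segment_0"};

    // Merge-on-write conflict resolution with a sequence column: the row with
    // the lower sequence value loses, even if it arrived later.
    const Row& loser = (incoming.seq < existing.seq) ? incoming : existing;

    // The delete-bitmap mark therefore lands on the NEW load's segment, a
    // rowset the full compaction did not produce. That is exactly the kind of
    // mark the fix above stops the compaction from re-publishing.
    std::cout << "delete-bitmap mark on: " << loser.location << "\n";
}

This is why the test asserts qt_dup_key_count is zero after compaction: if the stale mark were re-published under the compaction's version, the lower-sequence duplicate of key=-1 would resurface.
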


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
