This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 315e6a95942 [Fix](cloud) Should calculate delete bitmaps between segments when re-calculating all historical data after a cache miss in the publish phase (#48748)
315e6a95942 is described below

commit 315e6a9594287f1171eabfe3247ff330ec59a6c2
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Mon Mar 10 11:01:23 2025 +0800

    [Fix](cloud) Should calculate delete bitmaps between segments when re-calculating all historical data after a cache miss in the publish phase (#48748)
---
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp |  10 ++
 ...est_cloud_multi_segments_re_calc_in_publish.out | Bin 0 -> 202 bytes
 ..._cloud_multi_segments_re_calc_in_publish.groovy | 123 +++++++++++++++++++++
 3 files changed, 133 insertions(+)
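
For context: a merge-on-write unique-key load can flush more than one segment, and the same key may land in several of those segments; the delete bitmaps between the segments of that rowset record which earlier occurrences must be treated as deleted. The patch below makes the publish-phase re-calculation cover that case when the delete-bitmap cache has no record of it. The following standalone C++ sketch is illustrative only (the container types and the function name are invented for this example and are not Doris APIs); it shows the last-occurrence-wins rule that cross-segment delete marks express:

    #include <cstddef>
    #include <iostream>
    #include <map>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    // (segment index, row index) pairs that should be treated as deleted.
    using DeleteMarks = std::set<std::pair<size_t, size_t>>;

    // For each key, only the last occurrence across all segments of a rowset
    // survives; every earlier occurrence gets a delete mark.
    DeleteMarks calc_delete_marks_between_segments(
            const std::vector<std::vector<std::string>>& segment_keys) {
        DeleteMarks marks;
        std::map<std::string, std::pair<size_t, size_t>> last_seen;
        for (size_t seg = 0; seg < segment_keys.size(); ++seg) {
            for (size_t row = 0; row < segment_keys[seg].size(); ++row) {
                const std::string& key = segment_keys[seg][row];
                auto it = last_seen.find(key);
                if (it != last_seen.end()) {
                    marks.insert(it->second); // an older row with the same key loses
                }
                last_seen[key] = {seg, row};
            }
        }
        return marks;
    }

    int main() {
        // Two segments from one load; key "k2" appears in both segments.
        std::vector<std::vector<std::string>> segs = {{"k1", "k2"}, {"k2", "k3"}};
        for (const auto& [seg, row] : calc_delete_marks_between_segments(segs)) {
            std::cout << "delete segment " << seg << ", row " << row << "\n";
        }
        return 0;
    }

Running the sketch prints "delete segment 0, row 1": the copy of "k2" written by the earlier segment is the one that gets a delete mark, which is the work the missing cross-segment pass is responsible for.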

diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index f4e25302374..06ebf249edb 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -29,6 +29,7 @@
 #include "common/status.h"
 #include "olap/base_tablet.h"
 #include "olap/olap_common.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_meta.h"
@@ -318,6 +319,15 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
         LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
                   << ", publish_status=SUCCEED, not need to re-calculate 
delete_bitmaps.";
     } else {
+        if (rowset->num_segments() > 1 &&
+            !delete_bitmap->has_calculated_for_multi_segments(rowset->rowset_id())) {
+            // delete bitmap cache missed, should re-calculate delete bitmaps between segments
+            std::vector<segment_v2::SegmentSharedPtr> segments;
+            RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
+            RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(rowset->rowset_id(),
+                                                                        segments, delete_bitmap));
+        }
+
         if (invisible_rowsets == nullptr) {
             status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id,
                                                        txn_expiration);
diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.out
new file mode 100644
index 00000000000..f9e767c3d20
Binary files /dev/null and b/regression-test/data/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.out differ
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
new file mode 100644
index 00000000000..b741a6e9986
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def table1 = "test_cloud_multi_segments_re_calc_in_publish"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table1} values(99999,99999,99999);"
+    sql "insert into ${table1} values(88888,88888,88888);"
+    sql "insert into ${table1} values(77777,77777,77777);"
+    sql "sync;"
+    qt_sql "select * from ${table1} order by k1;"
+
+    def checkSegmentNum = { rowsetNum, lastRowsetSegmentNum ->
+        def tablets = sql_return_maparray """ show tablets from ${table1}; """
+        logger.info("tablets: ${tablets}")
+        String compactionUrl = tablets[0]["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", 
err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        assert tabletJson.rowsets.size() == rowsetNum + 1
+        def rowset = tabletJson.rowsets.get(tabletJson.rowsets.size() - 1)
+        logger.info("rowset: ${rowset}")
+        int start_index = rowset.indexOf("]")
+        int end_index = rowset.indexOf("DATA")
+        def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+        logger.info("segmentNumStr: ${segmentNumStr}")
+        assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
+    }
+
+    // to cause multi segments
+    def customBeConfig = [
+        doris_scanner_row_bytes : 1
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        try {
+            // batch_size is 4164 in csv_reader.cpp
+            // _batch_size is 8192 in vtablet_writer.cpp
+            // to cause multi segments
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
+            // inject cache miss so that it will re-calculate delete bitmaps for all historical data in publish
+            GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss")
+
+            Thread.sleep(1000)
+
+            def t1 = Thread.start {
+                // load data that will have multi segments and there are duplicate keys between segments
+                String content = ""
+                (1..4096).each {
+                    content += "${it},${it},${it}\n"
+                }
+                content += content
+                streamLoad {
+                    table "${table1}"
+                    set 'column_separator', ','
+                    inputStream new ByteArrayInputStream(content.getBytes())
+                    time 30000 // limit inflight 10s
+
+                    check { result, exception, startTime, endTime ->
+                        if (exception != null) {
+                            throw exception
+                        }
+                        def json = parseJson(result)
+                        assert "success" == json.Status.toLowerCase()
+                        assert 8192 == json.NumberTotalRows
+                        assert 0 == json.NumberFilteredRows
+                    }
+                }
+            }
+
+
+            t1.join()
+
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            Thread.sleep(2000)
+
+            qt_sql "select count() from ${table1};"
+
+            qt_dup_key_count "select count() from (select k1,count() as cnt from ${table1} group by k1 having cnt > 1) A;"
+
+            // ensure that we really write multi segments
+            checkSegmentNum(4, 3)
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}
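
Restating the behavioral change from the C++ hunk above as a standalone predicate (a paraphrase for readability; the function and parameter names here are invented, not Doris code):

    #include <cstddef>

    // Cross-segment delete bitmaps are re-derived only when the rowset really
    // has more than one segment and the delete-bitmap cache holds no record of
    // that work for this rowset, i.e. the cache-miss path in the publish phase.
    bool need_recalc_between_segments(size_t num_segments,
                                      bool cache_has_multi_segment_result) {
        return num_segments > 1 && !cache_has_multi_segment_result;
    }

The new regression test exercises exactly that path: the MemTable.need_flush debug point forces the load to flush multiple segments, the CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss debug point simulates the publish-phase cache miss, and the qt_dup_key_count query then checks that no duplicate keys survive.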


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
