This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 33375c4ca5c [Fix](merge-on-write) should calculate delete bitmaps between segments before skipping if tablet is in `NOT_READY` state in flush phase (#48056)
33375c4ca5c is described below

commit 33375c4ca5ca97af6a0a304f36903cbff7c7fe04
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Wed Feb 19 16:29:32 2025 +0800

    [Fix](merge-on-write) should calculate delete bitmaps between segments before skipping if tablet is in `NOT_READY` state in flush phase (#48056)
    
    ### What problem does this PR solve?
    Problem Summary:
    
    Consider the following situation:
    1. a heavy schema change starts on the BE and changes the tablet's state to `NOT_READY`
    2. load1 skips calculating delete bitmaps in the flush phase because the tablet's state is `NOT_READY`
    3. the heavy schema change finishes and changes the tablet's state back to `RUNNING`
    4. load1 calculates delete bitmaps in the publish phase only for the converted historical rowsets

    If load1 writes multiple segments, this causes duplicate keys, because the delete bitmaps between segments are never calculated in any phase.
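
    The fix (shown in the diff below) simply moves the `NOT_READY` early return so that it runs after the inter-segment delete bitmap calculation. A minimal sketch of the corrected control flow, simplified from `submit_calc_delete_bitmap_task` (the argument list of `calc_delete_bitmap_between_segments` is abbreviated here, not copied exactly):

    ```cpp
    Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
        // Load the segments of the freshly flushed rowset.
        auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
        std::vector<segment_v2::SegmentSharedPtr> segments;
        RETURN_IF_ERROR(beta_rowset->load_segments(&segments));

        // Always resolve duplicate keys between this load's own segments first;
        // this is the step that used to be skipped when the tablet was NOT_READY.
        if (segments.size() > 1) {
            // argument list abbreviated; see rowset_builder.cpp for the exact call
            RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(segments, _delete_bitmap));
        }

        // Tablet is under alter process: the delete bitmap against historical
        // rowsets will be calculated after conversion, in the publish phase.
        if (_tablet->tablet_state() == TABLET_NOTREADY) {
            return Status::OK();
        }

        // ... submit the regular delete bitmap calculation task ...
        return Status::OK();
    }
    ```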
---
 be/src/olap/rowset_builder.cpp                     |  15 +-
 .../test_skip_calc_between_segments.out            | Bin 0 -> 175 bytes
 .../test_skip_calc_between_segments.groovy         | 173 +++++++++++++++++++++
 3 files changed, 181 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 1a041698486..fce244f0839 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -274,13 +274,6 @@ Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
         }
     }
 
-    // tablet is under alter process. The delete bitmap will be calculated after conversion.
-    if (_tablet->tablet_state() == TABLET_NOTREADY) {
-        LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
-                     "tablet_id: "
-                  << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
-        return Status::OK();
-    }
     auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
     std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
@@ -290,6 +283,14 @@ Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
                                                                      _delete_bitmap));
     }
 
+    // tablet is under alter process. The delete bitmap will be calculated after conversion.
+    if (_tablet->tablet_state() == TABLET_NOTREADY) {
+        LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
+                     "tablet_id: "
+                  << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
+        return Status::OK();
+    }
+
     // For partial update, we need to fill in the entire row of data, during the calculation
     // of the delete bitmap. This operation is resource-intensive, and we need to minimize
     // the number of times it occurs. Therefore, we skip this operation here.
diff --git a/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out b/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out
new file mode 100644
index 00000000000..783cb8c5f8f
Binary files /dev/null and b/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out differ
diff --git a/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy b/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy
new file mode 100644
index 00000000000..2eb49eb2759
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_skip_calc_between_segments", "nonConcurrent") {
+
+    def table1 = "test_skip_calc_between_segments"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table1} values(99999,99999,99999);"
+    sql "insert into ${table1} values(88888,88888,88888);"
+    sql "insert into ${table1} values(77777,77777,77777);"
+    sql "sync;"
+    qt_sql "select * from ${table1} order by k1;"
+
+    def block_sc = {
+        if (isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+        } else {
+            GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+        }
+    }
+    
+    def unblock_sc = {
+        if (isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+        } else {
+            GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+        }
+    }
+
+    def block_publish = {
+        if (isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+        } else {
+            GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+            GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+            
+        }
+    }
+
+    def unblock_publish = {
+        if (isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+        } else {
+            GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+            GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+            
+        }
+    }
+
+    def checkSegmentNum = { rowsetNum, lastRowsetSegmentNum ->
+        def tablets = sql_return_maparray """ show tablets from ${table1}; """
+        logger.info("tablets: ${tablets}")
+        assertEquals(1, tablets.size())
+        String compactionUrl = tablets[0]["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        assert tabletJson.rowsets.size() == rowsetNum + 1
+        def rowset = tabletJson.rowsets.get(tabletJson.rowsets.size() - 1)
+        logger.info("rowset: ${rowset}")
+        int start_index = rowset.indexOf("]")
+        int end_index = rowset.indexOf("DATA")
+        def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+        logger.info("segmentNumStr: ${segmentNumStr}")
+        assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
+    }
+
+    // to cause multi segments
+    def customBeConfig = [
+        doris_scanner_row_bytes : 1
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        try {
+            // batch_size is 4164 in csv_reader.cpp
+            // _batch_size is 8192 in vtablet_writer.cpp
+
+            // to cause multi segments
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
+            block_publish()
+
+            // block schema change so that the load skips calculating delete bitmaps in the flush and commit phases
+            block_sc()
+
+            sql "alter table ${table1} modify column c1 varchar(100);"
+
+            Thread.sleep(3000)
+
+            def t1 = Thread.start {
+                // load data that produces multiple segments with duplicate keys between segments
+                String content = ""
+                (1..4096).each {
+                    content += "${it},${it},${it}\n"
+                }
+                content += content
+                streamLoad {
+                    table "${table1}"
+                    set 'column_separator', ','
+                    inputStream new ByteArrayInputStream(content.getBytes())
+                    time 30000 // limit inflight 30s
+
+                    check { result, exception, startTime, endTime ->
+                        if (exception != null) {
+                            throw exception
+                        }
+                        def json = parseJson(result)
+                        assertEquals("success", json.Status.toLowerCase())
+                        assertEquals(8192, json.NumberTotalRows)
+                        assertEquals(0, json.NumberFilteredRows)
+                    }
+                }
+            }
+
+
+            Thread.sleep(2000)
+
+            // let sc finish and wait for tablet state to be RUNNING
+            unblock_sc()
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """
+                time 1000
+            }
+            logger.info("wait for schema change done")
+
+            Thread.sleep(500)
+
+            unblock_publish()
+
+            t1.join()
+            // ensure that we really write multi segments
+            checkSegmentNum(4, 3)
+
+            qt_sql "select count() from (select k1,count() as cnt from ${table1} group by k1 having cnt > 1) A;"
+
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
