This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new bcd01c5f242 branch-2.1-pick: [Fix](merge-on-write) should calculate 
delete bitmaps between segments before skip if tablet is in `NOT_READY` state in 
flush phase (#48056) (#48089)
bcd01c5f242 is described below

commit bcd01c5f2425c8bd9a00392e6c893b5e77d0fb2e
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Fri Feb 21 19:53:17 2025 +0800

    branch-2.1-pick: [Fix](merge-on-write) should calculate delete bitmaps 
between segments before skip if tablet is in `NOT_READY` state in flush phase 
(#48056) (#48089)
    
    pick https://github.com/apache/doris/pull/48056
---
 be/src/olap/memtable.cpp                           |   2 +
 be/src/olap/rowset_builder.cpp                     |  15 +-
 .../test_skip_calc_between_segments.out            | Bin 0 -> 175 bytes
 .../org/apache/doris/regression/suite/Suite.groovy |  68 +++++++++
 .../test_skip_calc_between_segments.groovy         | 153 +++++++++++++++++++++
 5 files changed, 231 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ec64406eb31..856cba6a1f5 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -35,6 +35,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "tablet_meta.h"
+#include "util/debug_points.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "vec/aggregate_functions/aggregate_function_reader.h"
@@ -486,6 +487,7 @@ void MemTable::shrink_memtable_by_agg() {
 }
 
 bool MemTable::need_flush() const {
+    DBUG_EXECUTE_IF("MemTable.need_flush", { return true; });
     auto max_size = config::write_buffer_size;
     if (_is_partial_update) {
         auto update_columns_size = _num_columns;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index c668df4bd33..ddb55775965 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -255,13 +255,6 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
     }
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_submit_delete_bitmap_timer);
-    // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (tablet()->tablet_state() == TABLET_NOTREADY) {
-        LOG(INFO) << "tablet is under alter process, delete bitmap will be 
calculated later, "
-                     "tablet_id: "
-                  << tablet()->tablet_id() << " txn_id: " << _req.txn_id;
-        return Status::OK();
-    }
     auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
     std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
@@ -271,6 +264,14 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
                 tablet()->calc_delete_bitmap_between_segments(_rowset, 
segments, _delete_bitmap));
     }
 
+    // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
+    if (_tablet->tablet_state() == TABLET_NOTREADY) {
+        LOG(INFO) << "tablet is under alter process, delete bitmap will be 
calculated later, "
+                     "tablet_id: "
+                  << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
+        return Status::OK();
+    }
+
     // For partial update, we need to fill in the entire row of data, during 
the calculation
     // of the delete bitmap. This operation is resource-intensive, and we need 
to minimize
     // the number of times it occurs. Therefore, we skip this operation here.
diff --git 
a/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out 
b/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out
new file mode 100644
index 00000000000..783cb8c5f8f
Binary files /dev/null and 
b/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out 
differ
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index c9e057cd7d0..3cd7f89ba09 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1543,6 +1543,74 @@ class Suite implements GroovyInterceptable {
         }
     }
 
+    def get_be_param = { paramName ->
+        def ipList = [:]
+        def portList = [:]
+        def backendId_to_params = [:]
+        getBackendIpHttpPort(ipList, portList)
+        for (String id in ipList.keySet()) {
+            def beIp = ipList.get(id)
+            def bePort = portList.get(id)
+            // get the config value from be
+            def (code, out, err) = curl("GET", 
String.format("http://%s:%s/api/show_config?conf_item=%s";, beIp, bePort, 
paramName))
+            assert code == 0
+            assert out.contains(paramName)
+            // parsing
+            def resultList = parseJson(out)[0]
+            assert resultList.size() == 4
+            // get original value
+            def paramValue = resultList[2]
+            backendId_to_params.put(id, paramValue)
+        }
+        logger.info("backendId_to_params: ${backendId_to_params}".toString())
+        return backendId_to_params
+    }
+
+    def set_be_param = { paramName, paramValue ->
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHttpPort(ipList, portList)
+        for (String id in ipList.keySet()) {
+            def beIp = ipList.get(id)
+            def bePort = portList.get(id)
+            logger.info("set be_id ${id} ${paramName} to 
${paramValue}".toString())
+            def (code, out, err) = curl("POST", 
String.format("http://%s:%s/api/update_config?%s=%s";, beIp, bePort, paramName, 
paramValue))
+            assert out.contains("OK")
+        }
+    }
+
+    def set_original_be_param = { paramName, backendId_to_params ->
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHttpPort(ipList, portList)
+        for (String id in ipList.keySet()) {
+            def beIp = ipList.get(id)
+            def bePort = portList.get(id)
+            def paramValue = backendId_to_params.get(id)
+            logger.info("set be_id ${id} ${paramName} to 
${paramValue}".toString())
+            def (code, out, err) = curl("POST", 
String.format("http://%s:%s/api/update_config?%s=%s";, beIp, bePort, paramName, 
paramValue))
+            assert out.contains("OK")
+        }
+    }
+
+    void setBeConfigTemporary(Map<String, Object> tempConfig, Closure 
actionSupplier) {
+        Map<String, Map<String, String>> originConf = Maps.newHashMap()
+        tempConfig.each{ k, v ->
+            originConf.put(k, get_be_param(k))
+        }
+        try {
+            tempConfig.each{ k, v -> set_be_param(k, v)}
+            actionSupplier()
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            originConf.each { k, confs ->
+                set_original_be_param(k, confs)
+            }
+        }
+    }
+
     void waiteCreateTableFinished(String tableName) {
         Thread.sleep(2000);
         String showCreateTable = "SHOW CREATE TABLE ${tableName}"
diff --git 
a/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy
 
b/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy
new file mode 100644
index 00000000000..a549bbec562
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_skip_calc_between_segments", "nonConcurrent") {
+
+    def table1 = "test_skip_calc_between_segments"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table1} values(99999,99999,99999);"
+    sql "insert into ${table1} values(88888,88888,88888);"
+    sql "insert into ${table1} values(77777,77777,77777);"
+    sql "sync;"
+    qt_sql "select * from ${table1} order by k1;"
+
+    def block_sc = {
+        
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+    }
+    
+    def unblock_sc = {
+        
GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+    }
+
+    def block_publish = {
+        
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+        
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+    }
+
+    def unblock_publish = {
+        
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+        
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+    }
+
+    def checkSegmentNum = { rowsetNum, lastRowsetSegmentNum ->
+        def tablets = sql_return_maparray """ show tablets from ${table1}; """
+        logger.info("tablets: ${tablets}")
+        assertEquals(1, tablets.size())
+        String compactionUrl = tablets[0]["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", 
err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        assert tabletJson.rowsets.size() == rowsetNum + 1
+        def rowset = tabletJson.rowsets.get(tabletJson.rowsets.size() - 1)
+        logger.info("rowset: ${rowset}")
+        int start_index = rowset.indexOf("]")
+        int end_index = rowset.indexOf("DATA")
+        def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+        logger.info("segmentNumStr: ${segmentNumStr}")
+        assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
+    }
+
+    // to cause multi segments
+    def customBeConfig = [
+        doris_scanner_row_bytes : 1
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        try {
+            // batch_size is 4164 in csv_reader.cpp
+            // _batch_size is 8192 in vtablet_writer.cpp
+
+            // to cause multi segments
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
+            block_publish()
+
+            // block sc to let load skip to calculate delete bitmap in flush 
and commit phase
+            block_sc()
+
+            sql "alter table ${table1} modify column c1 varchar(100);"
+
+            Thread.sleep(3000)
+
+            def t1 = Thread.start {
+                // load data that will have multi segments and there are 
duplicate keys between segments
+                String content = ""
+                (1..4096).each {
+                    content += "${it},${it},${it}\n"
+                }
+                content += content
+                streamLoad {
+                    table "${table1}"
+                    set 'column_separator', ','
+                    inputStream new ByteArrayInputStream(content.getBytes())
+                    time 30000 // limit inflight 10s
+
+                    check { result, exception, startTime, endTime ->
+                        if (exception != null) {
+                            throw exception
+                        }
+                        def json = parseJson(result)
+                        assertEquals("success", json.Status.toLowerCase())
+                        assertEquals(8192, json.NumberTotalRows)
+                        assertEquals(0, json.NumberFilteredRows)
+                    }
+                }
+            }
+
+
+            Thread.sleep(2000)
+
+            // let sc finish and wait for tablet state to be RUNNING
+            unblock_sc()
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' 
ORDER BY createtime DESC LIMIT 1 """
+                time 1000
+            }
+            logger.info("wait for schema change done")
+
+            Thread.sleep(500)
+
+            unblock_publish()
+
+            t1.join()
+            // ensure that we really write multi segments
+            checkSegmentNum(4, 3)
+
+            qt_sql "select count() from (select k1,count() as cnt from 
${table1} group by k1 having cnt > 1) A;"
+
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to