This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5766047c616 branch-3.0: [fix](group commit) fix group commit with schema change (#51144) (#51306)
5766047c616 is described below

commit 5766047c616a29a0fb389203bd4b0735bbb17f2b
Author: meiyi <me...@selectdb.com>
AuthorDate: Thu May 29 10:07:54 2025 +0800

    branch-3.0: [fix](group commit) fix group commit with schema change (#51144) (#51306)
    
    pick https://github.com/apache/doris/pull/51144
---
 be/src/olap/wal/wal_table.cpp                      |   1 +
 .../exec/group_commit_block_sink_operator.cpp      |   8 +-
 be/src/runtime/group_commit_mgr.cpp                |  36 +++---
 be/src/runtime/group_commit_mgr.h                  |  15 ++-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  17 +++
 .../apache/doris/service/FrontendServiceImpl.java  |   4 +
 .../test_group_commit_schema_change.out            | Bin 0 -> 115 bytes
 .../group_commit/test_group_commit_error.groovy    |  19 +++
 .../test_group_commit_replay_wal.groovy            |  15 ++-
 .../test_group_commit_schema_change.groovy         | 135 +++++++++++++++++++++
 10 files changed, 224 insertions(+), 26 deletions(-)

diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index a671717b50f..aed180c86a3 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -251,6 +251,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const 
std::string& wal,
     ctx->group_commit = false;
     ctx->load_type = TLoadType::MANUL_LOAD;
     ctx->load_src_type = TLoadSourceType::RAW;
+    ctx->max_filter_ratio = 1;
     auto st = _http_stream_action->process_put(nullptr, ctx);
     if (st.ok()) {
         // wait stream load finish
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp 
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index a9201f0302f..8b1ef1be95a 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -68,9 +68,9 @@ Status 
GroupCommitBlockSinkLocalState::_initialize_load_queue() {
     auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
     if (_state->exec_env()->wal_mgr()->is_running()) {
         
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                p._db_id, p._table_id, p._base_schema_version, p._load_id, 
_load_block_queue,
-                _state->be_exec_version(), _state->query_mem_tracker(), 
_create_plan_dependency,
-                _put_block_dependency));
+                p._db_id, p._table_id, p._base_schema_version, 
p._schema->indexes().size(),
+                p._load_id, _load_block_queue, _state->be_exec_version(),
+                _state->query_mem_tracker(), _create_plan_dependency, 
_put_block_dependency));
         _state->set_import_label(_load_block_queue->label);
         _state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
         return Status::OK();
@@ -259,7 +259,7 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& 
t_sink) {
     RETURN_IF_ERROR(_schema->init(table_sink.schema));
     _db_id = table_sink.db_id;
     _table_id = table_sink.table_id;
-    _base_schema_version = table_sink.base_schema_version;
+    _base_schema_version = _schema->version();
     _partition = table_sink.partition;
     _group_commit_mode = table_sink.group_commit_mode;
     _load_id = table_sink.load_id;
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index f49d6708bcc..0f22dbf4573 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -46,6 +46,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
         return runtime_state->cancel_reason();
     }
     RETURN_IF_ERROR(status);
+    DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block", DBUG_BLOCK);
     if (block->rows() > 0) {
         if (!config::group_commit_wait_replay_wal_finish) {
             _block_queue.emplace_back(block);
@@ -144,7 +145,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
                           << ", duration=" << duration << ", load_ids=" << 
get_load_ids();
             }
         }
-        if (!_need_commit && !timer_dependency->ready()) {
+        if (!_need_commit) {
             get_block_dep->block();
             VLOG_DEBUG << "block get_block for query_id=" << load_instance_id;
         }
@@ -254,7 +255,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) 
{
 }
 
 Status GroupCommitTable::get_first_block_load_queue(
-        int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
+        int64_t table_id, int64_t base_schema_version, int64_t index_size, 
const UniqueId& load_id,
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
         std::shared_ptr<MemTrackerLimiter> mem_tracker,
         std::shared_ptr<pipeline::Dependency> create_plan_dep,
@@ -270,7 +271,8 @@ Status GroupCommitTable::get_first_block_load_queue(
         }
         for (const auto& [_, inner_block_queue] : _load_block_queues) {
             if (!inner_block_queue->need_commit()) {
-                if (base_schema_version == inner_block_queue->schema_version) {
+                if (base_schema_version == inner_block_queue->schema_version &&
+                    index_size == inner_block_queue->index_size) {
                     if (inner_block_queue->add_load_id(load_id, 
put_block_dep).ok()) {
                         load_block_queue = inner_block_queue;
                         return Status::OK();
@@ -290,8 +292,8 @@ Status GroupCommitTable::get_first_block_load_queue(
         return Status::OK();
     }
     create_plan_dep->block();
-    _create_plan_deps.emplace(load_id,
-                              std::make_tuple(create_plan_dep, put_block_dep, 
base_schema_version));
+    _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, 
put_block_dep,
+                                                       base_schema_version, 
index_size));
     if (!_is_creating_plan_fragment) {
         _is_creating_plan_fragment = true;
         RETURN_IF_ERROR(
@@ -378,18 +380,21 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
             LOG(WARNING) << "create group commit load error, st=" << 
st.to_string();
             return st;
         }
-        auto schema_version = result.base_schema_version;
         auto& pipeline_params = result.pipeline_params;
+        auto schema_version = 
pipeline_params.fragment.output_sink.olap_table_sink.schema.version;
+        auto index_size =
+                
pipeline_params.fragment.output_sink.olap_table_sink.schema.indexes.size();
         DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == 
_db_id);
         txn_id = pipeline_params.txn_conf.txn_id;
         DCHECK(pipeline_params.local_params.size() == 1);
         instance_id = pipeline_params.local_params[0].fragment_instance_id;
         VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" 
<< _table_id
-                   << ", schema version=" << schema_version << ", label=" << 
label
-                   << ", txn_id=" << txn_id << ", instance_id=" << 
print_id(instance_id);
+                   << ", schema version=" << schema_version << ", index size=" 
<< index_size
+                   << ", label=" << label << ", txn_id=" << txn_id
+                   << ", instance_id=" << print_id(instance_id);
         {
             auto load_block_queue = std::make_shared<LoadBlockQueue>(
-                    instance_id, label, txn_id, schema_version, 
_all_block_queues_bytes,
+                    instance_id, label, txn_id, schema_version, index_size, 
_all_block_queues_bytes,
                     result.wait_internal_group_commit_finish, 
result.group_commit_interval_ms,
                     result.group_commit_data_bytes);
             RETURN_IF_ERROR(load_block_queue->create_wal(
@@ -403,7 +408,8 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
             for (const auto& [id, load_info] : _create_plan_deps) {
                 auto create_dep = std::get<0>(load_info);
                 auto put_dep = std::get<1>(load_info);
-                if (load_block_queue->schema_version == 
std::get<2>(load_info)) {
+                if (load_block_queue->schema_version == std::get<2>(load_info) 
&&
+                    load_block_queue->index_size == std::get<3>(load_info)) {
                     if (load_block_queue->add_load_id(id, put_dep).ok()) {
                         create_dep->set_ready();
                         success_load_ids.emplace_back(id);
@@ -624,9 +630,9 @@ void GroupCommitMgr::stop() {
 }
 
 Status GroupCommitMgr::get_first_block_load_queue(
-        int64_t db_id, int64_t table_id, int64_t base_schema_version, const 
UniqueId& load_id,
-        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
-        std::shared_ptr<MemTrackerLimiter> mem_tracker,
+        int64_t db_id, int64_t table_id, int64_t base_schema_version, int64_t 
index_size,
+        const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
+        int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker,
         std::shared_ptr<pipeline::Dependency> create_plan_dep,
         std::shared_ptr<pipeline::Dependency> put_block_dep) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
@@ -640,8 +646,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
         group_commit_table = _table_map[table_id];
     }
     RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
-            table_id, base_schema_version, load_id, load_block_queue, 
be_exe_version, mem_tracker,
-            create_plan_dep, put_block_dep));
+            table_id, base_schema_version, index_size, load_id, 
load_block_queue, be_exe_version,
+            mem_tracker, create_plan_dep, put_block_dep));
     return Status::OK();
 }
 
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index 32579547893..2be17400026 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -55,7 +55,7 @@ struct BlockData {
 class LoadBlockQueue {
 public:
     LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, 
int64_t txn_id,
-                   int64_t schema_version,
+                   int64_t schema_version, int64_t index_size,
                    std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
                    bool wait_internal_group_commit_finish, int64_t 
group_commit_interval_ms,
                    int64_t group_commit_data_bytes)
@@ -63,6 +63,7 @@ public:
               label(label),
               txn_id(txn_id),
               schema_version(schema_version),
+              index_size(index_size),
               
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
               _group_commit_interval_ms(group_commit_interval_ms),
               _start_time(std::chrono::steady_clock::now()),
@@ -108,6 +109,7 @@ public:
     std::string label;
     int64_t txn_id;
     int64_t schema_version;
+    int64_t index_size;
     bool wait_internal_group_commit_finish = false;
     bool data_size_condition = false;
 
@@ -157,7 +159,7 @@ public:
               _db_id(db_id),
               _table_id(table_id) {};
     Status get_first_block_load_queue(int64_t table_id, int64_t 
base_schema_version,
-                                      const UniqueId& load_id,
+                                      int64_t index_size, const UniqueId& 
load_id,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> 
mem_tracker,
@@ -189,9 +191,10 @@ private:
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> 
_load_block_queues;
     bool _is_creating_plan_fragment = false;
-    // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version>
-    std::unordered_map<UniqueId, 
std::tuple<std::shared_ptr<pipeline::Dependency>,
-                                            
std::shared_ptr<pipeline::Dependency>, int64_t>>
+    // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version, 
index_size>
+    std::unordered_map<UniqueId,
+                       std::tuple<std::shared_ptr<pipeline::Dependency>,
+                                  std::shared_ptr<pipeline::Dependency>, 
int64_t, int64_t>>
             _create_plan_deps;
 };
 
@@ -207,7 +210,7 @@ public:
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                 std::shared_ptr<pipeline::Dependency> 
get_block_dep);
     Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t 
base_schema_version,
-                                      const UniqueId& load_id,
+                                      int64_t index_size, const UniqueId& 
load_id,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> 
mem_tracker,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index ea53a931131..b3136b41b04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -48,6 +48,7 @@ import org.apache.doris.common.SchemaVersionAndHash;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.DbUtil;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.task.AgentBatchTask;
@@ -377,6 +378,14 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
         // create all replicas success.
         // add all shadow indexes to catalog
+        while 
(DebugPointUtil.isEnable("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block"))
 {
+            try {
+                Thread.sleep(1000);
+                LOG.info("block addShadowIndexToCatalog for job: {}", jobId);
+            } catch (InterruptedException e) {
+                LOG.warn("InterruptedException: ", e);
+            }
+        }
         tbl.writeLockOrAlterCancelException();
         try {
             Preconditions.checkState(tbl.getState() == 
OlapTableState.SCHEMA_CHANGE);
@@ -609,6 +618,14 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             }
             return;
         }
+        while 
(DebugPointUtil.isEnable("FE.SchemaChangeJobV2.runRunning.block")) {
+            try {
+                Thread.sleep(1000);
+                LOG.info("block schema change for job: {}", jobId);
+            } catch (InterruptedException e) {
+                LOG.warn("InterruptedException: ", e);
+            }
+        }
         Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
         Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId);
         Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index bbf8555f0fd..9140b033f5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2174,6 +2174,10 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         String originStmt = request.getLoadSql();
         HttpStreamParams httpStreamParams;
         try {
+            while 
(DebugPointUtil.isEnable("FE.FrontendServiceImpl.initHttpStreamPlan.block")) {
+                Thread.sleep(1000);
+                LOG.info("block initHttpStreamPlan");
+            }
             StmtExecutor executor = new StmtExecutor(ctx, originStmt);
             ctx.setExecutor(executor);
             httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId());
diff --git 
a/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out
 
b/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out
new file mode 100644
index 00000000000..3747bce7722
Binary files /dev/null and 
b/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out
 differ
diff --git 
a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index 7f785a3292f..6e9a89aa0f7 100644
--- 
a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++ 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -73,4 +73,23 @@ suite("test_group_commit_error", "nonConcurrent") {
     } finally {
         GetDebugPoint().clearDebugPointsForAllBEs()
     }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.block")
+        Thread thread = new Thread(() -> {
+            sql """ set group_commit = async_mode """
+            sql """ insert into ${tableName} values (5, 4) """
+        })
+        thread.start()
+        sleep(4000)
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        thread.join()
+        def result = sql "select count(*) from ${tableName}"
+        logger.info("rowCount 0: ${result}")
+    } catch (Exception e) {
+        logger.warn("unexpected failed: " + e.getMessage())
+        assertTrue(false, "unexpected failed: " + e.getMessage())
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
 }
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
index 2858d1e4f51..0fbc3ec0a8d 100644
--- 
a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
+++ 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
@@ -37,6 +37,10 @@ suite("test_group_commit_replay_wal", "nonConcurrent") {
             `k` int ,
             `v` int ,
         ) engine=olap
+        PARTITION BY LIST(k) ( 
+            PARTITION p1 VALUES IN ("1","2","3","4"), 
+            PARTITION p2 VALUES IN ("5")
+        )
         DISTRIBUTED BY HASH(`k`) 
         BUCKETS 5 
         properties("replication_num" = "1", "group_commit_interval_ms"="2000")
@@ -86,9 +90,18 @@ suite("test_group_commit_replay_wal", "nonConcurrent") {
         sleep(4000) // wal replay but all failed
         getRowCount(5)
         // check wal count is 1
+        sql """ ALTER TABLE ${tableName} DROP PARTITION p2 """
+        for (int i = 0; i < 10; i++) {
+            List<List<Object>> partitions = sql "show partitions from 
${tableName};"
+            logger.info("partitions: ${partitions}")
+            if (partitions.size() == 1) {
+                break
+            }
+            sleep(100)
+        }
 
         GetDebugPoint().clearDebugPointsForAllFEs()
-        getRowCount(10)
+        getRowCount(8)
         // check wal count is 0
     } catch (Exception e) {
         logger.info("failed: " + e.getMessage())
diff --git 
a/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy
 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy
new file mode 100644
index 00000000000..06bbbebef5b
--- /dev/null
+++ 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovyjarjarantlr4.v4.codegen.model.ExceptionClause
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_group_commit_schema_change", "nonConcurrent") {
+    def tableName3 = "test_group_commit_schema_change"
+
+    onFinish {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+    }
+
+    def getJobState = { tableName ->
+        def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE 
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
+        logger.info("jobStateResult: ${jobStateResult}")
+        return jobStateResult[0][9]
+    }
+
+    def getRowCount = { expectedRowCount ->
+        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
+            {
+                def result = sql "select count(*) from ${tableName3}"
+                logger.info("table: ${tableName3}, rowCount: ${result}")
+                return result[0][0] == expectedRowCount
+            }
+        )
+    }
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    sql """ DROP TABLE IF EXISTS ${tableName3} """
+    sql """
+        CREATE TABLE ${tableName3} (
+            `id` int(11) NOT NULL,
+            `name` varchar(50) NULL,
+            `score` varchar(11) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "group_commit_interval_ms" = "200"
+        );
+    """
+
+    
GetDebugPoint().enableDebugPointForAllFEs("FE.FrontendServiceImpl.initHttpStreamPlan.block")
+    
GetDebugPoint().enableDebugPointForAllFEs("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block")
+
+    // write data
+    Thread thread = new Thread(() -> {
+        sql """ set group_commit = async_mode; """
+        for (int i = 0; i < 10; i++) {
+            try {
+                sql """ insert into ${tableName3} values (1, 'a', 100) """
+                break
+            } catch (Exception e) {
+                logger.info("insert error: ${e}")
+                if (e.getMessage().contains("schema version not match")) {
+                    continue
+                } else {
+                    throw e
+                }
+            }
+        }
+    })
+    thread.start()
+    sleep(1000)
+    def result = sql "select count(*) from ${tableName3}"
+    logger.info("rowCount 0: ${result}")
+    assertEquals(0, result[0][0])
+
+    // schema change
+    sql """ alter table ${tableName3} modify column score int NULL"""
+    
GetDebugPoint().enableDebugPointForAllFEs("FE.SchemaChangeJobV2.runRunning.block")
+    
GetDebugPoint().disableDebugPointForAllFEs("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block")
+    for (int i = 0; i < 10; i++) {
+        def job_state = getJobState(tableName3)
+        if (job_state == "RUNNING") {
+            break
+        }
+        sleep(100)
+    }
+
+    
GetDebugPoint().disableDebugPointForAllFEs("FE.FrontendServiceImpl.initHttpStreamPlan.block")
+    thread.join()
+    getRowCount(1)
+    qt_sql """ select id, name, score from ${tableName3} """
+    def job_state = getJobState(tableName3)
+    assertEquals("RUNNING", job_state)
+    
GetDebugPoint().disableDebugPointForAllFEs("FE.SchemaChangeJobV2.runRunning.block")
+    for (int i = 0; i < 10; i++) {
+        job_state = getJobState(tableName3)
+        if (job_state == "FINISHED") {
+            break
+        }
+        sleep(100)
+    }
+    assertEquals("FINISHED", job_state)
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to