This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6217073db7b216acde0f833d5e39a838e35bd530
Author: meiyi <myime...@gmail.com>
AuthorDate: Wed Aug 7 09:56:15 2024 +0800

    [fix](group commit) fix replay wal check label status (#38883)
    
    ## Proposed changes
    
    When replaying a WAL, the BE first aborts the txn with the label but does
    not check the abort result.
    Then, when beginning the replay txn, if FE returns
    `LabelAlreadyUsedException`, the BE considers the load to have succeeded in
    a previous group commit load or WAL replay, and deletes this WAL directly.
    But `LabelAlreadyUsedException` only means that a txn with this label
    exists; the txn may be in `PREPARE / RUNNING / COMMITTED / VISIBLE` status
    (the abort txn in the first step may have failed), so WAL replay should
    check both that FE returned `LabelAlreadyUsedException` and that the txn
    status is `COMMITTED / VISIBLE`.
    
    This PR also adds a case for replaying a WAL with a schema change.
---
 be/src/olap/wal/wal_table.cpp                      | 10 ++-
 be/src/runtime/group_commit_mgr.cpp                |  4 +
 .../doris/common/LabelAlreadyUsedException.java    |  3 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  3 +
 .../group_commit/group_commit_wal_msg.csv          |  5 ++
 .../group_commit/replay_wal_restart_fe.groovy      | 80 ++++++++++++++++++
 .../test_group_commit_replay_wal.groovy            | 96 ++++++++++++++++++++++
 7 files changed, 197 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index ef98bb58ae4..9573f81c846 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -92,7 +92,8 @@ Status WalTable::_relay_wal_one_by_one() {
         auto msg = st.msg();
         if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || 
st.is<ErrorCode::NOT_FOUND>() ||
             st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
-            msg.find("LabelAlreadyUsedException") != msg.npos) {
+            (msg.find("LabelAlreadyUsedException") != msg.npos &&
+             (msg.find("[COMMITTED]") != msg.npos || msg.find("[VISIBLE]") != 
msg.npos))) {
             LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
                       << ", st=" << st.to_string();
             need_delete_wals.push_back(wal_info);
@@ -166,8 +167,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& 
label) {
     request.__set_auth_code(0); // this is a fake, fe not check it now
     request.__set_db_id(db_id);
     request.__set_label(label);
-    std::string reason = "relay wal with label " + label;
-    request.__set_reason(reason);
+    request.__set_reason("relay wal with label " + label);
     TLoadTxnRollbackResult result;
     TNetworkAddress master_addr = _exec_env->master_info()->network_address;
     auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -195,6 +195,10 @@ Status WalTable::_replay_wal_internal(const std::string& 
wal) {
         [[maybe_unused]] auto st = _try_abort_txn(_db_id, label);
     }
 #endif
+    DBUG_EXECUTE_IF("WalTable.replay_wals.stop", {
+        // LOG(INFO) << "WalTable.replay_wals.stop";
+        return Status::InternalError("WalTable.replay_wals.stop");
+    });
     return _replay_one_wal_with_streamload(wal_id, wal, label);
 }
 
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 30885fa1ac9..d7689cfe940 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -435,6 +435,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
     Status result_status;
     DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
                     { status = Status::InternalError(""); });
+    DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error",
+                    { status = Status::InternalError("load_error"); });
     if (status.ok()) {
         // commit txn
         TLoadTxnCommitRequest request;
@@ -474,6 +476,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
                     .error(result_status);
             retry_times++;
         }
+        
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error",
+                        { result_status = 
Status::InternalError("commit_success_and_rpc_error"); });
     } else {
         // abort txn
         TLoadTxnRollbackRequest request;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
index d739f2032f3..8c508809d59 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
@@ -38,7 +38,8 @@ public class LabelAlreadyUsedException extends DdlException {
     }
 
     public LabelAlreadyUsedException(TransactionState txn) {
-        super("Label [" + txn.getLabel() + "] has already been used, relate to 
txn [" + txn.getTransactionId() + "]");
+        super("Label [" + txn.getLabel() + "] has already been used, relate to 
txn [" + txn.getTransactionId()
+                + "], status [" + txn.getTransactionStatus() + "].");
         switch (txn.getTransactionStatus()) {
             case UNKNOWN:
             case PREPARE:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 58754d69fd0..07f63a4cb80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1845,6 +1845,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             return result;
         }
         try {
+            if 
(DebugPointUtil.isEnable("FrontendServiceImpl.loadTxnRollback.error")) {
+                throw new 
UserException("FrontendServiceImpl.loadTxnRollback.error");
+            }
             loadTxnRollbackImpl(request);
         } catch (MetaNotFoundException e) {
             LOG.warn("failed to rollback txn, id: {}, label: {}", 
request.getTxnId(), request.getLabel(), e);
diff --git 
a/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv 
b/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv
new file mode 100644
index 00000000000..6ab7bd6bcdf
--- /dev/null
+++ b/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv
@@ -0,0 +1,5 @@
+1,1
+2,2
+3,3
+4,4
+5,5
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy 
b/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy
new file mode 100644
index 00000000000..d39bdd9d4a9
--- /dev/null
+++ b/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases is copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
+// and modified by Doris.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("replay_wal_restart_fe") {
+    def check_schema_change = { state ->
+        for (int i = 0; i < 30; i++) {
+            def jobs = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE 
TableName = 'tbl_2' order by CreateTime desc;"
+            assertTrue(jobs.size() >= 1)
+            logger.info("alter job: ${jobs[0]}")
+            if (jobs[0].State == state) {
+                break
+            }
+            sleep(1000)
+        }
+    }
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.feConfigs.add('sys_log_verbose_modules=org.apache.doris')
+    options.beConfigs.add('sys_log_verbose_modules=*')
+    options.beConfigs.add('enable_java_support=false')
+    docker(options) {
+        def result = sql 'SELECT DATABASE()'
+
+        // group commit load error and stop replay
+        
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error")
+        GetDebugPoint().enableDebugPointForAllBEs("WalTable.replay_wals.stop")
+
+        // 1 wal need to replay
+        sql 'CREATE TABLE tbl_2 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) 
BUCKETS 1 PROPERTIES ( "replication_num" = "1", 
"group_commit_interval_ms"="1000")'
+        sql 'SET GROUP_COMMIT = ASYNC_MODE'
+        sql 'INSERT INTO tbl_2 VALUES (1, 2)'
+
+        // do schema change
+        sql 'ALTER TABLE tbl_2 ORDER BY (k2, k1)'
+        check_schema_change('RUNNING')
+
+        // stop be, restart fe, start be
+        cluster.stopBackends()
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+        check_schema_change('RUNNING')
+        cluster.startBackends()
+
+        // check schema change status and row count
+        check_schema_change('FINISHED')
+        for (int i = 0; i < 30; i++) {
+            result = sql "select count(*) from tbl_2"
+            logger.info("rowCount: ${result}")
+            if (result[0][0] >= 1) {
+                break
+            }
+            sleep(1000)
+        }
+        order_qt_select_1 'SELECT * FROM tbl_2'
+    }
+}
diff --git 
a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
new file mode 100644
index 00000000000..da7bcadc11a
--- /dev/null
+++ 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_group_commit_replay_wal", "nonConcurrent") {
+    def tableName = "test_group_commit_replay_wal"
+
+    def getRowCount = { expectedRowCount ->
+        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
+            {
+                def result = sql "select count(*) from ${tableName}"
+                logger.info("table: ${tableName}, rowCount: ${result}")
+                return result[0][0] == expectedRowCount
+            }
+        )
+    }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k`) 
+        BUCKETS 5 
+        properties("replication_num" = "1", "group_commit_interval_ms"="2000")
+    """
+
+    // 1. load success but commit rpc timeout
+    // 2. should skip replay because of fe throw LabelAlreadyUsedException and 
txn status is VISIBLE
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error")
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'group_commit', 'async_mode'
+            unset 'label'
+            file 'group_commit_wal_msg.csv'
+            time 10000
+        }
+        getRowCount(5)
+        // check wal count is 0
+        sleep(5000)
+    } catch (Exception e) {
+        logger.info("failed: " + e.getMessage())
+        assertTrue(false)
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+    // load fail and abort fail, wal should not be deleted and retry
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error")
+        
GetDebugPoint().enableDebugPointForAllFEs("FrontendServiceImpl.loadTxnRollback.error")
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'group_commit', 'async_mode'
+            unset 'label'
+            file 'group_commit_wal_msg.csv'
+            time 10000
+        }
+        getRowCount(5)
+        sleep(10000) // wal replay but all failed
+        getRowCount(5)
+        // check wal count is 1
+
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        getRowCount(10)
+        // check wal count is 0
+    } catch (Exception e) {
+        logger.info("failed: " + e.getMessage())
+        assertTrue(false)
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to