This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 749c9f7b568 [fix](group commit) fix repaly wal check label status (#38883) (#38997) 749c9f7b568 is described below commit 749c9f7b56870d265ec81742521efeb2bc9b79a8 Author: meiyi <myime...@gmail.com> AuthorDate: Wed Aug 7 22:06:59 2024 +0800 [fix](group commit) fix repaly wal check label status (#38883) (#38997) pick https://github.com/apache/doris/pull/38883 --- be/src/olap/wal/wal_table.cpp | 10 ++- be/src/runtime/group_commit_mgr.cpp | 4 + .../doris/common/LabelAlreadyUsedException.java | 3 +- .../apache/doris/service/FrontendServiceImpl.java | 3 + .../group_commit/group_commit_wal_msg.csv | 5 ++ .../group_commit/replay_wal_restart_fe.groovy | 80 ++++++++++++++++++ .../test_group_commit_replay_wal.groovy | 96 ++++++++++++++++++++++ 7 files changed, 197 insertions(+), 4 deletions(-) diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index 38c262e9889..e57bf29a1c9 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -92,7 +92,8 @@ Status WalTable::_relay_wal_one_by_one() { auto msg = st.msg(); if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() || st.is<ErrorCode::DATA_QUALITY_ERROR>() || - msg.find("LabelAlreadyUsedException") != msg.npos) { + (msg.find("LabelAlreadyUsedException") != msg.npos && + (msg.find("[COMMITTED]") != msg.npos || msg.find("[VISIBLE]") != msg.npos))) { LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); // delete wal @@ -164,8 +165,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) { request.__set_auth_code(0); // this is a fake, fe not check it now request.__set_db_id(db_id); request.__set_label(label); - std::string reason = "relay wal with label " + label; - request.__set_reason(reason); + request.__set_reason("relay wal with label " + label); TLoadTxnRollbackResult result; TNetworkAddress master_addr = _exec_env->master_info()->network_address; auto st = ThriftRpcHelper::rpc<FrontendServiceClient>( @@ -193,6 +193,10 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { [[maybe_unused]] auto st = _try_abort_txn(_db_id, label); } #endif + DBUG_EXECUTE_IF("WalTable.replay_wals.stop", { + // LOG(INFO) << "WalTable.replay_wals.stop"; + return Status::InternalError("WalTable.replay_wals.stop"); + }); return _replay_one_wal_with_streamload(wal_id, wal, label); } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index fe53ce7e6cb..52a14f159a6 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -397,6 +397,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ Status result_status; DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { status = Status::InternalError(""); }); + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error", + { status = Status::InternalError("load_error"); }); if (status.ok()) { DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error", { status = Status::InternalError(""); }); @@ -425,6 +427,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ }, 10000L); result_status = Status::create(result.status); + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error", + { result_status = Status::InternalError("commit_success_and_rpc_error"); }); } else { // abort txn TLoadTxnRollbackRequest request; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java index 7c3e7e31c72..928f7fd6eda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java @@ -34,7 +34,8 @@ public class LabelAlreadyUsedException extends DdlException { } public LabelAlreadyUsedException(TransactionState txn) { - super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId() + "]"); + super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId() + + "], status [" + txn.getTransactionStatus() + "]."); switch (txn.getTransactionStatus()) { case UNKNOWN: case PREPARE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 77bf3173377..446d7ed26d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1751,6 +1751,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } try { + if (DebugPointUtil.isEnable("FrontendServiceImpl.loadTxnRollback.error")) { + throw new UserException("FrontendServiceImpl.loadTxnRollback.error"); + } loadTxnRollbackImpl(request); } catch (MetaNotFoundException e) { LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e); diff --git a/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv b/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv new file mode 100644 index 00000000000..6ab7bd6bcdf --- /dev/null +++ b/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv @@ -0,0 +1,5 @@ +1,1 +2,2 +3,3 +4,4 +5,5 \ No newline at end of file diff --git a/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy b/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy new file mode 100644 index 00000000000..d39bdd9d4a9 --- /dev/null +++ b/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. + +import org.apache.doris.regression.suite.ClusterOptions + +suite("replay_wal_restart_fe") { + def check_schema_change = { state -> + for (int i = 0; i < 30; i++) { + def jobs = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_2' order by CreateTime desc;" + assertTrue(jobs.size() >= 1) + logger.info("alter job: ${jobs[0]}") + if (jobs[0].State == state) { + break + } + sleep(1000) + } + } + + def options = new ClusterOptions() + options.setFeNum(1) + options.setBeNum(1) + options.enableDebugPoints() + options.feConfigs.add('sys_log_verbose_modules=org.apache.doris') + options.beConfigs.add('sys_log_verbose_modules=*') + options.beConfigs.add('enable_java_support=false') + docker(options) { + def result = sql 'SELECT DATABASE()' + + // group commit load error and stop replay + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error") + GetDebugPoint().enableDebugPointForAllBEs("WalTable.replay_wals.stop") + + // 1 wal need to replay + sql 'CREATE TABLE tbl_2 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 1 PROPERTIES ( "replication_num" = "1", "group_commit_interval_ms"="1000")' + sql 'SET GROUP_COMMIT = ASYNC_MODE' + sql 'INSERT INTO tbl_2 VALUES (1, 2)' + + // do schema change + sql 'ALTER TABLE tbl_2 ORDER BY (k2, k1)' + check_schema_change('RUNNING') + + // stop be, restart fe, start be + cluster.stopBackends() + cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + check_schema_change('RUNNING') + cluster.startBackends() + + // check schema change status and row count + check_schema_change('FINISHED') + for (int i = 0; i < 30; i++) { + result = sql "select count(*) from tbl_2" + logger.info("rowCount: ${result}") + if (result[0][0] >= 1) { + break + } + sleep(1000) + } + order_qt_select_1 'SELECT * FROM tbl_2' + } +} diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy new file mode 100644 index 00000000000..da7bcadc11a --- /dev/null +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.awaitility.Awaitility +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_group_commit_replay_wal", "nonConcurrent") { + def tableName = "test_group_commit_replay_wal" + + def getRowCount = { expectedRowCount -> + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until( + { + def result = sql "select count(*) from ${tableName}" + logger.info("table: ${tableName}, rowCount: ${result}") + return result[0][0] == expectedRowCount + } + ) + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 5 + properties("replication_num" = "1", "group_commit_interval_ms"="2000") + """ + + // 1. load success but commit rpc timeout + // 2. should skip replay because of fe throw LabelAlreadyUsedException and txn status is VISIBLE + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error") + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + file 'group_commit_wal_msg.csv' + time 10000 + } + getRowCount(5) + // check wal count is 0 + sleep(5000) + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + // load fail and abort fail, wal should not be deleted and retry + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error") + GetDebugPoint().enableDebugPointForAllFEs("FrontendServiceImpl.loadTxnRollback.error") + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + file 'group_commit_wal_msg.csv' + time 10000 + } + getRowCount(5) + sleep(10000) // wal replay but all failed + getRowCount(5) + // check wal count is 1 + + GetDebugPoint().clearDebugPointsForAllFEs() + getRowCount(10) + // check wal count is 0 + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org