This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6217073db7b216acde0f833d5e39a838e35bd530 Author: meiyi <myime...@gmail.com> AuthorDate: Wed Aug 7 09:56:15 2024 +0800 [fix](group commit) fix repaly wal check label status (#38883) ## Proposed changes When replaying a wal, it first aborts the txn with the label but does not check the abort result. Then, when beginning the txn for the replay, if FE returns `LabelAlreadyUsedException`, it considers the load to have succeeded in a previous group commit load or wal replay, and deletes this wal directly. But `LabelAlreadyUsedException` only means there is a txn with this label; the txn may be in `PREPARE / RUNNING / COMMITTED / VISIBLE` status (the abort txn in the first step may have failed), so replay wal should check both that FE returned `LabelAlreadyUsedException` and that the txn status is `COMMITTED / VISIBLE`. This PR also adds a case for replaying wal with schema change. --- be/src/olap/wal/wal_table.cpp | 10 ++- be/src/runtime/group_commit_mgr.cpp | 4 + .../doris/common/LabelAlreadyUsedException.java | 3 +- .../apache/doris/service/FrontendServiceImpl.java | 3 + .../group_commit/group_commit_wal_msg.csv | 5 ++ .../group_commit/replay_wal_restart_fe.groovy | 80 ++++++++++++++++++ .../test_group_commit_replay_wal.groovy | 96 ++++++++++++++++++++++ 7 files changed, 197 insertions(+), 4 deletions(-) diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index ef98bb58ae4..9573f81c846 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -92,7 +92,8 @@ Status WalTable::_relay_wal_one_by_one() { auto msg = st.msg(); if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() || st.is<ErrorCode::DATA_QUALITY_ERROR>() || - msg.find("LabelAlreadyUsedException") != msg.npos) { + (msg.find("LabelAlreadyUsedException") != msg.npos && + (msg.find("[COMMITTED]") != msg.npos || msg.find("[VISIBLE]") != msg.npos))) { LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); need_delete_wals.push_back(wal_info); @@ -166,8 +167,7 @@ 
Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) { request.__set_auth_code(0); // this is a fake, fe not check it now request.__set_db_id(db_id); request.__set_label(label); - std::string reason = "relay wal with label " + label; - request.__set_reason(reason); + request.__set_reason("relay wal with label " + label); TLoadTxnRollbackResult result; TNetworkAddress master_addr = _exec_env->master_info()->network_address; auto st = ThriftRpcHelper::rpc<FrontendServiceClient>( @@ -195,6 +195,10 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { [[maybe_unused]] auto st = _try_abort_txn(_db_id, label); } #endif + DBUG_EXECUTE_IF("WalTable.replay_wals.stop", { + // LOG(INFO) << "WalTable.replay_wals.stop"; + return Status::InternalError("WalTable.replay_wals.stop"); + }); return _replay_one_wal_with_streamload(wal_id, wal, label); } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 30885fa1ac9..d7689cfe940 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -435,6 +435,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ Status result_status; DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { status = Status::InternalError(""); }); + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error", + { status = Status::InternalError("load_error"); }); if (status.ok()) { // commit txn TLoadTxnCommitRequest request; @@ -474,6 +476,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ .error(result_status); retry_times++; } + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error", + { result_status = Status::InternalError("commit_success_and_rpc_error"); }); } else { // abort txn TLoadTxnRollbackRequest request; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java index d739f2032f3..8c508809d59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java @@ -38,7 +38,8 @@ public class LabelAlreadyUsedException extends DdlException { } public LabelAlreadyUsedException(TransactionState txn) { - super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId() + "]"); + super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId() + + "], status [" + txn.getTransactionStatus() + "]."); switch (txn.getTransactionStatus()) { case UNKNOWN: case PREPARE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 58754d69fd0..07f63a4cb80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1845,6 +1845,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } try { + if (DebugPointUtil.isEnable("FrontendServiceImpl.loadTxnRollback.error")) { + throw new UserException("FrontendServiceImpl.loadTxnRollback.error"); + } loadTxnRollbackImpl(request); } catch (MetaNotFoundException e) { LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e); diff --git a/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv b/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv new file mode 100644 index 00000000000..6ab7bd6bcdf --- /dev/null +++ b/regression-test/data/insert_p0/group_commit/group_commit_wal_msg.csv @@ -0,0 +1,5 @@ +1,1 +2,2 +3,3 +4,4 +5,5 \ No newline at end of file diff --git 
a/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy b/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy new file mode 100644 index 00000000000..d39bdd9d4a9 --- /dev/null +++ b/regression-test/suites/insert_p0/group_commit/replay_wal_restart_fe.groovy @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. 
+ +import org.apache.doris.regression.suite.ClusterOptions + +suite("replay_wal_restart_fe") { + def check_schema_change = { state -> + for (int i = 0; i < 30; i++) { + def jobs = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_2' order by CreateTime desc;" + assertTrue(jobs.size() >= 1) + logger.info("alter job: ${jobs[0]}") + if (jobs[0].State == state) { + break + } + sleep(1000) + } + } + + def options = new ClusterOptions() + options.setFeNum(1) + options.setBeNum(1) + options.enableDebugPoints() + options.feConfigs.add('sys_log_verbose_modules=org.apache.doris') + options.beConfigs.add('sys_log_verbose_modules=*') + options.beConfigs.add('enable_java_support=false') + docker(options) { + def result = sql 'SELECT DATABASE()' + + // group commit load error and stop replay + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error") + GetDebugPoint().enableDebugPointForAllBEs("WalTable.replay_wals.stop") + + // 1 wal need to replay + sql 'CREATE TABLE tbl_2 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 1 PROPERTIES ( "replication_num" = "1", "group_commit_interval_ms"="1000")' + sql 'SET GROUP_COMMIT = ASYNC_MODE' + sql 'INSERT INTO tbl_2 VALUES (1, 2)' + + // do schema change + sql 'ALTER TABLE tbl_2 ORDER BY (k2, k1)' + check_schema_change('RUNNING') + + // stop be, restart fe, start be + cluster.stopBackends() + cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + check_schema_change('RUNNING') + cluster.startBackends() + + // check schema change status and row count + check_schema_change('FINISHED') + for (int i = 0; i < 30; i++) { + result = sql "select count(*) from tbl_2" + logger.info("rowCount: ${result}") + if (result[0][0] >= 1) { + break + } + sleep(1000) + } + order_qt_select_1 'SELECT * FROM tbl_2' + } +} diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy 
b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy new file mode 100644 index 00000000000..da7bcadc11a --- /dev/null +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.awaitility.Awaitility +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_group_commit_replay_wal", "nonConcurrent") { + def tableName = "test_group_commit_replay_wal" + + def getRowCount = { expectedRowCount -> + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until( + { + def result = sql "select count(*) from ${tableName}" + logger.info("table: ${tableName}, rowCount: ${result}") + return result[0][0] == expectedRowCount + } + ) + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 5 + properties("replication_num" = "1", "group_commit_interval_ms"="2000") + """ + + // 1. load success but commit rpc timeout + // 2. 
should skip replay because of fe throw LabelAlreadyUsedException and txn status is VISIBLE + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error") + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + file 'group_commit_wal_msg.csv' + time 10000 + } + getRowCount(5) + // check wal count is 0 + sleep(5000) + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + // load fail and abort fail, wal should not be deleted and retry + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error") + GetDebugPoint().enableDebugPointForAllFEs("FrontendServiceImpl.loadTxnRollback.error") + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + file 'group_commit_wal_msg.csv' + time 10000 + } + getRowCount(5) + sleep(10000) // wal replay but all failed + getRowCount(5) + // check wal count is 1 + + GetDebugPoint().clearDebugPointsForAllFEs() + getRowCount(10) + // check wal count is 0 + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org