This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 82bb3ed50f59eab8e47c0dac8f1c84cc22db549e Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Fri Feb 2 00:12:15 2024 +0800 [Fix](group commit) Fix pre allocated err handling for group commit async load and add regression test #30718 --- be/src/olap/wal/wal_manager.cpp | 8 +- be/src/olap/wal/wal_manager.h | 2 +- be/src/runtime/group_commit_mgr.cpp | 12 +++ be/src/runtime/group_commit_mgr.h | 3 + be/src/vec/sink/group_commit_block_sink.cpp | 16 ++++ .../fault_injection_p0/group_commit_wal_msg.csv | 5 ++ ...oup_commit_async_wal_msg_fault_injection.groovy | 97 ++++++++++++++++++++++ 7 files changed, 138 insertions(+), 5 deletions(-) diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 6d59c6df686..511751693c8 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -352,9 +352,9 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_ } table_ptr->add_wal(wal_id, wal); #ifndef BE_TEST - WARN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)), + WARN_IF_ERROR(update_wal_dir_limit(get_base_wal_path(wal)), "Failed to update wal dir limit while add recover wal!"); - WARN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)), + WARN_IF_ERROR(update_wal_dir_used(get_base_wal_path(wal)), "Failed to update wal dir used while add recove wal!"); #endif return Status::OK(); @@ -413,7 +413,7 @@ Status WalManager::get_wal_dir_available_size(const std::string& wal_dir, size_t return _wal_dirs_info->get_wal_dir_available_size(wal_dir, available_bytes); } -std::string WalManager::_get_base_wal_path(const std::string& wal_path_str) { +std::string WalManager::get_base_wal_path(const std::string& wal_path_str) { io::Path wal_path = wal_path_str; for (int i = 0; i < 3; ++i) { if (!wal_path.has_parent_path()) { @@ -505,7 +505,7 @@ Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t block_que } } erase_wal_queue(table_id, wal_id); - 
RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0, + RETURN_IF_ERROR(update_wal_dir_pre_allocated(get_base_wal_path(wal_path), 0, block_queue_pre_allocated)); return Status::OK(); } diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h index dfa2859cbe3..db75d991154 100644 --- a/be/src/olap/wal/wal_manager.h +++ b/be/src/olap/wal/wal_manager.h @@ -84,13 +84,13 @@ public: std::shared_ptr<std::condition_variable>& cv); Status wait_replay_wal_finish(int64_t wal_id); Status notify_relay_wal(int64_t wal_id); + static std::string get_base_wal_path(const std::string& wal_path_str); private: // wal back pressure Status _init_wal_dirs_conf(); Status _init_wal_dirs(); Status _init_wal_dirs_info(); - std::string _get_base_wal_path(const std::string& wal_path_str); Status _update_wal_dir_info_thread(); // replay wal diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index a41aef2acf2..88a8150a60f 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -25,6 +25,7 @@ #include "client_cache.h" #include "common/compiler_util.h" #include "common/config.h" +#include "common/status.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "util/debug_points.h" @@ -392,6 +393,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ // status: exec_plan_fragment result // st: commit txn rpc status // result_status: commit txn result + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", + { st = Status::InternalError(""); }); if (status.ok() && st.ok() && (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) { if (!config::group_commit_wait_replay_wal_finish) { @@ -405,6 +408,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ std::string wal_path; RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); 
RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->update_wal_dir_pre_allocated( + WalManager::get_base_wal_path(wal_path), 0, + load_block_queue->block_queue_pre_allocated())); } std::stringstream ss; ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label @@ -419,6 +425,12 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ ss << ", rows=" << state->num_rows_load_success(); } LOG(INFO) << ss.str(); + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", { + std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string(); + LOG(INFO) << "debug promise set: " << msg; + ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value( + Status ::InternalError(msg)); + };); return st; } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 49bdd5f87a7..6467ca38561 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -22,6 +22,7 @@ #include <atomic> #include <condition_variable> #include <cstdint> +#include <future> #include <memory> #include <mutex> #include <shared_mutex> @@ -170,6 +171,8 @@ public: const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version); + std::promise<Status> debug_promise; + std::future<Status> debug_future = debug_promise.get_future(); private: ExecEnv* _exec_env = nullptr; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index cc1abaafcf2..f10becd056b 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -19,12 +19,14 @@ #include <gen_cpp/DataSinks_types.h> +#include <future> #include <shared_mutex> #include "common/exception.h" #include "runtime/exec_env.h" #include "runtime/group_commit_mgr.h" #include "runtime/runtime_state.h" 
+#include "util/debug_points.h" #include "util/doris_metrics.h" #include "vec/exprs/vexpr.h" #include "vec/sink/vtablet_finder.h" @@ -260,6 +262,20 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state, } _is_block_appended = true; _blocks.clear(); + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", { + if (_load_block_queue) { + _load_block_queue->remove_load_id(_load_id); + } + if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for( + std ::chrono ::seconds(60)) == std ::future_status ::ready) { + auto st = ExecEnv::GetInstance()->group_commit_mgr()->debug_future.get(); + ExecEnv::GetInstance()->group_commit_mgr()->debug_promise = std::promise<Status>(); + ExecEnv::GetInstance()->group_commit_mgr()->debug_future = + ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.get_future(); + LOG(INFO) << "debug future output: " << st.to_string(); + RETURN_IF_ERROR(st); + } + }); return Status::OK(); } diff --git a/regression-test/data/fault_injection_p0/group_commit_wal_msg.csv b/regression-test/data/fault_injection_p0/group_commit_wal_msg.csv new file mode 100644 index 00000000000..6ab7bd6bcdf --- /dev/null +++ b/regression-test/data/fault_injection_p0/group_commit_wal_msg.csv @@ -0,0 +1,5 @@ +1,1 +2,2 +3,3 +4,4 +5,5 \ No newline at end of file diff --git a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy new file mode 100644 index 00000000000..0ad2d8b8f27 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") { + + + def tableName = "wal_test" + + // test successful group commit async load + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 5 + properties("replication_num" = "1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + + def exception = false; + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + file 'group_commit_wal_msg.csv' + time 10000 + } + assertFalse(true); + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains('pre allocated 0 Bytes')) + exception = true; + } finally { + GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") + assertTrue(exception) + } + + // test failed group commit async load + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 5 + properties("replication_num" = "1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + + exception = 
false; + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status") + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + file 'group_commit_wal_msg.csv' + time 10000 + } + assertFalse(true); + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains('pre allocated 0 Bytes')) + exception = true; + } finally { + GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg") + GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.err_status") + assertTrue(exception) + } + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org