This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new eb84992bbf2 [Enhancement](wal) Add fault injection case for wal back pressure (#29689) eb84992bbf2 is described below commit eb84992bbf2419a649aa385fd83bafcb914b2128 Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Tue Jan 9 14:52:51 2024 +0800 [Enhancement](wal) Add fault injection case for wal back pressure (#29689) --- be/src/runtime/group_commit_mgr.cpp | 6 ++ ...m_back_pressure_time_out_fault_injection.groovy | 110 +++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index fc4a2df427f..c7333e21d64 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -20,10 +20,13 @@ #include <gen_cpp/Types_types.h> #include <glog/logging.h> +#include <chrono> + #include "client_cache.h" #include "common/config.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" +#include "util/debug_points.h" #include "util/thrift_rpc_helper.h" namespace doris { @@ -33,6 +36,9 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::unique_lock l(mutex); RETURN_IF_ERROR(status); auto start = std::chrono::steady_clock::now(); + DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", { + start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000); + }); while (!runtime_state->is_cancelled() && status.ok() && _all_block_queues_bytes->load(std::memory_order_relaxed) >= config::group_commit_queue_mem_limit) { diff --git a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy new file mode 100644 index 00000000000..8e9dd22f790 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_wal_mem_back_pressure_time_out_fault_injection","nonConcurrent") { + + + def tableName = "wal_test" + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 5 + properties("replication_num" = "1") + """ + + def enable_back_pressure = { + try { + def fes = sql_return_maparray "show frontends" + def bes = sql_return_maparray "show backends" + logger.info("frontends: ${fes}") + def fe = fes[0] + def be = bes[0] + def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/" + logger.info("observer url: " + url) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""") + String command = sb.toString() + logger.info(command) + def process = command.execute() + + sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""") + command = sb.toString() + logger.info(command) + process = command.execute() + } finally { + } + } + + def disable_back_pressure = { + try { + def fes = sql_return_maparray "show frontends" + def bes = sql_return_maparray "show backends" + logger.info("frontends: ${fes}") + def fe = fes[0] + def be = bes[0] + def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/" + logger.info("observer url: " + url) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""") + String command = sb.toString() + logger.info(command) + def process = command.execute() + + sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"10000\\",\\"persist\\": \\"false\\"}}\"""") + command = sb.toString() + logger.info(command) + process = command.execute() + } finally { + } + } + + GetDebugPoint().clearDebugPointsForAllBEs() + enable_back_pressure() + + sql """ set group_commit = async_mode; """ + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.back_pressure_time_out") + sql """insert into ${tableName} values(1,1)""" + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains('Wal memory back pressure wait too much time!')) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue.add_block.back_pressure_time_out") + } + + disable_back_pressure() + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org