This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 329d57fdd7c [regression](move-memtable) test LoadStream on_idle_timeout (#29354) 329d57fdd7c is described below commit 329d57fdd7cc68304a74bb8c3389e693403b458d Author: zhengyu <freeman.zhang1...@gmail.com> AuthorDate: Wed Jan 3 14:07:51 2024 +0800 [regression](move-memtable) test LoadStream on_idle_timeout (#29354) Signed-off-by: freemandealer <freeman.zhang1...@gmail.com> --- be/src/common/config.cpp | 2 +- be/src/runtime/load_stream.cpp | 2 +- be/src/service/internal_service.cpp | 3 +- be/src/vec/sink/load_stream_stub.cpp | 5 ++ .../test_load_stream_fault_injection.groovy | 59 +++++++++++++++++++++- 5 files changed, 66 insertions(+), 5 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index ddb235130bc..f3e8df44e17 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -777,7 +777,7 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min // idle timeout for load stream in ms -DEFINE_Int64(load_stream_idle_timeout_ms, "600000"); +DEFINE_mInt64(load_stream_idle_timeout_ms, "600000"); // brpc streaming max_buf_size in bytes DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB // brpc streaming messages_in_batch diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 3aad2566575..1bc7e7b6637 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -454,7 +454,7 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) IndexStreamSharedPtr index_stream; int64_t index_id = header.index_id(); - DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_indexid", + DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid", { index_id = UNKNOWN_ID_FOR_TEST; }); auto it = _index_streams_map.find(index_id); if (it == _index_streams_map.end()) { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index a92775a6f6e..d27db896ccc 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -391,8 +391,7 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con } stream_options.handler = load_stream.get(); - // TODO : set idle timeout - // stream_options.idle_timeout_ms = + stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms; StreamId streamid; if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 9ad8d8805ca..939c88e0b65 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -21,6 +21,7 @@ #include "olap/rowset/rowset_writer.h" #include "util/brpc_client_cache.h" +#include "util/debug_points.h" #include "util/network_util.h" #include "util/thrift_util.h" #include "util/uid_util.h" @@ -330,6 +331,10 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { int ret; { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.delay_before_send", { + int64_t delay_ms = dp->param<int64>("delay_ms", 1000); + bthread_usleep(delay_ms * 1000); + }); ret = brpc::StreamWrite(_stream_id, buf); } DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; }); diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy index 09f271fead5..f58c1226fd8 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy @@ -67,6 +67,52 @@ suite("load_stream_fault_injection", "nonConcurrent") { file "baseall.txt" } + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string:[:]] + + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + def load_with_injection = { injection, expect_errmsg -> try { GetDebugPoint().enableDebugPointForAllBEs(injection) @@ -110,10 +156,21 @@ suite("load_stream_fault_injection", "nonConcurrent") { // LoadStream add_segment meet unknown segid in request header load_with_injection("TabletStream.add_segment.unknown_segid", "") // LoadStream append_data meet unknown index id in request header - load_with_injection("abletStream.add_segment.unknown_indexid", "") + load_with_injection("TabletStream._append_data.unknown_indexid", "") // LoadStream dispatch meet unknown load id load_with_injection("LoadStream._dispatch.unknown_loadid", "") // LoadStream dispatch meet unknown src id load_with_injection("LoadStream._dispatch.unknown_srcid", "") + + // LoadStream meets StreamRPC idle timeout + get_be_param("load_stream_idle_timeout_ms") + set_be_param("load_stream_idle_timeout_ms", 500) + try { + load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "") + } catch(Exception e) { + logger.info(e.getMessage()) + } finally { + reset_be_param("load_stream_idle_timeout_ms") + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org