This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 329d57fdd7c [regression](move-memtable) test LoadStream on_idle_timeout (#29354)
329d57fdd7c is described below

commit 329d57fdd7cc68304a74bb8c3389e693403b458d
Author: zhengyu <freeman.zhang1...@gmail.com>
AuthorDate: Wed Jan 3 14:07:51 2024 +0800

    [regression](move-memtable) test LoadStream on_idle_timeout (#29354)
    
    Signed-off-by: freemandealer <freeman.zhang1...@gmail.com>
---
 be/src/common/config.cpp                           |  2 +-
 be/src/runtime/load_stream.cpp                     |  2 +-
 be/src/service/internal_service.cpp                |  3 +-
 be/src/vec/sink/load_stream_stub.cpp               |  5 ++
 .../test_load_stream_fault_injection.groovy        | 59 +++++++++++++++++++++-
 5 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ddb235130bc..f3e8df44e17 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -777,7 +777,7 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
 DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min
 
 // idle timeout for load stream in ms
-DEFINE_Int64(load_stream_idle_timeout_ms, "600000");
+DEFINE_mInt64(load_stream_idle_timeout_ms, "600000");
 // brpc streaming max_buf_size in bytes
 DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
 // brpc streaming messages_in_batch
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 3aad2566575..1bc7e7b6637 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -454,7 +454,7 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data)
     IndexStreamSharedPtr index_stream;
 
     int64_t index_id = header.index_id();
-    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_indexid",
+    DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
                     { index_id = UNKNOWN_ID_FOR_TEST; });
     auto it = _index_streams_map.find(index_id);
     if (it == _index_streams_map.end()) {
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index a92775a6f6e..d27db896ccc 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -391,8 +391,7 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
         }
 
         stream_options.handler = load_stream.get();
-        // TODO : set idle timeout
-        // stream_options.idle_timeout_ms =
+        stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms;
 
         StreamId streamid;
         if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 9ad8d8805ca..939c88e0b65 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -21,6 +21,7 @@
 
 #include "olap/rowset/rowset_writer.h"
 #include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
 #include "util/network_util.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
@@ -330,6 +331,10 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
         int ret;
         {
             SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+            DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.delay_before_send", {
+                int64_t delay_ms = dp->param<int64>("delay_ms", 1000);
+                bthread_usleep(delay_ms * 1000);
+            });
             ret = brpc::StreamWrite(_stream_id, buf);
         }
         DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; });
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
index 09f271fead5..f58c1226fd8 100644
--- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
@@ -67,6 +67,52 @@ suite("load_stream_fault_injection", "nonConcurrent") {
         file "baseall.txt"
     }
 
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [string:[:]]
+
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def set_be_param = { paramName, paramValue ->
+        // for eache be node, set paramName=paramValue
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def reset_be_param = { paramName ->
+        // for eache be node, reset paramName to default
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def original_value = backendId_to_params.get(id).get(paramName)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def get_be_param = { paramName ->
+        // for eache be node, get param value by default
+        def paramValue = ""
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            // get the config value from be
+            def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // parsing
+            def resultList = parseJson(out)[0]
+            assertTrue(resultList.size() == 4)
+            // get original value
+            paramValue = resultList[2]
+            backendId_to_params.get(id, [:]).put(paramName, paramValue)
+        }
+    }
+
     def load_with_injection = { injection, expect_errmsg ->
         try {
             GetDebugPoint().enableDebugPointForAllBEs(injection)
@@ -110,10 +156,21 @@ suite("load_stream_fault_injection", "nonConcurrent") {
     // LoadStream add_segment meet unknown segid in request header
     load_with_injection("TabletStream.add_segment.unknown_segid", "")
     // LoadStream append_data meet unknown index id in request header
-    load_with_injection("abletStream.add_segment.unknown_indexid", "")
+    load_with_injection("TabletStream._append_data.unknown_indexid", "")
     // LoadStream dispatch meet unknown load id
     load_with_injection("LoadStream._dispatch.unknown_loadid", "")
     // LoadStream dispatch meet unknown src id
     load_with_injection("LoadStream._dispatch.unknown_srcid", "")
+
+    // LoadStream meets StreamRPC idle timeout
+    get_be_param("load_stream_idle_timeout_ms")
+    set_be_param("load_stream_idle_timeout_ms", 500)
+    try {
+        load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "")
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+    } finally {
+        reset_be_param("load_stream_idle_timeout_ms")
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to