This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 453c06cb247 [fix](move-memtable) set idle timeout equal to load timeout (#29839)
453c06cb247 is described below

commit 453c06cb24750743e3ef5d9957edaf801015e00d
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Jan 12 21:34:36 2024 +0800

    [fix](move-memtable) set idle timeout equal to load timeout (#29839)
---
 be/src/common/config.cpp                             |  2 --
 be/src/common/config.h                               |  2 --
 be/src/service/internal_service.cpp                  |  4 +++-
 be/src/vec/sink/load_stream_stub.cpp                 |  5 +++--
 be/src/vec/sink/load_stream_stub.h                   |  2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp         |  5 +++--
 gensrc/proto/internal_service.proto                  |  1 +
 .../test_load_stream_fault_injection.groovy          | 20 +++++++++++++++-----
 8 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a184d9aadf6..49eb046e7ae 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -776,8 +776,6 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
 // timeout for load stream close wait in ms
 DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min
 
-// idle timeout for load stream in ms
-DEFINE_mInt64(load_stream_idle_timeout_ms, "600000");
 // brpc streaming max_buf_size in bytes
 DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
 // brpc streaming messages_in_batch
diff --git a/be/src/common/config.h b/be/src/common/config.h
index bd9852f768f..e9a5d38fe6a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -829,8 +829,6 @@ DECLARE_Int64(open_load_stream_timeout_ms);
 // timeout for load stream close wait in ms
 DECLARE_Int64(close_load_stream_timeout_ms);
 
-// idle timeout for load stream in ms
-DECLARE_Int64(load_stream_idle_timeout_ms);
 // brpc streaming max_buf_size in bytes
 DECLARE_Int64(load_stream_max_buf_size);
 // brpc streaming messages_in_batch
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 18a8325e4cb..e7255f017b7 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -391,7 +391,9 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
         }
 
         stream_options.handler = load_stream.get();
-        stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms;
+        stream_options.idle_timeout_ms = request->idle_timeout_ms();
+        DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout",
+                        { stream_options.idle_timeout_ms = 1; });
 
         StreamId streamid;
         if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 59fc29c60f8..40ce75d24e6 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -151,7 +151,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
                             const NodeInfo& node_info, int64_t txn_id,
                             const OlapTableSchemaParam& schema,
                             const std::vector<PTabletID>& tablets_for_schema, int total_streams,
-                            bool enable_profile) {
+                            int64_t idle_timeout_ms, bool enable_profile) {
     std::unique_lock<bthread::Mutex> lock(_open_mutex);
     if (_is_init.load()) {
         return Status::OK();
@@ -160,7 +160,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
     std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
     brpc::StreamOptions opt;
     opt.max_buf_size = config::load_stream_max_buf_size;
-    opt.idle_timeout_ms = config::load_stream_idle_timeout_ms;
+    opt.idle_timeout_ms = idle_timeout_ms;
     opt.messages_in_batch = config::load_stream_messages_in_batch;
     opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self);
     brpc::Controller cntl;
@@ -174,6 +174,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
     request.set_txn_id(txn_id);
     request.set_enable_profile(enable_profile);
     request.set_total_streams(total_streams);
+    request.set_idle_timeout_ms(idle_timeout_ms);
     schema.to_protobuf(request.mutable_schema());
     for (auto& tablet : tablets_for_schema) {
         *request.add_tablets() = tablet;
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 81ec99fa451..6aae778dc93 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -125,7 +125,7 @@ public:
                 BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
                 int64_t txn_id, const OlapTableSchemaParam& schema,
                 const std::vector<PTabletID>& tablets_for_schema, int total_streams,
-                bool enable_profile);
+                int64_t idle_timeout_ms, bool enable_profile);
 
 // for mock this class in UT
 #ifdef BE_TEST
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 12636088871..02b40549253 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -275,18 +275,19 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& st
     if (node_info == nullptr) {
         return Status::InternalError("Unknown node {} in tablet location", dst_id);
     }
+    auto idle_timeout_ms = _state->execution_timeout() * 1000;
     // get tablet schema from each backend only in the 1st stream
     for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
         const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
         RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
                                      *node_info, _txn_id, *_schema, tablets_for_schema,
-                                     _total_streams, _state->enable_profile()));
+                                     _total_streams, idle_timeout_ms, _state->enable_profile()));
     }
     // for the rest streams, open without getting tablet schema
     for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
         RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
                                      *node_info, _txn_id, *_schema, {}, _total_streams,
-                                     _state->enable_profile()));
+                                     idle_timeout_ms, _state->enable_profile()));
     }
     return Status::OK();
 }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index c91a4865ca7..f197cd162dc 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -754,6 +754,7 @@ message POpenLoadStreamRequest {
     repeated PTabletID tablets = 5;
     optional bool enable_profile = 6 [default = false];
     optional int64 total_streams = 7;
+    optional int64 idle_timeout_ms = 8;
 }
 
 message PTabletSchemaWithIndex {
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
index f5dcbb1b7e4..d5cd8097f96 100644
--- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
@@ -125,6 +125,20 @@ suite("load_stream_fault_injection", "nonConcurrent") {
         }
     }
 
+    def load_with_injection2 = { injection1, injection2, error_msg->
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(injection1)
+            GetDebugPoint().enableDebugPointForAllBEs(injection2)
+            sql "insert into test select * from baseall where k1 <= 3"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains(error_msg))
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(injection1)
+            GetDebugPoint().disableDebugPointForAllBEs(injection2)
+        }
+    }
+
     // LoadStreamWriter create file failed
     load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "")
     // LoadStreamWriter append_data meet null file writer error
@@ -161,14 +175,10 @@ suite("load_stream_fault_injection", "nonConcurrent") {
     load_with_injection("LoadStream._dispatch.unknown_srcid", "")
 
     // LoadStream meets StreamRPC idle timeout
-    get_be_param("load_stream_idle_timeout_ms")
-    set_be_param("load_stream_idle_timeout_ms", 500)
     try {
-        load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "")
+        load_with_injection2("LoadStreamStub._send_with_retry.delay_before_send", "PInternalServiceImpl.open_load_stream.set_idle_timeout", "")
     } catch(Exception e) {
         logger.info(e.getMessage())
-    } finally {
-        reset_be_param("load_stream_idle_timeout_ms")
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to