This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 453c06cb247 [fix](move-memtable) set idle timeout equal to load timeout (#29839) 453c06cb247 is described below commit 453c06cb24750743e3ef5d9957edaf801015e00d Author: Kaijie Chen <c...@apache.org> AuthorDate: Fri Jan 12 21:34:36 2024 +0800 [fix](move-memtable) set idle timeout equal to load timeout (#29839) --- be/src/common/config.cpp | 2 -- be/src/common/config.h | 2 -- be/src/service/internal_service.cpp | 4 +++- be/src/vec/sink/load_stream_stub.cpp | 5 +++-- be/src/vec/sink/load_stream_stub.h | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 +++-- gensrc/proto/internal_service.proto | 1 + .../test_load_stream_fault_injection.groovy | 20 +++++++++++++++----- 8 files changed, 26 insertions(+), 15 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a184d9aadf6..49eb046e7ae 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -776,8 +776,6 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s // timeout for load stream close wait in ms DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min -// idle timeout for load stream in ms -DEFINE_mInt64(load_stream_idle_timeout_ms, "600000"); // brpc streaming max_buf_size in bytes DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB // brpc streaming messages_in_batch diff --git a/be/src/common/config.h b/be/src/common/config.h index bd9852f768f..e9a5d38fe6a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -829,8 +829,6 @@ DECLARE_Int64(open_load_stream_timeout_ms); // timeout for load stream close wait in ms DECLARE_Int64(close_load_stream_timeout_ms); -// idle timeout for load stream in ms -DECLARE_Int64(load_stream_idle_timeout_ms); // brpc streaming max_buf_size in bytes DECLARE_Int64(load_stream_max_buf_size); // brpc streaming messages_in_batch diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 18a8325e4cb..e7255f017b7 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -391,7 +391,9 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con } stream_options.handler = load_stream.get(); - stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms; + stream_options.idle_timeout_ms = request->idle_timeout_ms(); + DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout", + { stream_options.idle_timeout_ms = 1; }); StreamId streamid; if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 59fc29c60f8..40ce75d24e6 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -151,7 +151,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector<PTabletID>& tablets_for_schema, int total_streams, - bool enable_profile) { + int64_t idle_timeout_ms, bool enable_profile) { std::unique_lock<bthread::Mutex> lock(_open_mutex); if (_is_init.load()) { return Status::OK(); @@ -160,7 +160,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self, std::string host_port = get_host_port(node_info.host, node_info.brpc_port); brpc::StreamOptions opt; opt.max_buf_size = config::load_stream_max_buf_size; - opt.idle_timeout_ms = config::load_stream_idle_timeout_ms; + opt.idle_timeout_ms = idle_timeout_ms; opt.messages_in_batch = config::load_stream_messages_in_batch; opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self); brpc::Controller cntl; @@ -174,6 +174,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self, request.set_txn_id(txn_id); request.set_enable_profile(enable_profile); request.set_total_streams(total_streams); + request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); for (auto& tablet : tablets_for_schema) { *request.add_tablets() = tablet; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 81ec99fa451..6aae778dc93 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -125,7 +125,7 @@ public: BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector<PTabletID>& tablets_for_schema, int total_streams, - bool enable_profile); + int64_t idle_timeout_ms, bool enable_profile); // for mock this class in UT #ifdef BE_TEST diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 12636088871..02b40549253 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -275,18 +275,19 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& st if (node_info == nullptr) { return Status::InternalError("Unknown node {} in tablet location", dst_id); } + auto idle_timeout_ms = _state->execution_timeout() * 1000; // get tablet schema from each backend only in the 1st stream for (auto& stream : streams.streams() | std::ranges::views::take(1)) { const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id]; RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, - _total_streams, _state->enable_profile())); + _total_streams, idle_timeout_ms, _state->enable_profile())); } // for the rest streams, open without getting tablet schema for (auto& stream : streams.streams() | std::ranges::views::drop(1)) { RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, {}, _total_streams, - _state->enable_profile())); + idle_timeout_ms, _state->enable_profile())); } return Status::OK(); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index c91a4865ca7..f197cd162dc 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -754,6 +754,7 @@ message POpenLoadStreamRequest { repeated PTabletID tablets = 5; optional bool enable_profile = 6 [default = false]; optional int64 total_streams = 7; + optional int64 idle_timeout_ms = 8; } message PTabletSchemaWithIndex { diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy index f5dcbb1b7e4..d5cd8097f96 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy @@ -125,6 +125,20 @@ suite("load_stream_fault_injection", "nonConcurrent") { } } + def load_with_injection2 = { injection1, injection2, error_msg-> + try { + GetDebugPoint().enableDebugPointForAllBEs(injection1) + GetDebugPoint().enableDebugPointForAllBEs(injection2) + sql "insert into test select * from baseall where k1 <= 3" + } catch(Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains(error_msg)) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(injection1) + GetDebugPoint().disableDebugPointForAllBEs(injection2) + } + } + // LoadStreamWriter create file failed load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "") // LoadStreamWriter append_data meet null file writer error @@ -161,14 +175,10 @@ suite("load_stream_fault_injection", "nonConcurrent") { load_with_injection("LoadStream._dispatch.unknown_srcid", "") // LoadStream meets StreamRPC idle timeout - get_be_param("load_stream_idle_timeout_ms") - set_be_param("load_stream_idle_timeout_ms", 500) try { - load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "") + load_with_injection2("LoadStreamStub._send_with_retry.delay_before_send", "PInternalServiceImpl.open_load_stream.set_idle_timeout", "") } catch(Exception e) { logger.info(e.getMessage()) - } finally { - reset_be_param("load_stream_idle_timeout_ms") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org