This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7fc78e3f87e [opt](brpc) check and remove unavailable brpc stubs (#43212) (#43859)
7fc78e3f87e is described below

commit 7fc78e3f87e0c3fd85dd1de9fa55e1d3f27a6bf4
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Thu Nov 14 19:52:06 2024 +0800

    [opt](brpc) check and remove unavailable brpc stubs (#43212) (#43859)
---
 be/src/common/config.cpp                 |  5 +++
 be/src/common/config.h                   |  4 ++
 be/src/runtime/fragment_mgr.cpp          | 68 +++++++++++++++++++++++++++++++-
 be/src/runtime/fragment_mgr.h            |  9 +++++
 be/src/runtime/query_context.h           | 23 +++++++++++
 be/src/vec/sink/vdata_stream_sender.cpp  |  9 +++++
 regression-test/pipeline/p0/conf/be.conf |  1 +
 regression-test/pipeline/p1/conf/be.conf |  2 +
 8 files changed, 120 insertions(+), 1 deletion(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0878d27e833..c239dc3e72d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -529,6 +529,9 @@ DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
 //https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely
 DEFINE_Bool(enable_brpc_builtin_services, "true");
 
+// Enable brpc connection check
+DEFINE_Bool(enable_brpc_connection_check, "false");
+
 // The maximum amount of data that can be processed by a stream load
 DEFINE_mInt64(streaming_load_max_mb, "102400");
 // Some data formats, such as JSON, cannot be streamed.
@@ -964,6 +967,8 @@ DEFINE_mInt64(brpc_streaming_client_batch_bytes, "262144");
 // so as to avoid occupying the execution thread for a long time.
 DEFINE_mInt32(max_fragment_start_wait_time_seconds, "30");
 
+DEFINE_mInt32(fragment_mgr_cancel_worker_interval_seconds, "1");
+
 // Node role tag for backend. Mix role is the default role, and computation role have no
 // any tablet.
 DEFINE_String(be_node_role, "mix");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a282abb37ee..289c56464f3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1014,6 +1014,8 @@ DECLARE_mInt64(brpc_streaming_client_batch_bytes);
 
 DECLARE_Bool(enable_brpc_builtin_services);
 
+DECLARE_Bool(enable_brpc_connection_check);
+
 // Max waiting time to wait the "plan fragment start" rpc.
 // If timeout, the fragment will be cancelled.
 // This parameter is usually only used when the FE loses connection,
@@ -1021,6 +1023,8 @@ DECLARE_Bool(enable_brpc_builtin_services);
 // so as to avoid occupying the execution thread for a long time.
 DECLARE_mInt32(max_fragment_start_wait_time_seconds);
 
+DECLARE_Int32(fragment_mgr_cancel_worker_interval_seconds);
+
 // Node role tag for backend. Mix role is the default role, and computation role have no
 // any tablet.
 DECLARE_String(be_node_role);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3a4c752ccae..bcb48559178 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -17,6 +17,7 @@
 
 #include "runtime/fragment_mgr.h"
 
+#include <brpc/controller.h>
 #include <bvar/latency_recorder.h>
 #include <exprs/runtime_filter.h>
 #include <fmt/format.h>
@@ -84,6 +85,7 @@
 #include "runtime/workload_group/workload_group_manager.h"
 #include "runtime/workload_management/workload_query_info.h"
 #include "service/backend_options.h"
+#include "util/brpc_client_cache.h"
 #include "util/debug_points.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
@@ -1284,6 +1286,7 @@ void FragmentMgr::cancel_worker() {
         }
 
         VecDateTimeValue now = VecDateTimeValue::local_time();
+        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
         {
             std::lock_guard<std::mutex> lock(_lock);
             for (auto& fragment_instance_itr : _fragment_instance_map) {
@@ -1291,6 +1294,7 @@ void FragmentMgr::cancel_worker() {
                     queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
                 }
             }
+
             for (auto& pipeline_itr : _pipeline_map) {
                 if (pipeline_itr.second->is_timeout(now)) {
                     std::vector<TUniqueId> ins_ids;
@@ -1308,6 +1312,18 @@ void FragmentMgr::cancel_worker() {
                     LOG_WARNING("Query {} is timeout", print_id(it->first));
                     it = _query_ctx_map.erase(it);
                 } else {
+                    if (config::enable_brpc_connection_check) {
+                        auto brpc_stubs = it->second->get_using_brpc_stubs();
+                        for (auto& item : brpc_stubs) {
+                            if (!brpc_stub_with_queries.contains(item.second)) {
+                                brpc_stub_with_queries.emplace(item.second,
+                                                               BrpcItem {item.first, {it->second}});
+                            } else {
+                                brpc_stub_with_queries[item.second].queries.emplace_back(
+                                        it->second);
+                            }
+                        }
+                    }
                     ++it;
                 }
             }
@@ -1431,7 +1447,11 @@ void FragmentMgr::cancel_worker() {
                                  std::string("Coordinator dead."));
             }
 
-    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
+        for (auto it : brpc_stub_with_queries) {
+            _check_brpc_available(it.first, it.second);
+        }
+    } while (!_stop_background_threads_latch.wait_for(
+            std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
 
     LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
 }
@@ -1448,6 +1468,52 @@ void FragmentMgr::debug(std::stringstream& ss) {
     }
 }
 
+void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
+                                        const BrpcItem& brpc_item) {
+    const std::string message = "hello doris!";
+    std::string error_message;
+    int32_t failed_count = 0;
+    while (true) {
+        PHandShakeRequest request;
+        request.set_hello(message);
+        PHandShakeResponse response;
+        brpc::Controller cntl;
+        cntl.set_timeout_ms(500 * (failed_count + 1));
+        cntl.set_max_retry(10);
+        brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
+
+        if (cntl.Failed()) {
+            error_message = cntl.ErrorText();
+            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
+                         << brpc_item.network_address.port << " check failed: " << error_message;
+        } else if (response.has_status() && response.status().status_code() == 0) {
+            break;
+        } else {
+            error_message = response.DebugString();
+            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
+                         << brpc_item.network_address.port << " check failed: " << error_message;
+        }
+        failed_count++;
+        if (failed_count == 2) {
+            for (const auto& query_wptr : brpc_item.queries) {
+                auto query = query_wptr.lock();
+                if (query && !query->is_cancelled()) {
+                    cancel_query(query->query_id(), PPlanFragmentCancelReason::INTERNAL_ERROR,
+                                 fmt::format("brpc(dest: {}:{}) check failed: {}",
+                                             brpc_item.network_address.hostname,
+                                             brpc_item.network_address.port, error_message));
+                }
+            }
+
+            LOG(WARNING) << "remove brpc stub from cache: " << brpc_item.network_address.hostname
+                         << ":" << brpc_item.network_address.port << ", error: " << error_message;
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                    brpc_item.network_address.hostname, brpc_item.network_address.port);
+            break;
+        }
+    }
+}
+
 /*
  * 1. resolve opaqued_query_plan to thrift structure
  * 2. build TExecPlanFragmentParams
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 16ad368ae61..0c1bb3033d9 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -159,6 +159,12 @@ private:
     void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
                       const FinishCallback& cb);
 
+    struct BrpcItem {
+        TNetworkAddress network_address;
+        std::vector<std::weak_ptr<QueryContext>> queries;
+    };
+
+    std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId& query_id);
     template <typename Param>
     void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
 
@@ -177,6 +183,9 @@ private:
     Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
                           QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx);
 
+    void _check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
+                               const BrpcItem& brpc_item);
+
     // This is input params
     ExecEnv* _exec_env = nullptr;
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 862da39bfae..e781ae61cab 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
 
 #include <atomic>
 #include <memory>
@@ -305,6 +306,25 @@ public:
         }
     }
 
+    void add_using_brpc_stub(const TNetworkAddress& network_address,
+                             std::shared_ptr<PBackendService_Stub> brpc_stub) {
+        if (network_address.port == 0) {
+            return;
+        }
+        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
+        if (!_using_brpc_stubs.contains(network_address)) {
+            _using_brpc_stubs.emplace(network_address, brpc_stub);
+        }
+
+        DCHECK_EQ(_using_brpc_stubs[network_address].get(), brpc_stub.get());
+    }
+
+    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
+    get_using_brpc_stubs() {
+        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
+        return _using_brpc_stubs;
+    }
+
 private:
     TUniqueId _query_id;
     ExecEnv* _exec_env = nullptr;
@@ -363,6 +383,9 @@ private:
     // help us manage the query.
     QuerySource _query_source;
 
+    std::mutex _brpc_stubs_mutex;
+    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs;
+
 public:
     timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
     QuerySource get_query_source() const { return this->_query_source; }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 394005f6adf..0733c39621e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -21,6 +21,7 @@
 #include <fmt/ranges.h> // IWYU pragma: keep
 #include <gen_cpp/DataSinks_types.h>
 #include <gen_cpp/Metrics_types.h>
+#include <gen_cpp/Types_types.h>
 #include <gen_cpp/data.pb.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <glog/logging.h>
@@ -131,8 +132,16 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
         _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(
                 "127.0.0.1", _brpc_dest_addr.port);
+        if (config::enable_brpc_connection_check) {
+            auto network_address = _brpc_dest_addr;
+            network_address.hostname = "127.0.0.1";
+            state->get_query_ctx()->add_using_brpc_stub(network_address, _brpc_stub);
+        }
     } else {
         _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
+        if (config::enable_brpc_connection_check) {
+            state->get_query_ctx()->add_using_brpc_stub(_brpc_dest_addr, _brpc_stub);
+        }
     }
 
     if (!_brpc_stub) {
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index 3aaf8777b37..b7565d721c6 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -90,3 +90,4 @@ enable_missing_rows_correctness_check=true
 #enable_jvm_monitor = true
 
 crash_in_memory_tracker_inaccurate = true
+enable_brpc_connection_check=true
diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf
index e9bf1dbdd88..356e045e31b 100644
--- a/regression-test/pipeline/p1/conf/be.conf
+++ b/regression-test/pipeline/p1/conf/be.conf
@@ -82,3 +82,5 @@ enable_missing_rows_correctness_check=true
 enable_jvm_monitor = true
 
 crash_in_memory_tracker_inaccurate = true
+enable_table_size_correctness_check=true
+enable_brpc_connection_check=true
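
For anyone who wants to exercise this change on a test BE, the two knobs added in be/src/common/config.cpp are both settable in be.conf, as the regression-test pipeline configs above already do. The fragment below is only an illustrative sketch; the check is off by default, and the 1-second interval merely restates the new default rather than a recommended value:

    # be.conf (illustrative fragment)
    # turn on the periodic hand-shake check of cached brpc stubs
    enable_brpc_connection_check = true
    # how often FragmentMgr's cancel worker wakes up, in seconds (default 1)
    fragment_mgr_cancel_worker_interval_seconds = 1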
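
The heart of the new FragmentMgr::_check_brpc_available is a probe-and-evict loop: hand-shake the destination with an escalating timeout, and after two consecutive failures cancel every query still holding that stub and erase it from the client cache so the next sender rebuilds the connection. Below is a minimal, self-contained C++ sketch of that control flow only; it is not the Doris code, and probe, cancel_queries and evict_stub are hypothetical stand-ins for the hand_shake RPC, cancel_query(), and brpc_internal_client_cache()->erase():

    #include <functional>
    #include <string>

    struct ProbeResult {
        bool ok;
        std::string error;
    };

    void check_peer(const std::function<ProbeResult(int)>& probe,
                    const std::function<void(const std::string&)>& cancel_queries,
                    const std::function<void()>& evict_stub) {
        int failed_count = 0;
        while (true) {
            // Each attempt gets a longer timeout: 500 ms, then 1000 ms.
            ProbeResult result = probe(500 * (failed_count + 1));
            if (result.ok) {
                return; // peer answered the hand shake, keep the cached stub
            }
            if (++failed_count == 2) {
                cancel_queries(result.error); // fail the queries still using this stub
                evict_stub();                 // drop it from the client cache
                return;
            }
        }
    }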
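
On the sender side, Channel<Parent>::init_stub registers each stub it obtains with the query via QueryContext::add_using_brpc_stub, and cancel_worker later snapshots those maps to group queries by stub before probing. A simplified sketch of that bookkeeping pattern follows, using illustrative names (StubRegistry, Stub) rather than the real QueryContext/PBackendService_Stub types; the actual code keys by TNetworkAddress instead of a host:port string:

    #include <memory>
    #include <mutex>
    #include <string>
    #include <unordered_map>
    #include <utility>

    struct Stub {}; // stand-in for PBackendService_Stub

    class StubRegistry {
    public:
        // Record a stub the first time a channel to this destination is opened;
        // later calls for the same destination are no-ops.
        void add_using_stub(const std::string& host_port, std::shared_ptr<Stub> stub) {
            std::lock_guard<std::mutex> lock(_mutex);
            _using_stubs.try_emplace(host_port, std::move(stub));
        }

        // Copy the map out so the caller can probe stubs without holding the lock.
        std::unordered_map<std::string, std::shared_ptr<Stub>> snapshot() {
            std::lock_guard<std::mutex> lock(_mutex);
            return _using_stubs;
        }

    private:
        std::mutex _mutex;
        std::unordered_map<std::string, std::shared_ptr<Stub>> _using_stubs;
    };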