This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7fc78e3f87e [opt](brpc) check and remove unavailable brpc stubs (#43212) (#43859)
7fc78e3f87e is described below

commit 7fc78e3f87e0c3fd85dd1de9fa55e1d3f27a6bf4
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Thu Nov 14 19:52:06 2024 +0800

    [opt](brpc) check and remove unavailable brpc stubs (#43212) (#43859)
---
 be/src/common/config.cpp                 |  5 +++
 be/src/common/config.h                   |  4 ++
 be/src/runtime/fragment_mgr.cpp          | 68 +++++++++++++++++++++++++++++++-
 be/src/runtime/fragment_mgr.h            |  9 +++++
 be/src/runtime/query_context.h           | 23 +++++++++++
 be/src/vec/sink/vdata_stream_sender.cpp  |  9 +++++
 regression-test/pipeline/p0/conf/be.conf |  1 +
 regression-test/pipeline/p1/conf/be.conf |  2 +
 8 files changed, 120 insertions(+), 1 deletion(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0878d27e833..c239dc3e72d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -529,6 +529,9 @@ DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
 
//https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely
 DEFINE_Bool(enable_brpc_builtin_services, "true");
 
+// If true, each FragmentMgr::cancel_worker round hand-shakes the brpc stubs used by running queries; a stub failing twice is evicted from the client cache and its queries are cancelled.
+DEFINE_Bool(enable_brpc_connection_check, "false");
+
 // The maximum amount of data that can be processed by a stream load
 DEFINE_mInt64(streaming_load_max_mb, "102400");
 // Some data formats, such as JSON, cannot be streamed.
@@ -964,6 +967,8 @@ DEFINE_mInt64(brpc_streaming_client_batch_bytes, "262144");
 // so as to avoid occupying the execution thread for a long time.
 DEFINE_mInt32(max_fragment_start_wait_time_seconds, "30");
 
+DEFINE_mInt32(fragment_mgr_cancel_worker_interval_seconds, "1"); // seconds FragmentMgr::cancel_worker waits between rounds
+
 // Node role tag for backend. Mix role is the default role, and computation 
role have no
 // any tablet.
 DEFINE_String(be_node_role, "mix");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a282abb37ee..289c56464f3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1014,6 +1014,8 @@ DECLARE_mInt64(brpc_streaming_client_batch_bytes);
 
 DECLARE_Bool(enable_brpc_builtin_services);
 
+DECLARE_Bool(enable_brpc_connection_check); // health-check brpc stubs used by running queries (default false)
+
 // Max waiting time to wait the "plan fragment start" rpc.
 // If timeout, the fragment will be cancelled.
 // This parameter is usually only used when the FE loses connection,
@@ -1021,6 +1023,8 @@ DECLARE_Bool(enable_brpc_builtin_services);
 // so as to avoid occupying the execution thread for a long time.
 DECLARE_mInt32(max_fragment_start_wait_time_seconds);
 
+DECLARE_mInt32(fragment_mgr_cancel_worker_interval_seconds); // seconds between FragmentMgr::cancel_worker rounds
+
 // Node role tag for backend. Mix role is the default role, and computation 
role have no
 // any tablet.
 DECLARE_String(be_node_role);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3a4c752ccae..bcb48559178 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -17,6 +17,7 @@
 
 #include "runtime/fragment_mgr.h"
 
+#include <brpc/controller.h>
 #include <bvar/latency_recorder.h>
 #include <exprs/runtime_filter.h>
 #include <fmt/format.h>
@@ -84,6 +85,7 @@
 #include "runtime/workload_group/workload_group_manager.h"
 #include "runtime/workload_management/workload_query_info.h"
 #include "service/backend_options.h"
+#include "util/brpc_client_cache.h"
 #include "util/debug_points.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
@@ -1284,6 +1286,7 @@ void FragmentMgr::cancel_worker() {
         }
 
         VecDateTimeValue now = VecDateTimeValue::local_time();
+        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> 
brpc_stub_with_queries;
         {
             std::lock_guard<std::mutex> lock(_lock);
             for (auto& fragment_instance_itr : _fragment_instance_map) {
@@ -1291,6 +1294,7 @@ void FragmentMgr::cancel_worker() {
                     
queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
                 }
             }
+
             for (auto& pipeline_itr : _pipeline_map) {
                 if (pipeline_itr.second->is_timeout(now)) {
                     std::vector<TUniqueId> ins_ids;
@@ -1308,6 +1312,18 @@ void FragmentMgr::cancel_worker() {
                     LOG_WARNING("Query {} is timeout", print_id(it->first));
                     it = _query_ctx_map.erase(it);
                 } else {
+                    if (config::enable_brpc_connection_check) {
+                        auto brpc_stubs = it->second->get_using_brpc_stubs();
+                        for (auto& item : brpc_stubs) {
+                            if (!brpc_stub_with_queries.contains(item.second)) 
{
+                                brpc_stub_with_queries.emplace(item.second,
+                                                               BrpcItem 
{item.first, {it->second}});
+                            } else {
+                                
brpc_stub_with_queries[item.second].queries.emplace_back(
+                                        it->second);
+                            }
+                        }
+                    }
                     ++it;
                 }
             }
@@ -1431,7 +1447,11 @@ void FragmentMgr::cancel_worker() {
                          std::string("Coordinator dead."));
         }
 
-    } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
+        for (auto it : brpc_stub_with_queries) {
+            _check_brpc_available(it.first, it.second);
+        }
+    } while (!_stop_background_threads_latch.wait_for(
+            
std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
     LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
 }
 
@@ -1448,6 +1468,52 @@ void FragmentMgr::debug(std::stringstream& ss) {
     }
 }
 
+void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
+                                        const BrpcItem& brpc_item) {
+    // Probe `brpc_stub` with hand_shake RPCs and return on the first OK reply. After two
+    // failed attempts, cancel the live queries in `brpc_item.queries` and evict the stub.
+    const std::string message = "hello doris!";
+    std::string error_message;
+    int32_t failed_count = 0;
+    while (true) {
+        PHandShakeRequest request;
+        request.set_hello(message);
+        PHandShakeResponse response;
+        brpc::Controller cntl;
+        cntl.set_timeout_ms(500 * (failed_count + 1)); // 500ms, then 1000ms on the 2nd try
+        cntl.set_max_retry(10);
+        brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
+
+        if (cntl.Failed()) {
+            error_message = cntl.ErrorText();
+        } else if (response.has_status() && response.status().status_code() == 0) {
+            break; // the peer answered with an OK status
+        } else {
+            error_message = response.DebugString();
+        }
+        LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
+                     << brpc_item.network_address.port << " check failed: " << error_message;
+        failed_count++;
+        if (failed_count == 2) {
+            for (const auto& query_wptr : brpc_item.queries) {
+                auto query = query_wptr.lock();
+                if (query && !query->is_cancelled()) {
+                    cancel_query(query->query_id(), PPlanFragmentCancelReason::INTERNAL_ERROR,
+                                 fmt::format("brpc(dest: {}:{}) check failed: {}",
+                                             brpc_item.network_address.hostname,
+                                             brpc_item.network_address.port, error_message));
+                }
+            }
+            // Also drop the failing stub from the internal client cache.
+            LOG(WARNING) << "remove brpc stub from cache: " << brpc_item.network_address.hostname
+                         << ":" << brpc_item.network_address.port << ", error: " << error_message;
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                    brpc_item.network_address.hostname, brpc_item.network_address.port);
+            break;
+        }
+    }
+}
+
 /*
  * 1. resolve opaqued_query_plan to thrift structure
  * 2. build TExecPlanFragmentParams
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 16ad368ae61..0c1bb3033d9 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -159,6 +159,12 @@ private:
 
     void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
                       const FinishCallback& cb);
+    struct BrpcItem { // one brpc stub's address plus the queries cancel_worker found using it
+        TNetworkAddress network_address; // key under which the queries registered the stub
+        std::vector<std::weak_ptr<QueryContext>> queries; // weak: locked before use, may have expired
+    };
+
+    std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId& query_id); // NOTE(review): this patch adds no definition or caller — confirm it is needed on branch-2.1
 
     template <typename Param>
     void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
@@ -177,6 +183,9 @@ private:
     Status _get_query_ctx(const Params& params, TUniqueId query_id, bool 
pipeline,
                           QuerySource query_type, 
std::shared_ptr<QueryContext>& query_ctx);
 
+    void _check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
+                               const BrpcItem& brpc_item); // hand_shake probe; after 2 failures cancels brpc_item.queries and evicts the stub
+
     // This is input params
     ExecEnv* _exec_env = nullptr;
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 862da39bfae..e781ae61cab 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
 
 #include <atomic>
 #include <memory>
@@ -305,6 +306,25 @@ public:
         }
     }
 
+    void add_using_brpc_stub(const TNetworkAddress& network_address,
+                             std::shared_ptr<PBackendService_Stub> brpc_stub) {
+        if (network_address.port == 0) { // port 0 is skipped (presumably an unset address)
+            return;
+        }
+        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
+        // Record the stub this query uses for `network_address`; the first registration wins.
+        // try_emplace inserts only when absent and returns the entry either way (one lookup).
+        auto iter = _using_brpc_stubs.try_emplace(network_address, brpc_stub).first;
+        // Callers are expected to pass the same stub for a given address.
+        DCHECK_EQ(iter->second.get(), brpc_stub.get());
+    }
+
+    auto get_using_brpc_stubs()
+            -> std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> {
+        std::lock_guard<std::mutex> guard(_brpc_stubs_mutex); // returns a copy taken under the lock
+        return _using_brpc_stubs;
+    }
+
 private:
     TUniqueId _query_id;
     ExecEnv* _exec_env = nullptr;
@@ -363,6 +383,9 @@ private:
     // help us manage the query.
     QuerySource _query_source;
 
+    std::mutex _brpc_stubs_mutex; // guards _using_brpc_stubs: destination address -> brpc stub used by this query
+    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs;
+
 public:
     timespec get_query_arrival_timestamp() const { return 
this->_query_arrival_timestamp; }
     QuerySource get_query_source() const { return this->_query_source; }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 394005f6adf..0733c39621e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -21,6 +21,7 @@
 #include <fmt/ranges.h> // IWYU pragma: keep
 #include <gen_cpp/DataSinks_types.h>
 #include <gen_cpp/Metrics_types.h>
+#include <gen_cpp/Types_types.h>
 #include <gen_cpp/data.pb.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <glog/logging.h>
@@ -131,8 +132,16 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
         _brpc_stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(
                 "127.0.0.1", _brpc_dest_addr.port);
+        if (config::enable_brpc_connection_check) {
+            auto network_address = _brpc_dest_addr;
+            network_address.hostname = "127.0.0.1";
+            state->get_query_ctx()->add_using_brpc_stub(network_address, 
_brpc_stub);
+        }
     } else {
         _brpc_stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
+        if (config::enable_brpc_connection_check) {
+            state->get_query_ctx()->add_using_brpc_stub(_brpc_dest_addr, 
_brpc_stub);
+        }
     }
 
     if (!_brpc_stub) {
diff --git a/regression-test/pipeline/p0/conf/be.conf 
b/regression-test/pipeline/p0/conf/be.conf
index 3aaf8777b37..b7565d721c6 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -90,3 +90,4 @@ enable_missing_rows_correctness_check=true
 #enable_jvm_monitor = true
 
 crash_in_memory_tracker_inaccurate = true
+enable_brpc_connection_check=true
diff --git a/regression-test/pipeline/p1/conf/be.conf 
b/regression-test/pipeline/p1/conf/be.conf
index e9bf1dbdd88..356e045e31b 100644
--- a/regression-test/pipeline/p1/conf/be.conf
+++ b/regression-test/pipeline/p1/conf/be.conf
@@ -82,3 +82,5 @@ enable_missing_rows_correctness_check=true
 
 enable_jvm_monitor = true
 crash_in_memory_tracker_inaccurate = true
+enable_table_size_correctness_check=true
+enable_brpc_connection_check=true


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to