This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cc083c53eb7 [Chore](rpc) set and clean signal task id on some rpc thread (#49871)
cc083c53eb7 is described below

commit cc083c53eb7598a4de59c259b6ae4bd3fa8fa9d6
Author: Pxl <x...@selectdb.com>
AuthorDate: Thu Apr 10 13:17:00 2025 +0800

    [Chore](rpc) set and clean signal task id on some rpc thread (#49871)
    
    ### What problem does this PR solve?
    set and clean signal task id on some rpc thread
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [x] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [x] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [x] Confirm the release note
    - [x] Confirm test cases
    - [x] Confirm document
    - [x] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/service/internal_service.cpp | 64 ++++++++++++++++++++++---------------
 be/src/service/internal_service.h   | 21 ------------
 2 files changed, 38 insertions(+), 47 deletions(-)
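
The substance of the change is the RAII helper added to internal_service.cpp: instead of calling signal::set_signal_task_id() at the top of an RPC lambda and leaving the id behind, a SignalTaskIdKeeper sets the id in its constructor and clears it in its destructor, because the brpc work-pool threads are reused and would otherwise carry a stale id in thread-local storage. Below is a minimal, self-contained sketch of that pattern; PUniqueId, the signal namespace, and the thread-local variable are simplified stand-ins for the real Doris types (only the SignalTaskIdKeeper body matches the code in this commit, and in the real backend the id is consumed by the crash signal handler rather than printed).

```cpp
#include <cstdint>
#include <iostream>
#include <thread>

// Hypothetical stand-in for doris::PUniqueId: just a pair of 64-bit halves.
struct PUniqueId {
    int64_t hi = 0;
    int64_t lo = 0;
};

namespace signal {
// Hypothetical stand-in: a thread-local copy of the current task id, the kind
// of state a failure signal handler would read when dumping a crash log.
thread_local PUniqueId tls_signal_task_id;

void set_signal_task_id(const PUniqueId& id) {
    tls_signal_task_id = id;
}
} // namespace signal

// Same shape as the struct added in this commit: set the id when the task
// enters the worker thread, reset it when the task leaves, because pooled
// pthreads are reused and thread-local data must not leak across tasks.
struct SignalTaskIdKeeper {
    SignalTaskIdKeeper(const PUniqueId& id) { signal::set_signal_task_id(id); }
    ~SignalTaskIdKeeper() { signal::set_signal_task_id(PUniqueId {}); }
};

int main() {
    std::thread worker([] {
        {
            SignalTaskIdKeeper keeper(PUniqueId {42, 7}); // set on entry
            std::cout << "during task: hi=" << signal::tls_signal_task_id.hi << "\n";
        } // keeper destroyed here: the id is cleared even on early return or exception
        std::cout << "after task:  hi=" << signal::tls_signal_task_id.hi << "\n";
    });
    worker.join();
    return 0;
}
```

In the diff below, each signal::set_signal_task_id(...) call inside a try_offer lambda becomes a SignalTaskIdKeeper on the same line, and a few filter RPCs that previously set no id at all gain one.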

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index cb47562a778..f1d3735699b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -44,26 +44,20 @@
 #include <vec/sink/varrow_flight_result_writer.h>
 
 #include <algorithm>
-#include <exception>
 #include <filesystem>
 #include <memory>
 #include <set>
 #include <sstream>
 #include <string>
-#include <unordered_map>
 #include <utility>
 #include <vector>
 
 #include "common/config.h"
-#include "common/consts.h"
 #include "common/exception.h"
 #include "common/logging.h"
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "exec/rowid_fetcher.h"
-#include "gen_cpp/BackendService.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/internal_service.pb.h"
 #include "http/http_client.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/stream_load_pipe.h"
@@ -76,21 +70,14 @@
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/segment_v2/column_reader.h"
-#include "olap/rowset/segment_v2/common.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
-#include "olap/rowset/segment_v2/segment.h"
-#include "olap/rowset/segment_v2/segment_iterator.h"
-#include "olap/segment_loader.h"
 #include "olap/storage_engine.h"
-#include "olap/tablet.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_schema.h"
 #include "olap/txn_manager.h"
-#include "olap/utils.h"
 #include "olap/wal/wal_manager.h"
 #include "runtime/cache/result_cache.h"
-#include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/fold_constant_executor.h"
@@ -121,12 +108,8 @@
 #include "util/thrift_util.h"
 #include "util/time.h"
 #include "util/uid_util.h"
-#include "vec/columns/column.h"
-#include "vec/columns/column_string.h"
 #include "vec/common/schema_util.h"
 #include "vec/core/block.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/data_types/data_type.h"
 #include "vec/exec/format/avro//avro_jni_reader.h"
 #include "vec/exec/format/csv/csv_reader.h"
 #include "vec/exec/format/generic_reader.h"
@@ -169,6 +152,34 @@ static void thread_context_deleter(void* d) {
     delete static_cast<ThreadContext*>(d);
 }
 
+template <typename T>
+concept CanCancel = requires(T* response) { response->mutable_status(); };
+
+template <typename T>
+void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
+    brpc::ClosureGuard closure_guard(done);
+    LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info();
+}
+
+template <CanCancel T>
+void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
+    brpc::ClosureGuard closure_guard(done);
+    // Should use status to generate protobuf message, because it will encoding Backend Info
+    // into the error message and then we could know which backend's pool is full.
+    Status st = Status::Error<TStatusCode::CANCELLED>(
+            "fail to offer request to the work pool, pool={}", 
pool.get_info());
+    st.to_protobuf(response->mutable_status());
+    LOG(WARNING) << "cancelled due to fail to offer request to the work pool, 
pool="
+                 << pool.get_info();
+}
+
+// this struct is used to set signal task id in the constructor and reset it in the destructor
+// thread pool will reuse pthread, so we need to clean thread local data
+struct SignalTaskIdKeeper {
+    SignalTaskIdKeeper(const PUniqueId& id) { signal::set_signal_task_id(id); }
+    ~SignalTaskIdKeeper() { signal::set_signal_task_id(PUniqueId {}); }
+};
+
 template <typename T>
 class NewHttpClosure : public ::google::protobuf::Closure {
 public:
@@ -282,7 +293,7 @@ void PInternalService::tablet_writer_open(google::protobuf::RpcController* contr
     bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
         VLOG_RPC << "tablet writer open, id=" << request->id()
                  << ", index_id=" << request->index_id() << ", txn_id=" << 
request->txn_id();
-        signal::set_signal_task_id(request->id());
+        SignalTaskIdKeeper keeper(request->id());
         brpc::ClosureGuard closure_guard(done);
         auto st = _exec_env->load_channel_mgr()->open(*request);
         if (!st.ok()) {
@@ -388,7 +399,7 @@ void PInternalService::open_load_stream(google::protobuf::RpcController* control
                                         POpenLoadStreamResponse* response,
                                         google::protobuf::Closure* done) {
     bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() {
-        signal::set_signal_task_id(request->load_id());
+        SignalTaskIdKeeper keeper(request->load_id());
         brpc::ClosureGuard done_guard(done);
         brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
         brpc::StreamOptions stream_options;
@@ -468,7 +479,7 @@ void PInternalService::tablet_writer_add_block(google::protobuf::RpcController*
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
-            signal::set_signal_task_id(request->id());
+            SignalTaskIdKeeper keeper(request->id());
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add block failed, message=" << 
st
@@ -494,7 +505,7 @@ void PInternalService::tablet_writer_cancel(google::protobuf::RpcController* con
     bool ret = _heavy_work_pool.try_offer([this, request, done]() {
         VLOG_RPC << "tablet writer cancel, id=" << request->id()
                  << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id();
-        signal::set_signal_task_id(request->id());
+        SignalTaskIdKeeper keeper(request->id());
         brpc::ClosureGuard closure_guard(done);
         auto st = _exec_env->load_channel_mgr()->cancel(*request);
         if (!st.ok()) {
@@ -620,10 +631,7 @@ void PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*c
                                             google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, request, result, done]() {
         brpc::ClosureGuard closure_guard(done);
-        TUniqueId tid;
-        tid.__set_hi(request->finst_id().hi());
-        tid.__set_lo(request->finst_id().lo());
-        signal::set_signal_task_id(tid);
+        SignalTaskIdKeeper keeper(request->finst_id());
         Status st = Status::OK();
 
         const bool has_cancel_reason = request->has_cancel_reason();
@@ -1390,6 +1398,7 @@ void PInternalService::merge_filter(::google::protobuf::RpcController* controlle
                                     ::doris::PMergeFilterResponse* response,
                                     ::google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
+        SignalTaskIdKeeper keeper(request->query_id());
         brpc::ClosureGuard closure_guard(done);
         auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
@@ -1407,6 +1416,7 @@ void PInternalService::send_filter_size(::google::protobuf::RpcController* contr
                                         ::doris::PSendFilterSizeResponse* response,
                                         ::google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        SignalTaskIdKeeper keeper(request->query_id());
         brpc::ClosureGuard closure_guard(done);
         Status st = _exec_env->fragment_mgr()->send_filter_size(request);
         st.to_protobuf(response->mutable_status());
@@ -1422,6 +1432,7 @@ void PInternalService::sync_filter_size(::google::protobuf::RpcController* contr
                                         ::doris::PSyncFilterSizeResponse* response,
                                         ::google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        SignalTaskIdKeeper keeper(request->query_id());
         brpc::ClosureGuard closure_guard(done);
         Status st = _exec_env->fragment_mgr()->sync_filter_size(request);
         st.to_protobuf(response->mutable_status());
@@ -1437,6 +1448,7 @@ void PInternalService::apply_filterv2(::google::protobuf::RpcController* control
                                       ::doris::PPublishFilterResponse* response,
                                       ::google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
+        SignalTaskIdKeeper keeper(request->query_id());
         brpc::ClosureGuard closure_guard(done);
         auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
@@ -2053,7 +2065,7 @@ void PInternalService::multiget_data(google::protobuf::RpcController* controller
                                      const PMultiGetRequest* request, PMultiGetResponse* response,
                                      google::protobuf::Closure* done) {
     bool ret = _heavy_work_pool.try_offer([request, response, done]() {
-        signal::set_signal_task_id(request->query_id());
+        SignalTaskIdKeeper keeper(request->query_id());
         // multi get data by rowid
         MonotonicStopWatch watch;
         watch.start();
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index e3d03a6a449..31c50480784 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -38,27 +38,6 @@ class PHandShakeRequest;
 class PHandShakeResponse;
 class RuntimeState;
 
-template <typename T>
-concept CanCancel = requires(T* response) { response->mutable_status(); };
-
-template <typename T>
-void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
-    brpc::ClosureGuard closure_guard(done);
-    LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info();
-}
-
-template <CanCancel T>
-void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
-    brpc::ClosureGuard closure_guard(done);
-    // Should use status to generate protobuf message, because it will encoding Backend Info
-    // into the error message and then we could know which backend's pool is full.
-    Status st = Status::Error<TStatusCode::CANCELLED>(
-            "fail to offer request to the work pool, pool={}", 
pool.get_info());
-    st.to_protobuf(response->mutable_status());
-    LOG(WARNING) << "cancelled due to fail to offer request to the work pool, 
pool="
-                 << pool.get_info();
-}
-
 class PInternalService : public PBackendService {
 public:
     PInternalService(ExecEnv* exec_env);
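
The offer_failed helpers and the CanCancel concept seen at the top of the .cpp diff are not new logic; they move out of internal_service.h so they are no longer compiled into every file that includes the header. For readers less familiar with concept-constrained overloads, the sketch below shows the selection mechanism with toy types: when the response type exposes mutable_status(), the more-constrained overload wins and a CANCELLED status is written back; otherwise the plain overload only logs. Only the CanCancel concept mirrors the commit; ToyStatus and the response structs are hypothetical stand-ins for doris::Status and the protobuf responses, and the pool and closure arguments of the real helpers are omitted here.

```cpp
#include <iostream>
#include <string>

// Hypothetical stand-in for doris::Status.
struct ToyStatus {
    std::string msg;
};

// A response that can carry a cancel reason back to the caller.
struct CancellableResponse {
    ToyStatus status;
    ToyStatus* mutable_status() { return &status; }
};

// A response with no status field: nothing to write a reason into.
struct PlainResponse {};

template <typename T>
concept CanCancel = requires(T* response) { response->mutable_status(); };

// Fallback, mirroring the unconstrained overload in the commit: log only.
template <typename T>
void offer_failed(T* /*response*/) {
    std::cout << "log only: fail to offer request to the work pool\n";
}

// More constrained overload: also record CANCELLED in the response, so the
// caller can see which backend's pool was full.
template <CanCancel T>
void offer_failed(T* response) {
    response->mutable_status()->msg = "CANCELLED: fail to offer request to the work pool";
    std::cout << "cancelled: " << response->status.msg << "\n";
}

int main() {
    CancellableResponse cancellable;
    PlainResponse plain;
    offer_failed(&cancellable); // constrained overload selected: status filled in
    offer_failed(&plain);       // falls back to the log-only overload
    return 0;
}
```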

