This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cc083c53eb7 [Chore](rpc) set and clean signal task id on some rpc thread (#49871) cc083c53eb7 is described below commit cc083c53eb7598a4de59c259b6ae4bd3fa8fa9d6 Author: Pxl <x...@selectdb.com> AuthorDate: Thu Apr 10 13:17:00 2025 +0800 [Chore](rpc) set and clean signal task id on some rpc thread (#49871) ### What problem does this PR solve? Set the signal task id on several RPC worker-pool threads and clear it again when each handler finishes. The thread pool reuses pthreads, so stale thread-local task ids must not leak into the next task that runs on the same thread. ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [x] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. 
eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [x] Confirm the release note - [x] Confirm test cases - [x] Confirm document - [x] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/service/internal_service.cpp | 64 ++++++++++++++++++++++--------------- be/src/service/internal_service.h | 21 ------------ 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index cb47562a778..f1d3735699b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -44,26 +44,20 @@ #include <vec/sink/varrow_flight_result_writer.h> #include <algorithm> -#include <exception> #include <filesystem> #include <memory> #include <set> #include <sstream> #include <string> -#include <unordered_map> #include <utility> #include <vector> #include "common/config.h" -#include "common/consts.h" #include "common/exception.h" #include "common/logging.h" #include "common/signal_handler.h" #include "common/status.h" #include "exec/rowid_fetcher.h" -#include "gen_cpp/BackendService.h" -#include "gen_cpp/PaloInternalService_types.h" -#include "gen_cpp/internal_service.pb.h" #include "http/http_client.h" #include "io/fs/local_file_system.h" #include "io/fs/stream_load_pipe.h" @@ -76,21 +70,14 @@ #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/segment_v2/column_reader.h" -#include "olap/rowset/segment_v2/common.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" -#include "olap/rowset/segment_v2/segment.h" -#include "olap/rowset/segment_v2/segment_iterator.h" -#include "olap/segment_loader.h" #include "olap/storage_engine.h" -#include "olap/tablet.h" #include "olap/tablet_fwd.h" #include "olap/tablet_manager.h" #include "olap/tablet_schema.h" #include "olap/txn_manager.h" -#include "olap/utils.h" #include "olap/wal/wal_manager.h" 
#include "runtime/cache/result_cache.h" -#include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/fold_constant_executor.h" @@ -121,12 +108,8 @@ #include "util/thrift_util.h" #include "util/time.h" #include "util/uid_util.h" -#include "vec/columns/column.h" -#include "vec/columns/column_string.h" #include "vec/common/schema_util.h" #include "vec/core/block.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/data_types/data_type.h" #include "vec/exec/format/avro//avro_jni_reader.h" #include "vec/exec/format/csv/csv_reader.h" #include "vec/exec/format/generic_reader.h" @@ -169,6 +152,34 @@ static void thread_context_deleter(void* d) { delete static_cast<ThreadContext*>(d); } +template <typename T> +concept CanCancel = requires(T* response) { response->mutable_status(); }; + +template <typename T> +void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { + brpc::ClosureGuard closure_guard(done); + LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info(); +} + +template <CanCancel T> +void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { + brpc::ClosureGuard closure_guard(done); + // Should use status to generate protobuf message, because it will encoding Backend Info + // into the error message and then we could know which backend's pool is full. 
+ Status st = Status::Error<TStatusCode::CANCELLED>( + "fail to offer request to the work pool, pool={}", pool.get_info()); + st.to_protobuf(response->mutable_status()); + LOG(WARNING) << "cancelled due to fail to offer request to the work pool, pool=" + << pool.get_info(); +} + +// this struct is used to set signal task id in the constructor and reset it in the destructor +// thread pool will reuse pthread, so we need to clean thread local data +struct SignalTaskIdKeeper { + SignalTaskIdKeeper(const PUniqueId& id) { signal::set_signal_task_id(id); } + ~SignalTaskIdKeeper() { signal::set_signal_task_id(PUniqueId {}); } +}; + template <typename T> class NewHttpClosure : public ::google::protobuf::Closure { public: @@ -282,7 +293,7 @@ void PInternalService::tablet_writer_open(google::protobuf::RpcController* contr bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); - signal::set_signal_task_id(request->id()); + SignalTaskIdKeeper keeper(request->id()); brpc::ClosureGuard closure_guard(done); auto st = _exec_env->load_channel_mgr()->open(*request); if (!st.ok()) { @@ -388,7 +399,7 @@ void PInternalService::open_load_stream(google::protobuf::RpcController* control POpenLoadStreamResponse* response, google::protobuf::Closure* done) { bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() { - signal::set_signal_task_id(request->load_id()); + SignalTaskIdKeeper keeper(request->load_id()); brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); brpc::StreamOptions stream_options; @@ -468,7 +479,7 @@ void PInternalService::tablet_writer_add_block(google::protobuf::RpcController* int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - signal::set_signal_task_id(request->id()); + SignalTaskIdKeeper keeper(request->id()); auto st = 
_exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { LOG(WARNING) << "tablet writer add block failed, message=" << st @@ -494,7 +505,7 @@ void PInternalService::tablet_writer_cancel(google::protobuf::RpcController* con bool ret = _heavy_work_pool.try_offer([this, request, done]() { VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); - signal::set_signal_task_id(request->id()); + SignalTaskIdKeeper keeper(request->id()); brpc::ClosureGuard closure_guard(done); auto st = _exec_env->load_channel_mgr()->cancel(*request); if (!st.ok()) { @@ -620,10 +631,7 @@ void PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*c google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([this, request, result, done]() { brpc::ClosureGuard closure_guard(done); - TUniqueId tid; - tid.__set_hi(request->finst_id().hi()); - tid.__set_lo(request->finst_id().lo()); - signal::set_signal_task_id(tid); + SignalTaskIdKeeper keeper(request->finst_id()); Status st = Status::OK(); const bool has_cancel_reason = request->has_cancel_reason(); @@ -1390,6 +1398,7 @@ void PInternalService::merge_filter(::google::protobuf::RpcController* controlle ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { + SignalTaskIdKeeper keeper(request->query_id()); brpc::ClosureGuard closure_guard(done); auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); @@ -1407,6 +1416,7 @@ void PInternalService::send_filter_size(::google::protobuf::RpcController* contr ::doris::PSendFilterSizeResponse* response, ::google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([this, request, response, done]() { + SignalTaskIdKeeper keeper(request->query_id()); 
brpc::ClosureGuard closure_guard(done); Status st = _exec_env->fragment_mgr()->send_filter_size(request); st.to_protobuf(response->mutable_status()); @@ -1422,6 +1432,7 @@ void PInternalService::sync_filter_size(::google::protobuf::RpcController* contr ::doris::PSyncFilterSizeResponse* response, ::google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([this, request, response, done]() { + SignalTaskIdKeeper keeper(request->query_id()); brpc::ClosureGuard closure_guard(done); Status st = _exec_env->fragment_mgr()->sync_filter_size(request); st.to_protobuf(response->mutable_status()); @@ -1437,6 +1448,7 @@ void PInternalService::apply_filterv2(::google::protobuf::RpcController* control ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { + SignalTaskIdKeeper keeper(request->query_id()); brpc::ClosureGuard closure_guard(done); auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); @@ -2053,7 +2065,7 @@ void PInternalService::multiget_data(google::protobuf::RpcController* controller const PMultiGetRequest* request, PMultiGetResponse* response, google::protobuf::Closure* done) { bool ret = _heavy_work_pool.try_offer([request, response, done]() { - signal::set_signal_task_id(request->query_id()); + SignalTaskIdKeeper keeper(request->query_id()); // multi get data by rowid MonotonicStopWatch watch; watch.start(); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index e3d03a6a449..31c50480784 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -38,27 +38,6 @@ class PHandShakeRequest; class PHandShakeResponse; class RuntimeState; -template <typename T> -concept CanCancel = requires(T* response) { response->mutable_status(); }; - -template <typename T> -void offer_failed(T* 
response, google::protobuf::Closure* done, const FifoThreadPool& pool) { - brpc::ClosureGuard closure_guard(done); - LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info(); -} - -template <CanCancel T> -void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { - brpc::ClosureGuard closure_guard(done); - // Should use status to generate protobuf message, because it will encoding Backend Info - // into the error message and then we could know which backend's pool is full. - Status st = Status::Error<TStatusCode::CANCELLED>( - "fail to offer request to the work pool, pool={}", pool.get_info()); - st.to_protobuf(response->mutable_status()); - LOG(WARNING) << "cancelled due to fail to offer request to the work pool, pool=" - << pool.get_info(); -} - class PInternalService : public PBackendService { public: PInternalService(ExecEnv* exec_env); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org