This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c7b447a3d20 [refactor](fragment) Use fragment ID to manage fragment 
context (#42048)
c7b447a3d20 is described below

commit c7b447a3d206f101454f73ea51e510d0023639ca
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Oct 18 11:44:20 2024 +0800

    [refactor](fragment) Use fragment ID to manage fragment context (#42048)
    
    Use fragment ID to manage fragment context
---
 be/src/pipeline/pipeline_fragment_context.cpp      |  9 +--
 be/src/pipeline/pipeline_fragment_context.h        | 14 -----
 be/src/runtime/fragment_mgr.cpp                    | 68 +++++-----------------
 be/src/runtime/fragment_mgr.h                      |  8 +--
 be/src/runtime/query_context.h                     |  2 -
 be/src/runtime/runtime_filter_mgr.cpp              | 16 +++--
 be/src/runtime/runtime_filter_mgr.h                |  1 -
 be/src/service/backend_service.cpp                 |  7 ---
 be/src/service/backend_service.h                   |  2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 24 +++++---
 gensrc/proto/internal_service.proto                |  1 +
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 12 files changed, 51 insertions(+), 102 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 11898952276..28cfefbf6c1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -572,10 +572,7 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
 void PipelineFragmentContext::_init_next_report_time() {
     auto interval_s = config::pipeline_status_report_interval;
     if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
-        std::vector<string> ins_ids;
-        instance_ids(ins_ids);
-        VLOG_FILE << "enable period report: instance_id="
-                  << fmt::format("{}", fmt::join(ins_ids, ", "));
+        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
         uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * 
NANOS_PER_SEC;
         // We don't want to wait longer than it takes to run the entire 
fragment.
         _previous_report_time =
@@ -613,11 +610,9 @@ void 
PipelineFragmentContext::trigger_report_if_necessary() {
             return;
         }
         if (VLOG_FILE_IS_ON) {
-            std::vector<string> ins_ids;
-            instance_ids(ins_ids);
             VLOG_FILE << "Reporting "
                       << "profile for query_id " << print_id(_query_id)
-                      << ", instance ids: " << fmt::format("{}", 
fmt::join(ins_ids, ", "));
+                      << ", fragment id: " << _fragment_id;
 
             std::stringstream ss;
             _runtime_state->runtime_profile()->compute_time_in_profile();
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index bcef1271b60..0749729789e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -115,20 +115,6 @@ public:
 
     [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
 
-    void instance_ids(std::vector<TUniqueId>& ins_ids) const {
-        ins_ids.resize(_fragment_instance_ids.size());
-        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
-            ins_ids[i] = _fragment_instance_ids[i];
-        }
-    }
-
-    void instance_ids(std::vector<string>& ins_ids) const {
-        ins_ids.resize(_fragment_instance_ids.size());
-        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
-            ins_ids[i] = print_id(_fragment_instance_ids[i]);
-        }
-    }
-
     void clear_finished_tasks() {
         for (size_t j = 0; j < _tasks.size(); j++) {
             for (size_t i = 0; i < _tasks[j].size(); i++) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 008f92cfef8..26fb098c76d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -106,7 +106,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", 
"prepare");
-bvar::Adder<int64_t> 
g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");
 
 bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
 bvar::Status<uint64_t> g_fragment_last_active_time(
@@ -598,18 +597,13 @@ void FragmentMgr::remove_pipeline_context(
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto query_id = f_context->get_query_id();
-        std::vector<TUniqueId> ins_ids;
-        f_context->instance_ids(ins_ids);
         int64 now = duration_cast<std::chrono::milliseconds>(
                             
std::chrono::system_clock::now().time_since_epoch())
                             .count();
         g_fragment_executing_count << -1;
         g_fragment_last_active_time.set_value(now);
-        for (const auto& ins_id : ins_ids) {
-            LOG_INFO("Removing query {} instance {}", print_id(query_id), 
print_id(ins_id));
-            _pipeline_map.erase(ins_id);
-            g_pipeline_fragment_instances_count << -1;
-        }
+        LOG_INFO("Removing query {} fragment {}", print_id(query_id), 
f_context->get_fragment_id());
+        _pipeline_map.erase({query_id, f_context->get_fragment_id()});
     }
 }
 
@@ -743,11 +737,10 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t 
duration) {
                 continue;
             }
             auto timeout_second = it.second->timeout_second();
-            fmt::format_to(debug_string_buffer,
-                           "No.{} (elapse_second={}s, 
query_timeout_second={}s, instance_id="
-                           "{}, is_timeout={}) : {}\n",
-                           i, elapsed, timeout_second, print_id(it.first),
-                           it.second->is_timeout(now), 
it.second->debug_string());
+            fmt::format_to(
+                    debug_string_buffer,
+                    "No.{} (elapse_second={}s, query_timeout_second={}s, 
is_timeout={}) : {}\n", i,
+                    elapsed, timeout_second, it.second->is_timeout(now), 
it.second->debug_string());
             i++;
         }
     }
@@ -807,11 +800,10 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     for (const auto& local_param : params.local_params) {
         const TUniqueId& fragment_instance_id = 
local_param.fragment_instance_id;
         std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(fragment_instance_id);
+        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
         if (iter != _pipeline_map.end()) {
-            return Status::InternalError(
-                    "exec_plan_fragment input duplicated 
fragment_instance_id({})",
-                    UniqueId(fragment_instance_id).to_string());
+            return Status::InternalError("exec_plan_fragment input duplicated 
fragment_id({})",
+                                         params.fragment_id);
         }
         query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
     }
@@ -827,12 +819,8 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         g_fragment_executing_count << 1;
         g_fragment_last_active_time.set_value(now);
         std::lock_guard<std::mutex> lock(_lock);
-        std::vector<TUniqueId> ins_ids;
-        context->instance_ids(ins_ids);
         // TODO: simplify this mapping
-        for (const auto& ins_id : ins_ids) {
-            _pipeline_map.insert({ins_id, context});
-        }
+        _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
     }
     query_ctx->set_pipeline_context(params.fragment_id, context);
 
@@ -877,31 +865,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, 
const Status reason) {
               << " is cancelled and removed. Reason: " << reason.to_string();
 }
 
-void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status 
reason) {
-    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
-    {
-        std::lock_guard<std::mutex> state_lock(_lock);
-        DCHECK(!_pipeline_map.contains(instance_id))
-                << " Pipeline tasks should be canceled by query instead of 
instance! Query ID: "
-                << print_id(_pipeline_map[instance_id]->get_query_id());
-        const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
-        if (is_pipeline_instance) {
-            auto itr = _pipeline_map.find(instance_id);
-            if (itr != _pipeline_map.end()) {
-                pipeline_ctx = itr->second;
-            } else {
-                LOG(WARNING) << "Could not find the pipeline instance id:" << 
print_id(instance_id)
-                             << " to cancel";
-                return;
-            }
-        }
-    }
-
-    if (pipeline_ctx != nullptr) {
-        pipeline_ctx->cancel(reason);
-    }
-}
-
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
 
@@ -1167,15 +1130,16 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
 
     RuntimeFilterMgr* runtime_filter_mgr = nullptr;
 
-    const auto& fragment_instance_ids = request->fragment_instance_ids();
+    const auto& fragment_ids = request->fragment_ids();
     {
         std::unique_lock<std::mutex> lock(_lock);
-        for (UniqueId fragment_instance_id : fragment_instance_ids) {
-            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
+        for (auto fragment_id : fragment_ids) {
             if (is_pipeline) {
-                auto iter = _pipeline_map.find(tfragment_instance_id);
+                auto iter = _pipeline_map.find(
+                        {UniqueId(request->query_id()).to_thrift(), 
fragment_id});
                 if (iter == _pipeline_map.end()) {
+                    LOG(WARNING) << "No pipeline fragment is found: Query-ID = 
"
+                                 << request->query_id() << " fragment_id = " 
<< fragment_id;
                     continue;
                 }
                 pip_context = iter->second;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index bc066066f7b..41b63db0b23 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -100,9 +100,6 @@ public:
     Status trigger_pipeline_context_report(const ReportStatusRequest,
                                            
std::shared_ptr<pipeline::PipelineFragmentContext>&&);
 
-    // Cancel instance (pipeline or nonpipeline).
-    void cancel_instance(const TUniqueId instance_id, const Status reason);
-
     // Can be used in both version.
     void cancel_query(const TUniqueId query_id, const Status reason);
 
@@ -169,7 +166,10 @@ private:
     // call _lock, so that there is dead lock.
     std::mutex _lock;
 
-    std::unordered_map<TUniqueId, 
std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    std::unordered_map<std::pair<TUniqueId, int>,
+                       std::shared_ptr<pipeline::PipelineFragmentContext>>
+            _pipeline_map;
 
     // query id -> QueryContext
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d1d78573923..9d499f3487e 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -193,8 +193,6 @@ public:
 
     ThreadPool* get_memtable_flush_pool();
 
-    std::vector<TUniqueId> get_fragment_instance_ids() const { return 
fragment_instance_ids; }
-
     int64_t mem_limit() const { return _bytes_limit; }
 
     void set_merge_controller_handler(
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 5b865f28dce..08a229c0ecf 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -228,7 +228,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     // so we need to copy to cnt_val
     cnt_val->producer_size = producer_size;
     cnt_val->runtime_filter_desc = *runtime_filter_desc;
-    cnt_val->target_info = *target_info;
     cnt_val->pool.reset(new ObjectPool());
     cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, 
runtime_filter_desc));
 
@@ -460,10 +459,17 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             closure->cntl_->set_timeout_ms(std::min(3600, 
_state->execution_timeout) * 1000);
             closure->cntl_->ignore_eovercrowded();
             // set fragment-id
-            for (auto& target_fragment_instance_id : 
target.target_fragment_instance_ids) {
-                PUniqueId* cur_id = 
closure->request_->add_fragment_instance_ids();
-                cur_id->set_hi(target_fragment_instance_id.hi);
-                cur_id->set_lo(target_fragment_instance_id.lo);
+            if (target.__isset.target_fragment_ids) {
+                for (auto& target_fragment_id : target.target_fragment_ids) {
+                    closure->request_->add_fragment_ids(target_fragment_id);
+                }
+            } else {
+                // FE not upgraded yet.
+                for (auto& target_fragment_instance_id : 
target.target_fragment_instance_ids) {
+                    PUniqueId* cur_id = 
closure->request_->add_fragment_instance_ids();
+                    cur_id->set_hi(target_fragment_instance_id.hi);
+                    cur_id->set_lo(target_fragment_instance_id.lo);
+                }
             }
 
             std::shared_ptr<PBackendService_Stub> stub(
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index d89a3b9f1b1..b0aea7568cf 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -168,7 +168,6 @@ public:
         int producer_size;
         uint64_t global_size;
         TRuntimeFilterDesc runtime_filter_desc;
-        std::vector<doris::TRuntimeFilterTargetParams> target_info;
         std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
         IRuntimeFilter* filter = nullptr;
         std::unordered_set<UniqueId> arrive_id;
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index aa29661da02..d56aa49b19b 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -657,13 +657,6 @@ Status BaseBackendService::start_plan_fragment_execution(
                                                          
QuerySource::INTERNAL_FRONTEND);
 }
 
-void BaseBackendService::cancel_plan_fragment(TCancelPlanFragmentResult& 
return_val,
-                                              const TCancelPlanFragmentParams& 
params) {
-    LOG(INFO) << "cancel_plan_fragment(): instance_id=" << 
print_id(params.fragment_instance_id);
-    _exec_env->fragment_mgr()->cancel_instance(
-            params.fragment_instance_id, Status::InternalError("cancel message 
received from FE"));
-}
-
 void BaseBackendService::transmit_data(TTransmitDataResult& return_val,
                                        const TTransmitDataParams& params) {
     VLOG_ROW << "transmit_data(): instance_id=" << 
params.dest_fragment_instance_id
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 4d01107ba8a..1d4219e2191 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -90,7 +90,7 @@ public:
                             const TExecPlanFragmentParams& params) override;
 
     void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                              const TCancelPlanFragmentParams& params) 
override;
+                              const TCancelPlanFragmentParams& params) 
override {};
 
     void transmit_data(TTransmitDataResult& return_val, const 
TTransmitDataParams& params) override;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5b977a87f1f..8e580c549df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1997,7 +1997,8 @@ public class Coordinator implements CoordInterface {
                 List<FRuntimeFilterTargetParam> targetFragments = 
ridToTargetParam.computeIfAbsent(rid,
                         k -> new ArrayList<>());
                 for (final FInstanceExecParam instance : 
params.instanceExecParams) {
-                    targetFragments.add(new 
FRuntimeFilterTargetParam(instance.instanceId, toBrpcHost(instance.host)));
+                    targetFragments.add(new 
FRuntimeFilterTargetParam(instance.fragment().getFragmentId().asInt(),
+                            toBrpcHost(instance.host)));
                 }
             }
 
@@ -3181,8 +3182,8 @@ public class Coordinator implements CoordInterface {
                             for (FRuntimeFilterTargetParam targetParam : 
fParams) {
                                 if 
(targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
                                     
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                            .target_fragment_instance_ids
-                                            
.add(targetParam.targetFragmentInstanceId);
+                                            .target_fragment_ids
+                                            .add(targetParam.targetFragmentId);
                                 } else {
                                     
targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
                                             new 
TRuntimeFilterTargetParamsV2());
@@ -3190,11 +3191,15 @@ public class Coordinator implements CoordInterface {
                                             .target_fragment_instance_addr
                                             = 
targetParam.targetFragmentInstanceAddr;
                                     
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                            .target_fragment_instance_ids
+                                            .target_fragment_ids
                                             = new ArrayList<>();
+                                    
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
+                                            .target_fragment_ids
+                                            .add(targetParam.targetFragmentId);
+                                    // `target_fragment_instance_ids` is a 
required field
                                     
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
                                             .target_fragment_instance_ids
-                                            
.add(targetParam.targetFragmentInstanceId);
+                                            = new ArrayList<>();
                                 }
                             }
 
@@ -3203,7 +3208,8 @@ public class Coordinator implements CoordInterface {
                         } else {
                             List<TRuntimeFilterTargetParams> targetParams = 
Lists.newArrayList();
                             for (FRuntimeFilterTargetParam targetParam : 
fParams) {
-                                targetParams.add(new 
TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
+                                // Instance id makes no sense if this runtime 
filter doesn't have remote targets.
+                                targetParams.add(new 
TRuntimeFilterTargetParams(new TUniqueId(),
                                         
targetParam.targetFragmentInstanceAddr));
                             }
                             
localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
@@ -3373,12 +3379,12 @@ public class Coordinator implements CoordInterface {
 
     // Runtime filter target fragment instance param
     static class FRuntimeFilterTargetParam {
-        public TUniqueId targetFragmentInstanceId;
+        public int targetFragmentId;
 
         public TNetworkAddress targetFragmentInstanceAddr;
 
-        public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
-            this.targetFragmentInstanceId = id;
+        public FRuntimeFilterTargetParam(int id, TNetworkAddress host) {
+            this.targetFragmentId = id;
             this.targetFragmentInstanceAddr = host;
         }
     }
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 9abf9d7ea65..f3764cea233 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -613,6 +613,7 @@ message PPublishFilterRequestV2 {
     optional int64 merge_time = 9;
     optional bool contain_null = 10;
     optional bool ignored = 11;
+    repeated int32 fragment_ids = 12;
 };
 
 message PPublishFilterResponse {
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index b560059819f..62a45260f80 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -374,6 +374,7 @@ struct TRuntimeFilterTargetParamsV2 {
   1: required list<Types.TUniqueId> target_fragment_instance_ids
   // The address of the instance where the fragment is expected to run
   2: required Types.TNetworkAddress target_fragment_instance_addr
+  3: optional list<i32> target_fragment_ids
 }
 
 struct TRuntimeFilterParams {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to