This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 877935442f3 [feature](pipelineX)use markFragments instead of markInstances in pipelineX  (#27829)
877935442f3 is described below

commit 877935442f3a0592737b5e11dd1bc46eba5d5849
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Mon Dec 11 17:59:53 2023 +0800

    [feature](pipelineX)use markFragments instead of markInstances in pipelineX  (#27829)
---
 be/src/exprs/runtime_filter.cpp                    |  38 ++--
 be/src/exprs/runtime_filter.h                      |  34 ++--
 be/src/exprs/runtime_filter_rpc.cpp                |  16 +-
 be/src/pipeline/pipeline_fragment_context.h        |   7 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 208 ++++++++++++---------
 .../pipeline_x/pipeline_x_fragment_context.h       |  41 ++--
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |   9 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |   3 +
 be/src/runtime/fragment_mgr.cpp                    | 132 ++++++++-----
 be/src/runtime/fragment_mgr.h                      |   8 +
 be/src/runtime/query_context.h                     |   5 +
 be/src/runtime/runtime_filter_mgr.cpp              |  13 +-
 be/src/runtime/runtime_filter_mgr.h                |  13 +-
 be/src/runtime/runtime_state.cpp                   |  63 ++++++-
 be/src/runtime/runtime_state.h                     |  43 ++++-
 be/src/service/internal_service.cpp                |  17 +-
 be/test/exprs/runtime_filter_test.cpp              |   5 +-
 .../doris/common/profile/ExecutionProfile.java     | 135 ++++++++++++-
 .../main/java/org/apache/doris/qe/Coordinator.java | 161 +++++++++++-----
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  18 ++
 gensrc/proto/internal_service.proto                |   2 +
 21 files changed, 710 insertions(+), 261 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index de3f2ad4fd5..efdb8af7029 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -279,20 +279,20 @@ Status create_vbin_predicate(const TypeDescriptor& type, TExprOpcode::type opcod
 // This class is a wrapper of runtime predicate function
 class RuntimePredicateWrapper {
 public:
-    RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool,
+    RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool* pool,
                             const RuntimeFilterParams* params)
             : _state(state),
-              _be_exec_version(_state->be_exec_version()),
+              _be_exec_version(_state->be_exec_version),
               _pool(pool),
               _column_return_type(params->column_return_type),
               _filter_type(params->filter_type),
               _filter_id(params->filter_id) {}
     // for a 'tmp' runtime predicate wrapper
     // only could called assign method or as a param for merge
-    RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, PrimitiveType column_type,
-                            RuntimeFilterType type, uint32_t filter_id)
+    RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool* pool,
+                            PrimitiveType column_type, RuntimeFilterType type, uint32_t filter_id)
             : _state(state),
-              _be_exec_version(_state->be_exec_version()),
+              _be_exec_version(_state->be_exec_version),
               _pool(pool),
               _column_return_type(column_type),
               _filter_type(type),
@@ -945,7 +945,7 @@ public:
     }
 
 private:
-    RuntimeState* _state;
+    RuntimeFilterParamsContext* _state;
     QueryContext* _query_ctx;
     int _be_exec_version;
     ObjectPool* _pool;
@@ -962,9 +962,10 @@ private:
     uint32_t _filter_id;
 };
 
-Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc,
-                              const TQueryOptions* query_options, const RuntimeFilterRole role,
-                              int node_id, IRuntimeFilter** res, bool build_bf_exactly) {
+Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool,
+                              const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
+                              const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
+                              bool build_bf_exactly) {
     *res = pool->add(new IRuntimeFilter(state, pool, desc));
     (*res)->set_role(role);
     return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
@@ -1003,7 +1004,7 @@ Status IRuntimeFilter::publish() {
     DCHECK(is_producer());
     if (_has_local_target) {
         std::vector<IRuntimeFilter*> filters;
-        RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filters(_filter_id, filters));
+        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
         // push down
         for (auto filter : filters) {
             filter->_wrapper = _wrapper;
@@ -1014,7 +1015,7 @@ Status IRuntimeFilter::publish() {
     } else {
         TNetworkAddress addr;
         DCHECK(_state != nullptr);
-        RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr));
+        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
         return push_to_remote(_state, &addr, _opt_remote_rf);
     }
 }
@@ -1036,9 +1037,9 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
     auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
-                                               : _state->execution_timeout() * 1000;
+                                               : _state->execution_timeout * 1000;
     auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
-                                                         : _state->runtime_filter_wait_time_ms();
+                                                         : _state->runtime_filter_wait_time_ms;
     // bitmap filter is precise filter and only filter once, so it must be applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
                                     ? execution_timeout
@@ -1234,14 +1235,14 @@ Status IRuntimeFilter::serialize(PPublishFilterRequestV2* request, void** data,
     return serialize_impl(request, data, len);
 }
 
-Status IRuntimeFilter::create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param,
-                                      ObjectPool* pool,
+Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
+                                      const MergeRuntimeFilterParams* param, ObjectPool* pool,
                                       std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
     return _create_wrapper(state, param, pool, wrapper);
 }
 
-Status IRuntimeFilter::create_wrapper(RuntimeState* state, const UpdateRuntimeFilterParams* param,
-                                      ObjectPool* pool,
+Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
+                                      const UpdateRuntimeFilterParams* param, ObjectPool* pool,
                                       std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
     return _create_wrapper(state, param, pool, wrapper);
 }
@@ -1290,7 +1291,8 @@ Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) {
 }
 
 template <class T>
-Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, ObjectPool* pool,
+Status IRuntimeFilter::_create_wrapper(RuntimeFilterParamsContext* state, const T* param,
+                                       ObjectPool* pool,
                                        std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
     int filter_type = param->request->filter_type();
     PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
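
The rest of this patch threads a plain parameter struct, RuntimeFilterParamsContext, through the filter code in place of RuntimeState* (hence the accessor-to-field changes above, e.g. be_exec_version() becoming be_exec_version). As a reading aid, here is a minimal sketch of that struct with the field set inferred solely from the accesses in this diff; the authoritative definition is added by this commit, presumably in be/src/runtime/runtime_state.h (+43 lines in the diffstat), and PUniqueId below is a local stand-in for the protobuf id type:

    #include <cstdint>

    class RuntimeFilterMgr;
    class ExecEnv;
    class QueryContext;
    class RuntimeState;
    struct PUniqueId { // stand-in for the protobuf id with hi()/lo()
        int64_t hi_ = 0, lo_ = 0;
        int64_t hi() const { return hi_; }
        int64_t lo() const { return lo_; }
        void set_hi(int64_t v) { hi_ = v; }
        void set_lo(int64_t v) { lo_ = v; }
    };

    struct RuntimeFilterParamsContext {
        // snapshot of a RuntimeState, built once and then passed around by pointer
        static RuntimeFilterParamsContext* create(RuntimeState* state);

        bool runtime_filter_wait_infinitely = false;
        int32_t runtime_filter_wait_time_ms = 0;
        bool enable_pipeline_exec = false;
        int32_t execution_timeout = 0;
        RuntimeFilterMgr* runtime_filter_mgr = nullptr; // back-linked by RuntimeFilterMgr's ctor
        ExecEnv* exec_env = nullptr;
        PUniqueId query_id;
        PUniqueId fragment_instance_id;
        int be_exec_version = 0;
        QueryContext* query_ctx = nullptr;
    };
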
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 97078c11757..cc47d590e6b 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -185,7 +185,8 @@ enum RuntimeFilterState {
 /// that can be pushed down to node based on the results of the right table.
 class IRuntimeFilter {
 public:
-    IRuntimeFilter(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc)
+    IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool,
+                   const TRuntimeFilterDesc* desc)
             : _state(state),
               _pool(pool),
               _filter_id(desc->filter_id),
@@ -199,9 +200,9 @@ public:
               _always_true(false),
               _is_ignored(false),
               registration_time_(MonotonicMillis()),
-              _wait_infinitely(_state->runtime_filter_wait_infinitely()),
-              _rf_wait_time_ms(_state->runtime_filter_wait_time_ms()),
-              _enable_pipeline_exec(_state->enable_pipeline_exec()),
+              _wait_infinitely(_state->runtime_filter_wait_infinitely),
+              _rf_wait_time_ms(_state->runtime_filter_wait_time_ms),
+              _enable_pipeline_exec(_state->enable_pipeline_exec),
               _runtime_filter_type(get_runtime_filter_type(desc)),
               _name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
                                 to_string(_runtime_filter_type))),
@@ -231,9 +232,10 @@ public:
 
     ~IRuntimeFilter() = default;
 
-    static Status create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc,
-                         const TQueryOptions* query_options, const RuntimeFilterRole role,
-                         int node_id, IRuntimeFilter** res, bool build_bf_exactly = false);
+    static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
+                         const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
+                         const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
+                         bool build_bf_exactly = false);
 
     static Status create(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc,
                          const TQueryOptions* query_options, const RuntimeFilterRole role,
@@ -299,11 +301,11 @@ public:
     Status merge_from(const RuntimePredicateWrapper* wrapper);
 
     // for ut
-    static Status create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param,
-                                 ObjectPool* pool,
+    static Status create_wrapper(RuntimeFilterParamsContext* state,
+                                 const MergeRuntimeFilterParams* param, ObjectPool* pool,
                                  std::unique_ptr<RuntimePredicateWrapper>* wrapper);
-    static Status create_wrapper(RuntimeState* state, const UpdateRuntimeFilterParams* param,
-                                 ObjectPool* pool,
+    static Status create_wrapper(RuntimeFilterParamsContext* state,
+                                 const UpdateRuntimeFilterParams* param, ObjectPool* pool,
                                  std::unique_ptr<RuntimePredicateWrapper>* wrapper);
     static Status create_wrapper(QueryContext* query_ctx, const UpdateRuntimeFilterParamsV2* param,
                                  ObjectPool* pool,
@@ -325,7 +327,8 @@ public:
     Status join_rpc();
 
     // async push runtimefilter to remote node
-    Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, bool opt_remote_rf);
+    Status push_to_remote(RuntimeFilterParamsContext* state, const TNetworkAddress* addr,
+                          bool opt_remote_rf);
 
     void init_profile(RuntimeProfile* parent_profile);
 
@@ -367,7 +370,7 @@ public:
     int32_t wait_time_ms() const {
         int32_t res = 0;
         if (wait_infinitely()) {
-            res = _state == nullptr ? _query_ctx->execution_timeout() : _state->execution_timeout();
+            res = _state == nullptr ? _query_ctx->execution_timeout() : _state->execution_timeout;
             // Convert to ms
             res *= 1000;
         } else {
@@ -391,7 +394,8 @@ protected:
     Status serialize_impl(T* request, void** data, int* len);
 
     template <class T>
-    static Status _create_wrapper(RuntimeState* state, const T* param, ObjectPool* pool,
+    static Status _create_wrapper(RuntimeFilterParamsContext* state, const T* param,
+                                  ObjectPool* pool,
                                   std::unique_ptr<RuntimePredicateWrapper>* wrapper);
 
     void _set_push_down() { _is_push_down = true; }
@@ -419,7 +423,7 @@ protected:
         }
     }
 
-    RuntimeState* _state = nullptr;
+    RuntimeFilterParamsContext* _state = nullptr;
     QueryContext* _query_ctx = nullptr;
     ObjectPool* _pool = nullptr;
     // _wrapper is a runtime filter function wrapper
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
index 00540b8382c..a9aa7944625 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -48,12 +48,12 @@ struct IRuntimeFilter::RPCContext {
     static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished = true; }
 };
 
-Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
-                                      bool opt_remote_rf) {
+Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state,
+                                      const TNetworkAddress* addr, bool opt_remote_rf) {
     DCHECK(is_producer());
     DCHECK(_rpc_context == nullptr);
     std::shared_ptr<PBackendService_Stub> stub(
-            state->exec_env()->brpc_internal_client_cache()->get_client(*addr));
+            state->exec_env->brpc_internal_client_cache()->get_client(*addr));
     if (!stub) {
         std::string msg =
                 fmt::format("Get rpc stub failed, host={},  port=", addr->hostname, addr->port);
@@ -64,16 +64,16 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
     int len = 0;
 
     auto pquery_id = _rpc_context->request.mutable_query_id();
-    pquery_id->set_hi(_state->query_id().hi);
-    pquery_id->set_lo(_state->query_id().lo);
+    pquery_id->set_hi(_state->query_id.hi());
+    pquery_id->set_lo(_state->query_id.lo());
 
     auto pfragment_instance_id = _rpc_context->request.mutable_fragment_instance_id();
-    pfragment_instance_id->set_hi(state->fragment_instance_id().hi);
-    pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
+    pfragment_instance_id->set_hi(state->fragment_instance_id.hi());
+    pfragment_instance_id->set_lo(state->fragment_instance_id.lo());
 
     _rpc_context->request.set_filter_id(_filter_id);
     _rpc_context->request.set_opt_remote_rf(opt_remote_rf);
-    _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
+    _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec);
     _rpc_context->cntl.set_timeout_ms(wait_time_ms());
     _rpc_context->cid = _rpc_context->cntl.call_id();
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index e95bef870a3..0800da22ad4 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -74,10 +74,15 @@ public:
 
     TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; }
 
-    virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
+    RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
         return _runtime_state.get();
     }
 
+    virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId /*fragment_instance_id*/) {
+        return _runtime_state->runtime_filter_mgr();
+    }
+
+    QueryContext* get_query_ctx() { return _runtime_state->get_query_ctx(); }
     // should be protected by lock?
     [[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); }
 
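The header above swaps the virtual hook: get_runtime_state() becomes non-virtual, and get_runtime_filter_mgr() is now the override point. A compilable toy model of that dispatch (stand-in types throughout; only the two get_runtime_filter_mgr bodies mirror the real headers; requires C++20 for std::map::contains):

    #include <cassert>
    #include <map>
    #include <memory>

    struct RuntimeFilterMgr {};
    using UniqueId = int; // stand-in for doris::UniqueId

    struct FragmentContext { // plays PipelineFragmentContext
        std::unique_ptr<RuntimeFilterMgr> fragment_level_mgr =
                std::make_unique<RuntimeFilterMgr>();
        // base behavior: every instance shares the fragment-level manager
        virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId /*instance*/) {
            return fragment_level_mgr.get();
        }
        virtual ~FragmentContext() = default;
    };

    struct XFragmentContext : FragmentContext { // plays PipelineXFragmentContext
        std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> mgr_map;
        // pipelineX behavior: one manager per fragment instance
        RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId instance) override {
            assert(mgr_map.contains(instance));
            return mgr_map[instance].get();
        }
    };

This is why FragmentMgr::apply_filter (later in this patch) can resolve the per-instance manager through the base-class pointer without knowing which engine is running.
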
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index ac19c92ff55..724d50f55e4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -100,11 +100,6 @@
 
 namespace doris::pipeline {
 
-#define FOR_EACH_RUNTIME_STATE(stmt)              \
-    for (auto& runtime_state : _runtime_states) { \
-        stmt                                      \
-    }
-
 PipelineXFragmentContext::PipelineXFragmentContext(
         const TUniqueId& query_id, const int fragment_id, std::shared_ptr<QueryContext> query_ctx,
         ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& call_back,
@@ -114,10 +109,13 @@ PipelineXFragmentContext::PipelineXFragmentContext(
 
 PipelineXFragmentContext::~PipelineXFragmentContext() {
     auto st = _query_ctx->exec_status();
-    if (!_runtime_states.empty()) {
+    if (!_task_runtime_states.empty()) {
         // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state.
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
-        FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st); runtime_state.reset();)
+        for (auto& runtime_state : _task_runtime_states) {
+            _call_back(runtime_state.get(), &st);
+            runtime_state.reset();
+        }
     } else {
         _call_back(nullptr, &st);
     }
@@ -136,8 +134,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     }
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
         if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
-            FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext cancel instance: "
-                                                << print_id(runtime_state->fragment_instance_id());)
+            for (auto& id : _fragment_instance_ids) {
+                LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id);
+            }
         } else {
             _set_is_report_on_cancel(false); // TODO bug llj
         }
@@ -229,7 +228,10 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
     static_cast<void>(root_pipeline->set_sink(_sink));
 
-    RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx));
+    if (_enable_local_shuffle()) {
+        RETURN_IF_ERROR(
+                _plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx));
+    }
 
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
@@ -451,43 +453,84 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         const doris::TPipelineFragmentParams& request) {
     _total_tasks = 0;
     int target_size = request.local_params.size();
-    _runtime_states.resize(target_size);
     _tasks.resize(target_size);
+    auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
+    DCHECK(pipeline_id_to_profile.empty());
+    pipeline_id_to_profile.resize(_pipelines.size());
+    {
+        size_t pip_idx = 0;
+        for (auto& pipeline_profile : pipeline_id_to_profile) {
+            pipeline_profile =
+                    std::make_unique<RuntimeProfile>("Pipeline : " + std::to_string(pip_idx));
+            pip_idx++;
+        }
+    }
+
     for (size_t i = 0; i < target_size; i++) {
         const auto& local_params = request.local_params[i];
+        auto fragment_instance_id = local_params.fragment_instance_id;
+        _fragment_instance_ids.push_back(fragment_instance_id);
+        std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
+        auto set_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
+            runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
 
-        _runtime_states[i] = RuntimeState::create_unique(
-                local_params.fragment_instance_id, request.query_id, request.fragment_id,
-                request.query_options, _query_ctx->query_globals, _exec_env);
-        if (local_params.__isset.runtime_filter_params) {
-            _runtime_states[i]->set_runtime_filter_params(local_params.runtime_filter_params);
-        }
-        _runtime_states[i]->set_query_ctx(_query_ctx.get());
-        _runtime_states[i]->set_query_mem_tracker(_query_ctx->query_mem_tracker);
+            runtime_state->set_query_ctx(_query_ctx.get());
+            runtime_state->set_be_number(local_params.backend_num);
 
-        static_cast<void>(_runtime_states[i]->runtime_filter_mgr()->init());
-        _runtime_states[i]->set_be_number(local_params.backend_num);
+            if (request.__isset.backend_id) {
+                runtime_state->set_backend_id(request.backend_id);
+            }
+            if (request.__isset.import_label) {
+                runtime_state->set_import_label(request.import_label);
+            }
+            if (request.__isset.db_name) {
+                runtime_state->set_db_name(request.db_name);
+            }
+            if (request.__isset.load_job_id) {
+                runtime_state->set_load_job_id(request.load_job_id);
+            }
 
-        if (request.__isset.backend_id) {
-            _runtime_states[i]->set_backend_id(request.backend_id);
-        }
-        if (request.__isset.import_label) {
-            _runtime_states[i]->set_import_label(request.import_label);
-        }
-        if (request.__isset.db_name) {
-            _runtime_states[i]->set_db_name(request.db_name);
+            runtime_state->set_desc_tbl(_desc_tbl);
+            runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
+            runtime_state->set_num_per_fragment_instances(request.num_senders);
+            runtime_state->resize_op_id_to_local_state(max_operator_id(), max_sink_operator_id());
+            runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+            runtime_state->set_total_load_streams(request.total_load_streams);
+            runtime_state->set_num_local_sink(request.num_local_sink);
+            DCHECK(runtime_filter_mgr);
+            runtime_state->set_pipeline_x_runtime_filter_mgr(runtime_filter_mgr.get());
+        };
+
+        auto filterparams = std::make_unique<RuntimeFilterParamsContext>();
+
+        {
+            filterparams->runtime_filter_wait_infinitely =
+                    _runtime_state->runtime_filter_wait_infinitely();
+            filterparams->runtime_filter_wait_time_ms =
+                    _runtime_state->runtime_filter_wait_time_ms();
+            filterparams->enable_pipeline_exec = _runtime_state->enable_pipeline_exec();
+            filterparams->execution_timeout = _runtime_state->execution_timeout();
+
+            filterparams->exec_env = ExecEnv::GetInstance();
+            filterparams->query_id.set_hi(_runtime_state->query_id().hi);
+            filterparams->query_id.set_lo(_runtime_state->query_id().lo);
+
+            filterparams->fragment_instance_id.set_hi(fragment_instance_id.hi);
+            filterparams->fragment_instance_id.set_lo(fragment_instance_id.lo);
+            filterparams->be_exec_version = _runtime_state->be_exec_version();
+            filterparams->query_ctx = _query_ctx.get();
         }
-        if (request.__isset.load_job_id) {
-            _runtime_states[i]->set_load_job_id(request.load_job_id);
+
+        // build runtime_filter_mgr for each instance
+        runtime_filter_mgr =
+                std::make_unique<RuntimeFilterMgr>(request.query_id, filterparams.get());
+        if (local_params.__isset.runtime_filter_params) {
+            runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params);
         }
+        RETURN_IF_ERROR(runtime_filter_mgr->init());
+        filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
-        _runtime_states[i]->set_desc_tbl(_desc_tbl);
-        _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
-        _runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
-        _runtime_states[i]->resize_op_id_to_local_state(max_operator_id(), max_sink_operator_id());
-        _runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
-        _runtime_states[i]->set_total_load_streams(request.total_load_streams);
-        _runtime_states[i]->set_num_local_sink(request.num_local_sink);
+        _runtime_filter_states.push_back(std::move(filterparams));
         std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
         auto get_local_exchange_state = [&](PipelinePtr pipeline)
                 -> std::map<int, std::shared_ptr<LocalExchangeSharedState>> {
@@ -504,12 +547,25 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             }
             return le_state_map;
         };
-        for (auto& pipeline : _pipelines) {
+        auto get_task_runtime_state = [&](int task_id) -> RuntimeState* {
+            DCHECK(_task_runtime_states[task_id]);
+            return _task_runtime_states[task_id].get();
+        };
+        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+            auto& pipeline = _pipelines[pip_idx];
             if (pipeline->need_to_create_task()) {
-                auto task = std::make_unique<PipelineXTask>(pipeline, _total_tasks++,
-                                                            _runtime_states[i].get(), this,
-                                                            _runtime_states[i]->runtime_profile(),
-                                                            get_local_exchange_state(pipeline), i);
+                // build task runtime state
+                _task_runtime_states.push_back(RuntimeState::create_unique(
+                        this, local_params.fragment_instance_id, request.query_id,
+                        request.fragment_id, request.query_options, _query_ctx->query_globals,
+                        _exec_env));
+                auto& task_runtime_state = _task_runtime_states.back();
+                set_runtime_state(task_runtime_state);
+                auto cur_task_id = _total_tasks++;
+                auto task = std::make_unique<PipelineXTask>(
+                        pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this,
+                        pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
+                        i);
                 pipeline_id_to_task.insert({pipeline->id(), task.get()});
                 _tasks[i].emplace_back(std::move(task));
             }
@@ -533,32 +589,18 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
          * and JoinProbeOperator2.
          */
 
-        // First, set up the parent profile,
-        // then prepare the task profile and add it to operator_id_to_task_profile.
-        std::vector<RuntimeProfile*> operator_id_to_task_profile(
-                max_operator_id(), _runtime_states[i]->runtime_profile());
-        auto prepare_and_set_parent_profile = [&](PipelineXTask* task) {
-            auto sink = task->sink();
-            const auto& dests_id = sink->dests_id();
-            int dest_id = dests_id.front();
-            DCHECK(dest_id < operator_id_to_task_profile.size());
-            task->set_parent_profile(operator_id_to_task_profile[dest_id]);
-
-            RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params,
-                                          request.fragment.output_sink));
+        // First, set up the parent profile, task runtime state
 
-            for (auto o : task->operatorXs()) {
-                int id = o->operator_id();
-                DCHECK(id < operator_id_to_task_profile.size());
-                auto* op_local_state = _runtime_states[i].get()->get_local_state(o->operator_id());
-                operator_id_to_task_profile[id] = op_local_state->profile();
-            }
+        auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) {
+            DCHECK(pipeline_id_to_profile[pip_idx]);
+            RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), local_params,
+                                          request.fragment.output_sink));
             return Status::OK();
         };
 
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
-                auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+                auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
                 DCHECK(task != nullptr);
 
                 // if this task has upstream dependency, then record them.
@@ -571,15 +613,12 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                         }
                     }
                 }
-                RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
+                RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx));
             }
         }
-
         {
             std::lock_guard<std::mutex> l(_state_map_lock);
-            _instance_id_to_runtime_state.insert(
-                    {UniqueId(_runtime_states[i]->fragment_instance_id()),
-                     _runtime_states[i].get()});
+            _runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
        }
    }
    _pipeline_parent_map.clear();
@@ -692,7 +731,8 @@ Status PipelineXFragmentContext::_add_local_exchange(
         int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
         const std::vector<TExpr>& texprs, ExchangeType exchange_type, bool* do_local_exchange,
         int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx) {
-    if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
+    DCHECK(_enable_local_shuffle());
+    if (_num_instances <= 1) {
         return Status::OK();
     }
 
@@ -1144,17 +1184,17 @@ Status PipelineXFragmentContext::submit() {
 }
 
 void PipelineXFragmentContext::close_sink() {
-    FOR_EACH_RUNTIME_STATE(static_cast<void>(_sink->close(
-            runtime_state.get(),
-            _prepared ? Status::RuntimeError("prepare failed") : Status::OK())););
+    for (auto& tasks : _tasks) {
+        auto& root_task = *tasks.begin();
+        auto st = root_task->close_sink(_prepared ? Status::RuntimeError("prepare failed")
+                                                  : Status::OK());
+        if (!st.ok()) {
+            LOG_WARNING("PipelineXFragmentContext::close_sink() error").tag("msg", st.msg());
+        }
+    }
 }
 
 void PipelineXFragmentContext::close_if_prepare_failed() {
-    if (_tasks.empty()) {
-        FOR_EACH_RUNTIME_STATE(
-                static_cast<void>(_root_op->close(runtime_state.get())); static_cast<void>(
-                        _sink->close(runtime_state.get(), Status::RuntimeError("prepare failed")));)
-    }
     for (auto& task : _tasks) {
         for (auto& t : task) {
             DCHECK(!t->is_pending_finish());
@@ -1196,15 +1236,15 @@ Status PipelineXFragmentContext::send_report(bool done) {
         return Status::NeedSendAgain("");
     }
 
-    std::vector<RuntimeState*> runtime_states(_runtime_states.size());
-    for (size_t i = 0; i < _runtime_states.size(); i++) {
-        runtime_states[i] = _runtime_states[i].get();
-    }
+    std::vector<RuntimeState*> runtime_states;
 
+    for (auto& task_state : _task_runtime_states) {
+        runtime_states.push_back(task_state.get());
+    }
     return _report_status_cb(
-            {true, exec_status, runtime_states, nullptr, nullptr, done || !exec_status.ok(),
-             _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num,
-             _runtime_state.get(),
+            {true, exec_status, runtime_states, nullptr, _runtime_state->load_channel_profile(),
+             done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id,
+             TUniqueId(), _backend_num, _runtime_state.get(),
             std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
            std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
                      std::placeholders::_2),
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index a95a90e356d..326f1f84254 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
 #include <stddef.h>
 #include <stdint.h>
 
@@ -69,16 +70,16 @@ public:
     ~PipelineXFragmentContext() override;
 
     void instance_ids(std::vector<TUniqueId>& ins_ids) const override {
-        ins_ids.resize(_runtime_states.size());
-        for (size_t i = 0; i < _runtime_states.size(); i++) {
-            ins_ids[i] = _runtime_states[i]->fragment_instance_id();
+        ins_ids.resize(_fragment_instance_ids.size());
+        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
+            ins_ids[i] = _fragment_instance_ids[i];
         }
     }
 
     void instance_ids(std::vector<string>& ins_ids) const override {
-        ins_ids.resize(_runtime_states.size());
-        for (size_t i = 0; i < _runtime_states.size(); i++) {
-            ins_ids[i] = print_id(_runtime_states[i]->fragment_instance_id());
+        ins_ids.resize(_fragment_instance_ids.size());
+        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
+            ins_ids[i] = print_id(_fragment_instance_ids[i]);
         }
     }
 
@@ -102,13 +103,9 @@ public:
 
     Status send_report(bool) override;
 
-    RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
-        std::lock_guard<std::mutex> l(_state_map_lock);
-        if (_instance_id_to_runtime_state.contains(fragment_instance_id)) {
-            return _instance_id_to_runtime_state[fragment_instance_id];
-        } else {
-            return _runtime_state.get();
-        }
+    RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId fragment_instance_id) override {
+        DCHECK(_runtime_filter_mgr_map.contains(fragment_instance_id));
+        return _runtime_filter_mgr_map[fragment_instance_id].get();
     }
 
     [[nodiscard]] int next_operator_id() { return _operator_id++; }
@@ -162,13 +159,12 @@ private:
 
     bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
 
+    bool _enable_local_shuffle() const { return _runtime_state->enable_local_shuffle(); }
+
     OperatorXPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
     std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;
 
-    // Local runtime states for each pipeline task.
-    std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
-
     // It is used to manage the lifecycle of RuntimeFilterMergeController
     std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> _merge_controller_handlers;
 
@@ -219,6 +215,19 @@ private:
     int _operator_id = 0;
     int _sink_operator_id = 0;
     std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state;
+
+    // UniqueId -> runtime mgr
+    std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
+
+    // Here are two types of runtime states:
+    //    - _runtime_state is at the fragment level.
+    //    - _task_runtime_states is at the task level, unique to each task.
+
+    std::vector<TUniqueId> _fragment_instance_ids;
+    // Local runtime states for each task
+    std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
+
+    std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
 };
 
 } // namespace pipeline
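
To make the header's comment on the two kinds of runtime states concrete: for a fragment with n instances and m pipelines, the members above hold one fragment-level state, one RuntimeFilterMgr and one RuntimeFilterParamsContext per instance, and one task-level state per created task. A runnable toy of those counts (stand-in bookkeeping only; it assumes need_to_create_task() is true for every pipeline):

    #include <cstddef>
    #include <iostream>

    int main() {
        const std::size_t n = 3; // instances (request.local_params.size())
        const std::size_t m = 4; // pipelines (_pipelines.size())

        std::size_t task_states = 0; // grows like _task_runtime_states
        std::size_t filter_mgrs = 0; // grows like _runtime_filter_mgr_map
        for (std::size_t i = 0; i < n; ++i) {
            ++filter_mgrs;                         // one manager per instance
            for (std::size_t p = 0; p < m; ++p) {
                ++task_states;                     // one state per task
            }
        }
        std::cout << "1 fragment state, " << task_states << " task states, "
                  << filter_mgrs << " filter mgrs\n"; // here: 1, 12, 3
    }
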
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index cfe859b2e7c..5f07b79bebf 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -73,10 +73,11 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_CPU_TIMER(_task_cpu_timer);
     SCOPED_TIMER(_prepare_timer);
+    DCHECK_EQ(state, _state);
 
     {
         // set sink local state
-        LocalSinkStateInfo info {_parent_profile, local_params.sender_id,
+        LocalSinkStateInfo info {_task_profile.get(), local_params.sender_id,
                                  get_downstream_dependency(), _le_state_map, tsink};
         RETURN_IF_ERROR(_sink->setup_local_state(state, info));
     }
@@ -84,7 +85,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
     std::vector<TScanRangeParams> no_scan_ranges;
     auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
                                          _operators.front()->node_id(), no_scan_ranges);
-    auto* parent_profile = _parent_profile;
+    auto* parent_profile = _task_profile.get();
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
         auto& deps = get_upstream_dependency(op->operator_id());
@@ -330,6 +331,10 @@ Status PipelineXTask::close(Status exec_status) {
     return s;
 }
 
+Status PipelineXTask::close_sink(Status exec_status) {
+    return _sink->close(_state, exec_status);
+}
+
 std::string PipelineXTask::debug_string() {
     std::unique_lock<std::mutex> lc(_release_lock);
     fmt::memory_buffer debug_string_buffer;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 261ecb54f24..0bb3e16fc98 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -71,6 +71,7 @@ public:
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status) override;
 
+    Status close_sink(Status exec_status);
     bool source_can_read() override {
         if (_dry_run) {
             return true;
@@ -123,6 +124,8 @@ public:
 
     OperatorXs operatorXs() { return _operators; }
 
+    int task_id() const { return _index; }
+
     void clear_blocking_state() {
         if (!is_final_state(get_state()) && get_state() != PipelineTaskState::PENDING_FINISH &&
             _blocked_dep) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 57074bc629c..3c795273a77 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -236,32 +236,44 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
         }
         if (req.is_pipeline_x) {
             params.__isset.detailed_report = true;
-            for (auto* rs : req.runtime_states) {
-                TDetailedReportParams detailed_param;
-                detailed_param.__set_fragment_instance_id(rs->fragment_instance_id());
-                detailed_param.__isset.fragment_instance_id = true;
-
-                if (rs->enable_profile()) {
-                    detailed_param.__isset.profile = true;
-                    detailed_param.__isset.loadChannelProfile = true;
-
-                    rs->runtime_profile()->to_thrift(&detailed_param.profile);
+            DCHECK(!req.runtime_states.empty());
+            const bool enable_profile = (*req.runtime_states.begin())->enable_profile();
+            if (enable_profile) {
+                params.__isset.profile = true;
+                params.__isset.loadChannelProfile = false;
+                for (auto* rs : req.runtime_states) {
+                    DCHECK(req.load_channel_profile);
+                    TDetailedReportParams detailed_param;
                     rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
+                    // merge all runtime_states.loadChannelProfile to req.load_channel_profile
+                    req.load_channel_profile->update(detailed_param.loadChannelProfile);
                 }
-
-                params.detailed_report.push_back(detailed_param);
+                req.load_channel_profile->to_thrift(&params.loadChannelProfile);
+            } else {
+                params.__isset.profile = false;
             }
-        }
 
-        if (req.profile != nullptr) {
-            req.profile->to_thrift(&params.profile);
-            if (req.load_channel_profile) {
-                req.load_channel_profile->to_thrift(&params.loadChannelProfile);
+            if (enable_profile) {
+                for (auto& pipeline_profile : req.runtime_state->pipeline_id_to_profile()) {
+                    TDetailedReportParams detailed_param;
+                    detailed_param.__isset.fragment_instance_id = false;
+                    detailed_param.__isset.profile = true;
+                    detailed_param.__isset.loadChannelProfile = false;
+                    pipeline_profile->to_thrift(&detailed_param.profile);
+                    params.detailed_report.push_back(detailed_param);
+                }
             }
-            params.__isset.profile = true;
-            params.__isset.loadChannelProfile = true;
         } else {
-            params.__isset.profile = false;
+            if (req.profile != nullptr) {
+                req.profile->to_thrift(&params.profile);
+                if (req.load_channel_profile) {
+                    req.load_channel_profile->to_thrift(&params.loadChannelProfile);
+                }
+                params.__isset.profile = true;
+                params.__isset.loadChannelProfile = true;
+            } else {
+                params.__isset.profile = false;
+            }
         }
 
         if (!req.runtime_state->output_files().empty()) {
@@ -770,8 +782,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     g_fragmentmgr_prepare_latency << (duration_ns / 1000);
     std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
     // TODO need check the status, but when I add return_if_error the P0 will not pass
-    static_cast<void>(_runtimefilter_controller.add_entity(params, &handler,
-                                                           fragment_executor->runtime_state()));
+    static_cast<void>(_runtimefilter_controller.add_entity(
+            params, &handler,
+            RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
     fragment_executor->set_merge_controller_handler(handler);
     {
         std::lock_guard<std::mutex> lock(_lock);
@@ -852,9 +865,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
         for (size_t i = 0; i < params.local_params.size(); i++) {
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-            static_cast<void>(
-                    _runtimefilter_controller.add_entity(params, params.local_params[i], &handler,
-                                                         context->get_runtime_state(UniqueId())));
+            static_cast<void>(_runtimefilter_controller.add_entity(
+                    params, params.local_params[i], &handler,
+                    RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
             context->set_merge_controller_handler(handler);
             const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id;
             {
@@ -933,7 +946,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
             static_cast<void>(_runtimefilter_controller.add_entity(
-                    params, local_params, &handler, context->get_runtime_state(UniqueId())));
+                    params, local_params, &handler,
+                    RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
             context->set_merge_controller_handler(handler);
 
             {
@@ -1010,9 +1024,15 @@ void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id,
         LOG(WARNING) << "Query " << print_id(query_id) << " does not exists, failed to cancel it";
         return;
     }
+    if (ctx->second->enable_pipeline_x_exec()) {
+        for (auto& [f_id, f_context] : ctx->second->fragment_id_to_pipeline_ctx) {
+            cancel_fragment_unlocked(query_id, f_id, reason, state_lock, msg);
+        }
 
-    for (auto it : ctx->second->fragment_instance_ids) {
-        cancel_instance_unlocked(it, reason, state_lock, msg);
+    } else {
+        for (auto it : ctx->second->fragment_instance_ids) {
+            cancel_instance_unlocked(it, reason, state_lock, msg);
+        }
     }
 
     ctx->second->cancel(true, msg, Status::Cancelled(msg));
@@ -1054,23 +1074,50 @@ void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
     }
 }
 
+void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
+                                  const PPlanFragmentCancelReason& reason, const std::string& msg) {
+    std::unique_lock<std::mutex> state_lock(_lock);
+    return cancel_fragment_unlocked(query_id, fragment_id, reason, state_lock, msg);
+}
+
+void FragmentMgr::cancel_fragment_unlocked(const TUniqueId& query_id, int32_t fragment_id,
+                                           const PPlanFragmentCancelReason& reason,
+                                           const std::unique_lock<std::mutex>& state_lock,
+                                           const std::string& msg) {
+    auto q_ctx = _query_ctx_map.find(query_id)->second;
+    auto f_context = q_ctx->fragment_id_to_pipeline_ctx.find(fragment_id);
+    if (f_context != q_ctx->fragment_id_to_pipeline_ctx.end()) {
+        f_context->second->cancel(reason, msg);
+    } else {
+        LOG(WARNING) << "Could not find the pipeline query id:" << print_id(query_id)
+                     << " fragment id:" << fragment_id << " to cancel";
+    }
+}
+
 bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
     std::lock_guard<std::mutex> lock(_lock);
     auto ctx = _query_ctx_map.find(query_id);
 
     if (ctx != _query_ctx_map.end()) {
         const bool is_pipeline_version = ctx->second->enable_pipeline_exec();
-        for (auto itr : ctx->second->fragment_instance_ids) {
-            if (is_pipeline_version) {
-                auto pipeline_ctx_iter = _pipeline_map.find(itr);
-                if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) {
-                    return pipeline_ctx_iter->second->is_canceled();
-                }
-            } else {
-                auto fragment_instance_itr = _fragment_instance_map.find(itr);
-                if (fragment_instance_itr != _fragment_instance_map.end() &&
-                    fragment_instance_itr->second) {
-                    return fragment_instance_itr->second->is_canceled();
+        const bool is_pipeline_x = ctx->second->enable_pipeline_x_exec();
+        if (is_pipeline_x) {
+            for (auto& [id, f_context] : ctx->second->fragment_id_to_pipeline_ctx) {
+                return f_context->is_canceled();
+            }
+        } else {
+            for (auto itr : ctx->second->fragment_instance_ids) {
+                if (is_pipeline_version) {
+                    auto pipeline_ctx_iter = _pipeline_map.find(itr);
+                    if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) {
+                        return pipeline_ctx_iter->second->is_canceled();
+                    }
+                } else {
+                    auto fragment_instance_itr = _fragment_instance_map.find(itr);
+                    if (fragment_instance_itr != _fragment_instance_map.end() &&
+                        fragment_instance_itr->second) {
+                        return fragment_instance_itr->second->is_canceled();
+                    }
                }
            }
        }
@@ -1306,8 +1353,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
         pip_context = iter->second;
 
         DCHECK(pip_context != nullptr);
-        runtime_filter_mgr =
-                pip_context->get_runtime_state(fragment_instance_id)->runtime_filter_mgr();
+        runtime_filter_mgr = pip_context->get_runtime_filter_mgr(fragment_instance_id);
     } else {
         std::unique_lock<std::mutex> lock(_lock);
         auto iter = _fragment_instance_map.find(tfragment_instance_id);
@@ -1349,9 +1395,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
             pip_context = iter->second;
 
             DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = pip_context->get_runtime_state(fragment_instance_id)
-                                         ->get_query_ctx()
-                                         ->runtime_filter_mgr();
+            runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
             pool = &pip_context->get_query_context()->obj_pool;
         } else {
             std::unique_lock<std::mutex> lock(_lock);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index a20da9387af..21d85503803 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -102,6 +102,14 @@ public:
                                   const PPlanFragmentCancelReason& reason,
                                  const std::unique_lock<std::mutex>& state_lock,
                                  const std::string& msg = "");
+    // Cancel fragment (only pipelineX).
+    // {query id, fragment id} -> PipelineXFragmentContext
+    void cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
+                         const PPlanFragmentCancelReason& reason, const std::string& msg = "");
+    void cancel_fragment_unlocked(const TUniqueId& query_id, int32_t fragment_id,
+                                  const PPlanFragmentCancelReason& reason,
+                                  const std::unique_lock<std::mutex>& state_lock,
+                                  const std::string& msg = "");
 
     // Can be used in both version.
     void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 0e3a04f8998..6d392c56175 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -172,6 +172,11 @@ public:
                _query_options.enable_pipeline_engine;
     }
 
+    bool enable_pipeline_x_exec() const {
+        return _query_options.__isset.enable_pipeline_x_engine &&
+               _query_options.enable_pipeline_x_engine;
+    }
+
     int be_exec_version() const {
         if (!_query_options.__isset.be_exec_version) {
             return 0;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index bca128c652a..a2120c92389 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -51,7 +51,10 @@ struct AsyncRPCContext {
     brpc::CallId cid;
 };
 
-RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state) : _state(state) {}
+RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state) {
+    _state = state;
+    _state->runtime_filter_mgr = this;
+}
 
 RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
         : _query_ctx(query_ctx) {}
@@ -133,8 +136,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
                                                build_bf_exactly));
         _consumer_map[key].emplace_back(node_id, filter);
     } else {
-        DCHECK(_state != nullptr);
-
         if (iter != _consumer_map.end()) {
             for (auto holder : iter->second) {
                 if (holder.node_id == node_id) {
@@ -475,7 +476,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
 
 Status RuntimeFilterMergeController::add_entity(
         const TExecPlanFragmentParams& params,
-        std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, RuntimeState* state) {
+        std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
+        RuntimeFilterParamsContext* state) {
     if (!params.params.__isset.runtime_filter_params ||
         params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
         return Status::OK();
@@ -506,7 +508,8 @@ Status RuntimeFilterMergeController::add_entity(
 
 Status RuntimeFilterMergeController::add_entity(
         const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params,
-        std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, RuntimeState* state) {
+        std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
+        RuntimeFilterParamsContext* state) {
     if (!local_params.__isset.runtime_filter_params ||
         local_params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
         return Status::OK();
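
Note the RuntimeFilterMgr constructor in the hunk above now back-links itself into the params context. A self-contained sketch of the ordering this implies (stand-in types; only the wiring mirrors the patch):

    #include <cassert>
    #include <memory>

    struct RuntimeFilterParamsContext;

    struct RuntimeFilterMgr {
        explicit RuntimeFilterMgr(RuntimeFilterParamsContext* state);
    };

    struct RuntimeFilterParamsContext {
        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
    };

    RuntimeFilterMgr::RuntimeFilterMgr(RuntimeFilterParamsContext* state) {
        state->runtime_filter_mgr = this; // ctor establishes the back-link
    }

    int main() {
        auto params = std::make_unique<RuntimeFilterParamsContext>(); // context first
        auto mgr = std::make_unique<RuntimeFilterMgr>(params.get());  // manager second
        assert(params->runtime_filter_mgr == mgr.get());
    }

(_build_pipeline_tasks earlier in this patch also assigns filterparams->runtime_filter_mgr explicitly after init(), so the link is established in both places.)
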
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 939ee2c8139..5f9ee46d656 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -50,6 +50,7 @@ class RuntimeState;
 enum class RuntimeFilterRole;
 class RuntimePredicateWrapper;
 class QueryContext;
+struct RuntimeFilterParamsContext;
 
 /// producer:
 /// Filter filter;
@@ -65,7 +66,7 @@ class QueryContext;
 // RuntimeFilterMgr will be destroyed when RuntimeState is destroyed
 class RuntimeFilterMgr {
 public:
-    RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state);
+    RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state);
 
     RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);
 
@@ -106,7 +107,7 @@ private:
     std::map<int32_t, std::vector<ConsumerFilterHolder>> _consumer_map;
     std::map<int32_t, IRuntimeFilter*> _producer_map;
 
-    RuntimeState* _state = nullptr;
+    RuntimeFilterParamsContext* _state = nullptr;
     QueryContext* _query_ctx = nullptr;
     std::unique_ptr<MemTracker> _tracker;
     ObjectPool _pool;
@@ -123,7 +124,7 @@ private:
 // the class is destroyed with the last fragment_exec.
 class RuntimeFilterMergeControllerEntity {
 public:
-    RuntimeFilterMergeControllerEntity(RuntimeState* state)
+    RuntimeFilterMergeControllerEntity(RuntimeFilterParamsContext* state)
             : _query_id(0, 0), _fragment_instance_id(0, 0), _state(state) {}
     ~RuntimeFilterMergeControllerEntity() = default;
 
@@ -172,7 +173,7 @@ private:
     using CntlValwithLock =
             std::pair<std::shared_ptr<RuntimeFilterCntlVal>, std::unique_ptr<std::mutex>>;
     std::map<int, CntlValwithLock> _filter_map;
-    RuntimeState* _state = nullptr;
+    RuntimeFilterParamsContext* _state = nullptr;
     bool _opt_remote_rf = true;
 };
 
@@ -188,11 +189,11 @@ public:
     // add_entity will return a exists entity
     Status add_entity(const TExecPlanFragmentParams& params,
                       std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
-                      RuntimeState* state);
+                      RuntimeFilterParamsContext* state);
     Status add_entity(const TPipelineFragmentParams& params,
                       const TPipelineInstanceParams& local_params,
                       std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
-                      RuntimeState* state);
+                      RuntimeFilterParamsContext* state);
     // thread safe
     // increase a reference count
     // if a query-id is not exist
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 43eff466019..9082a5a322d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -54,7 +54,6 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id,
         : _profile("Fragment " + print_id(fragment_instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
-          _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
           _is_cancelled(false),
@@ -71,6 +70,8 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id,
           _error_log_file(nullptr) {
     Status status = init(fragment_instance_id, query_options, query_globals, exec_env);
     DCHECK(status.ok());
+    _runtime_filter_mgr.reset(
+            new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this)));
 }
 
 RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
@@ -79,7 +80,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
         : _profile("Fragment " + print_id(fragment_exec_params.fragment_instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
-          _runtime_filter_mgr(new RuntimeFilterMgr(fragment_exec_params.query_id, this)),
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
           _query_id(fragment_exec_params.query_id),
@@ -94,12 +94,14 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
           _normal_row_number(0),
           _error_row_number(0),
           _error_log_file(nullptr) {
-    if (fragment_exec_params.__isset.runtime_filter_params) {
-        _runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
-    }
     Status status =
             init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env);
     DCHECK(status.ok());
+    _runtime_filter_mgr.reset(new RuntimeFilterMgr(fragment_exec_params.query_id,
+                                                   RuntimeFilterParamsContext::create(this)));
+    if (fragment_exec_params.__isset.runtime_filter_params) {
+        _runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
+    }
 }
 
 RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id,
@@ -108,7 +110,36 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_
         : _profile("Fragment " + print_id(instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
-          _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)),
+          _data_stream_recvrs_pool(new ObjectPool()),
+          _unreported_error_idx(0),
+          _query_id(query_id),
+          _fragment_id(fragment_id),
+          _is_cancelled(false),
+          _per_fragment_instance_idx(0),
+          _num_rows_load_total(0),
+          _num_rows_load_filtered(0),
+          _num_rows_load_unselected(0),
+          _num_rows_filtered_in_strict_mode_partial_update(0),
+          _num_print_error_rows(0),
+          _num_bytes_load_total(0),
+          _num_finished_scan_range(0),
+          _normal_row_number(0),
+          _error_row_number(0),
+          _error_log_file(nullptr) {
+    [[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env);
+    DCHECK(status.ok());
+    _runtime_filter_mgr.reset(
+            new RuntimeFilterMgr(query_id, RuntimeFilterParamsContext::create(this)));
+}
+
+RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& instance_id,
+                           const TUniqueId& query_id, int32_t fragment_id,
+                           const TQueryOptions& query_options, const TQueryGlobals& query_globals,
+                           ExecEnv* exec_env)
+        : _profile("Fragment " + print_id(instance_id)),
+          _load_channel_profile("<unnamed>"),
+          _obj_pool(new ObjectPool()),
+          _runtime_filter_mgr(nullptr),
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
           _query_id(query_id),
@@ -135,7 +166,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
         : _profile("PipelineX  " + std::to_string(fragment_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
-          _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)),
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
           _query_id(query_id),
@@ -155,6 +185,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
     // TODO: do we really need instance id?
     Status status = init(TUniqueId(), query_options, query_globals, exec_env);
     DCHECK(status.ok());
+    _runtime_filter_mgr.reset(
+            new RuntimeFilterMgr(query_id, RuntimeFilterParamsContext::create(this)));
 }
 
 RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
@@ -485,4 +517,21 @@ bool RuntimeState::enable_page_cache() const {
            (_query_options.__isset.enable_page_cache && _query_options.enable_page_cache);
 }
 
+RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) {
+    RuntimeFilterParamsContext* params = state->obj_pool()->add(new RuntimeFilterParamsContext());
+    params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely();
+    params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
+    params->enable_pipeline_exec = state->enable_pipeline_exec();
+    params->execution_timeout = state->execution_timeout();
+    params->runtime_filter_mgr = state->runtime_filter_mgr();
+    params->exec_env = state->exec_env();
+    params->query_id.set_hi(state->query_id().hi);
+    params->query_id.set_lo(state->query_id().lo);
+
+    params->fragment_instance_id.set_hi(state->fragment_instance_id().hi);
+    params->fragment_instance_id.set_lo(state->fragment_instance_id().lo);
+    params->be_exec_version = state->be_exec_version();
+    params->query_ctx = state->get_query_ctx();
+    return params;
+}
 } // end namespace doris
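
Aside: RuntimeFilterParamsContext::create above copies only the handful of fields the filter code actually reads out of RuntimeState into a plain struct, so pipelineX filters no longer hold a pointer to a full RuntimeState with a different lifetime. The same snapshot idea, sketched in Java with hypothetical names (the real code is the C++ above):

    // Illustrative analogue of the snapshot pattern: copy the few fields a
    // subsystem reads into an immutable value object at creation time,
    // instead of keeping a reference to the whole mutable state.
    final class FilterParams {
        final boolean waitInfinitely; // cf. runtime_filter_wait_infinitely
        final int waitTimeMs;         // cf. runtime_filter_wait_time_ms
        final int beExecVersion;      // cf. be_exec_version

        private FilterParams(boolean waitInfinitely, int waitTimeMs, int beExecVersion) {
            this.waitInfinitely = waitInfinitely;
            this.waitTimeMs = waitTimeMs;
            this.beExecVersion = beExecVersion;
        }

        // Mirrors RuntimeFilterParamsContext::create: read once, then never
        // touch the source object again.
        static FilterParams from(StateLike state) {
            return new FilterParams(state.waitInfinitely(), state.waitTimeMs(), state.beExecVersion());
        }

        interface StateLike {
            boolean waitInfinitely();
            int waitTimeMs();
            int beExecVersion();
        }
    }
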
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e37883abbe1..e064e6e7610 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -47,6 +47,7 @@ namespace doris {
 namespace pipeline {
 class PipelineXLocalStateBase;
 class PipelineXSinkLocalStateBase;
+class PipelineXFragmentContext;
 } // namespace pipeline
 
 class DescriptorTbl;
@@ -74,6 +75,11 @@ public:
                  const TQueryOptions& query_options, const TQueryGlobals& query_globals,
                  ExecEnv* exec_env);
 
+    // for use only in pipelineX
+    RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& instance_id,
+                 const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options,
+                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
+
     // Used by pipelineX. This runtime state is only used for setup.
     RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options,
                  const TQueryGlobals& query_globals, ExecEnv* exec_env);
@@ -437,7 +443,17 @@ public:
     // if load mem limit is not set, or is zero, using query mem limit instead.
     int64_t get_load_mem_limit();
 
-    RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
+    RuntimeFilterMgr* runtime_filter_mgr() {
+        if (_pipeline_x_runtime_filter_mgr) {
+            return _pipeline_x_runtime_filter_mgr;
+        } else {
+            return _runtime_filter_mgr.get();
+        }
+    }
+
+    void set_pipeline_x_runtime_filter_mgr(RuntimeFilterMgr* pipeline_x_runtime_filter_mgr) {
+        _pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr;
+    }
 
     void set_query_ctx(QueryContext* ctx) { _query_ctx = ctx; }
 
@@ -513,6 +529,8 @@ public:
 
     void resize_op_id_to_local_state(int operator_size, int sink_size);
 
+    auto& pipeline_id_to_profile() { return _pipeline_id_to_profile; }
+
 private:
     Status create_error_log_file();
 
@@ -531,6 +549,9 @@ private:
     // runtime filter
     std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
 
+    // owned by PipelineXFragmentContext
+    RuntimeFilterMgr* _pipeline_x_runtime_filter_mgr = nullptr;
+
     // Protects _data_stream_recvrs_pool
     std::mutex _data_stream_recvrs_lock;
 
@@ -623,10 +644,30 @@ private:
     // true if max_filter_ratio is 0
     bool _load_zero_tolerance = false;
 
+    std::vector<std::unique_ptr<RuntimeProfile>> _pipeline_id_to_profile;
+
     // prohibit copies
     RuntimeState(const RuntimeState&);
 };
 
+// a snapshot of the RuntimeState fields needed by runtime filters
+struct RuntimeFilterParamsContext {
+    RuntimeFilterParamsContext() = default;
+    static RuntimeFilterParamsContext* create(RuntimeState* state);
+
+    bool runtime_filter_wait_infinitely;
+    int32_t runtime_filter_wait_time_ms;
+    bool enable_pipeline_exec;
+    int32_t execution_timeout;
+    RuntimeFilterMgr* runtime_filter_mgr;
+    ExecEnv* exec_env;
+    PUniqueId query_id;
+    PUniqueId fragment_instance_id;
+    int be_exec_version;
+    QueryContext* query_ctx;
+    QueryContext* get_query_ctx() const { return query_ctx; }
+};
+
 #define RETURN_IF_CANCELLED(state)                                                    \
     do {                                                                               \
         if (UNLIKELY((state)->is_cancelled())) return Status::Cancelled("Cancelled"); \
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 987a2106894..24152c67089 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -593,10 +593,19 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
                                  has_cancel_reason
                                          ? PPlanFragmentCancelReason_Name(request->cancel_reason())
                                          : "INTERNAL_ERROR");
-
-        _exec_env->fragment_mgr()->cancel_instance(
-                tid, has_cancel_reason ? request->cancel_reason()
-                                       : PPlanFragmentCancelReason::INTERNAL_ERROR);
+        if (request->has_fragment_id()) {
+            TUniqueId query_id;
+            query_id.__set_hi(request->query_id().hi());
+            query_id.__set_lo(request->query_id().lo());
+            _exec_env->fragment_mgr()->cancel_fragment(
+                    query_id, request->fragment_id(),
+                    has_cancel_reason ? request->cancel_reason()
+                                      : PPlanFragmentCancelReason::INTERNAL_ERROR);
+        } else {
+            _exec_env->fragment_mgr()->cancel_instance(
+                    tid, has_cancel_reason ? request->cancel_reason()
+                                           : PPlanFragmentCancelReason::INTERNAL_ERROR);
+        }
 
         // TODO: the logic seems useless, cancel only returns Status::OK. remove it
         st.to_protobuf(result->mutable_status());
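
Aside: the new branch above prefers fragment-level cancellation when the request carries a fragment_id, and falls back to the old per-instance path otherwise. A rough sketch of such a dual-keyed lookup, with hypothetical names rather than FragmentMgr's real interface:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical registry that can cancel either by (queryId, fragmentId)
    // or by a single instance id, mirroring the two RPC shapes above.
    class CancelRegistry {
        interface Cancellable {
            void cancel(String reason);
        }

        private final Map<String, Cancellable> byInstance = new ConcurrentHashMap<>();
        private final Map<String, Cancellable> byQueryFragment = new ConcurrentHashMap<>();

        void cancelFragment(String queryId, int fragmentId, String reason) {
            Cancellable c = byQueryFragment.get(queryId + "#" + fragmentId);
            if (c != null) {
                c.cancel(reason);
            }
        }

        void cancelInstance(String instanceId, String reason) {
            Cancellable c = byInstance.get(instanceId);
            if (c != null) {
                c.cancel(reason);
            }
        }
    }
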
diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp
index b8cce3fbf7d..9739c3930ef 100644
--- a/be/test/exprs/runtime_filter_test.cpp
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -103,8 +103,9 @@ IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, TQueryOptio
     }
 
     IRuntimeFilter* runtime_filter = nullptr;
-    Status status = IRuntimeFilter::create(_runtime_stat, _obj_pool, &desc, options,
-                                           RuntimeFilterRole::PRODUCER, -1, &runtime_filter);
+    Status status = IRuntimeFilter::create(RuntimeFilterParamsContext::create(_runtime_stat),
+                                           _obj_pool, &desc, options, RuntimeFilterRole::PRODUCER,
+                                           -1, &runtime_filter);
 
     EXPECT_TRUE(status.ok()) << status.to_string();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 465bb977226..ecee299f104 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -63,9 +63,14 @@ public class ExecutionProfile {
     // Profile for load channels. Only for load job.
     private RuntimeProfile loadChannelProfile;
     // A countdown latch to mark the completion of each instance.
+    // used for the old pipeline engine
     // instance id -> dummy value
     private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
 
+    // A countdown latch to mark the completion of each fragment. Used for pipelineX.
+    // fragmentId -> dummy value
+    private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;
+
     public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
         executionProfile = new RuntimeProfile("Execution Profile " + 
DebugUtil.printId(queryId));
         RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
@@ -79,7 +84,35 @@ public class ExecutionProfile {
         executionProfile.addChild(loadChannelProfile);
     }
 
-    public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) {
+    private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) {
+        RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
+        for (int i = 0; i < fragmentProfiles.size(); ++i) {
+            RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
+            RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " 
+ i);
+            fragmentsProfile.addChild(newFragmentProfile);
+            List<RuntimeProfile> allPipelines = new 
ArrayList<RuntimeProfile>();
+            for (Pair<RuntimeProfile, Boolean> runtimeProfile : 
oldFragmentProfile.getChildList()) {
+                allPipelines.add(runtimeProfile.first);
+            }
+            int pipelineIdx = 0;
+            for (RuntimeProfile pipeline : allPipelines) {
+                List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>();
+                for (Pair<RuntimeProfile, Boolean> runtimeProfile : pipeline.getChildList()) {
+                    allPipelineTask.add(runtimeProfile.first);
+                }
+                RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
+                        "Pipeline : " + pipelineIdx + "(instance_num="
+                                + allPipelineTask.size() + ")",
+                        allPipelines.get(0).nodeId());
+                RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
+                newFragmentProfile.addChild(mergedpipelineProfile);
+                pipelineIdx++;
+            }
+        }
+        return fragmentsProfile;
+    }
+
+    private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) {
         RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
         for (int i = 0; i < fragmentProfiles.size(); ++i) {
             RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
@@ -97,6 +130,54 @@ public class ExecutionProfile {
         return fragmentsProfile;
     }
 
+    public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) {
+        if (enablePipelineX()) {
+            /*
+             * Fragment 0
+             * ---Pipeline 0
+             * ------pipelineTask 0
+             * ------pipelineTask 0
+             * ------pipelineTask 0
+             * ---Pipeline 1
+             * ------pipelineTask 1
+             * ---Pipeline 2
+             * ------pipelineTask 2
+             * ------pipelineTask 2
+             * Fragment 1
+             * ---Pipeline 0
+             * ------......
+             * ---Pipeline 1
+             * ------......
+             * ---Pipeline 2
+             * ------......
+             * ......
+             */
+            return getPipelineXAggregatedProfile(planNodeMap);
+        } else {
+            /*
+             * Fragment 0
+             * ---Instance 0
+             * ------pipelineTask 0
+             * ------pipelineTask 1
+             * ------pipelineTask 2
+             * ---Instance 1
+             * ------pipelineTask 0
+             * ------pipelineTask 1
+             * ------pipelineTask 2
+             * ---Instance 2
+             * ------pipelineTask 0
+             * ------pipelineTask 1
+             * ------pipelineTask 2
+             * Fragment 1
+             * ---Instance 0
+             * ---Instance 1
+             * ---Instance 2
+             * ......
+             */
+            return getNonPipelineXAggregatedProfile(planNodeMap);
+        }
+    }
+
     public RuntimeProfile getExecutionProfile() {
         return executionProfile;
     }
@@ -120,6 +201,17 @@ public class ExecutionProfile {
         }
     }
 
+    private boolean enablePipelineX() {
+        return profileFragmentDoneSignal != null;
+    }
+
+    public void markFragments(int fragments) {
+        profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
+        for (int fragmentId = 0; fragmentId < fragments; fragmentId++) {
+            profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is meaningless */);
+        }
+    }
+
     public void update(long startTime, boolean isFinished) {
         if (startTime > 0) {
             executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime));
@@ -133,6 +225,14 @@ public class ExecutionProfile {
             }
         }
 
+        if (isFinished && profileFragmentDoneSignal != null) {
+            try {
+                profileFragmentDoneSignal.await(2, TimeUnit.SECONDS);
+            } catch (InterruptedException e1) {
+                LOG.warn("signal await error", e1);
+            }
+        }
+
         for (RuntimeProfile fragmentProfile : fragmentProfiles) {
             fragmentProfile.sortChildren();
         }
@@ -143,6 +243,9 @@ public class ExecutionProfile {
             // count down to zero to notify all objects waiting for this
             profileDoneSignal.countDownToZero(new Status());
         }
+        if (profileFragmentDoneSignal != null) {
+            profileFragmentDoneSignal.countDownToZero(new Status());
+        }
     }
 
     public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
@@ -153,6 +256,14 @@ public class ExecutionProfile {
         }
     }
 
+    public void markOneFragmentDone(int fragmentId) {
+        if (profileFragmentDoneSignal != null) {
+            if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
+                LOG.warn("Mark fragment {} done failed", fragmentId);
+            }
+        }
+    }
+
     public boolean awaitAllInstancesDone(long waitTimeS) throws InterruptedException {
         if (profileDoneSignal == null) {
             return true;
@@ -160,6 +271,13 @@ public class ExecutionProfile {
         return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
     }
 
+    public boolean awaitAllFragmentsDone(long waitTimeS) throws InterruptedException {
+        if (profileFragmentDoneSignal == null) {
+            return true;
+        }
+        return profileFragmentDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
+    }
+
     public boolean isAllInstancesDone() {
         if (profileDoneSignal == null) {
             return true;
@@ -167,9 +285,16 @@ public class ExecutionProfile {
         return profileDoneSignal.getCount() == 0;
     }
 
-    public void addInstanceProfile(int instanceIdx, RuntimeProfile instanceProfile) {
-        Preconditions.checkArgument(instanceIdx < fragmentProfiles.size(),
-                instanceIdx + " vs. " + fragmentProfiles.size());
-        fragmentProfiles.get(instanceIdx).addChild(instanceProfile);
+    public boolean isAllFragmentsDone() {
+        if (profileFragmentDoneSignal == null) {
+            return true;
+        }
+        return profileFragmentDoneSignal.getCount() == 0;
+    }
+
+    public void addInstanceProfile(int fragmentId, RuntimeProfile instanceProfile) {
+        Preconditions.checkArgument(fragmentId < fragmentProfiles.size(),
+                fragmentId + " vs. " + fragmentProfiles.size());
+        fragmentProfiles.get(fragmentId).addChild(instanceProfile);
     }
 }
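
Aside: markFragments mirrors the existing markInstances but keys the latch by fragment id, so a query waits on one count-down per fragment instead of one per instance. A toy stand-in for such a marked latch (my own sketch, not Doris's MarkedCountDownLatch):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Each key may be counted down at most once, so duplicate reports for
    // the same fragment are ignored.
    class SimpleMarkedLatch<K> {
        private final Map<K, Boolean> marks = new ConcurrentHashMap<>();
        private final CountDownLatch latch;

        SimpleMarkedLatch(int count) {
            this.latch = new CountDownLatch(count);
        }

        void addMark(K key) {
            marks.put(key, Boolean.FALSE);
        }

        // Returns true only for the first count-down of a given key.
        boolean markedCountDown(K key) {
            if (marks.replace(key, Boolean.FALSE, Boolean.TRUE)) {
                latch.countDown();
                return true;
            }
            return false;
        }

        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }

        long getCount() {
            return latch.getCount();
        }
    }

Because a key can be counted down only once, a second report for the same fragment becomes a harmless no-op.
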
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5188412bd3a..ba1499fae11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -148,6 +148,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class Coordinator implements CoordInterface {
     private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@@ -677,7 +678,12 @@ public class Coordinator implements CoordInterface {
             Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum);
             LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
         }
-        executionProfile.markInstances(instanceIds);
+        if (enablePipelineXEngine) {
+            executionProfile.markFragments(fragments.size());
+        } else {
+            executionProfile.markInstances(instanceIds);
+        }
+
         if (enablePipelineEngine) {
             sendPipelineCtx();
         } else {
@@ -894,7 +900,8 @@ public class Coordinator implements CoordInterface {
                     Long backendId = this.addressToBackendID.get(entry.getKey());
                     PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
                             profileFragmentId, entry.getValue(), backendId, fragmentInstancesMap,
-                            executionProfile.getLoadChannelProfile());
+                            executionProfile.getLoadChannelProfile(), this.enablePipelineXEngine,
+                            this.executionProfile);
                     // Each tParam will set the total number of Fragments that need to be executed on the same BE,
                     // and the BE will determine whether all Fragments have been executed based on this information.
                     // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
@@ -2459,7 +2466,7 @@ public class Coordinator implements CoordInterface {
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (enablePipelineXEngine) {
             PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
-            if (!ctx.updateProfile(params, true)) {
+            if (!ctx.updateProfile(params)) {
                 return;
             }
 
@@ -2503,16 +2510,14 @@ public class Coordinator implements CoordInterface {
             }
 
             Preconditions.checkArgument(params.isSetDetailedReport());
-            for (TDetailedReportParams param : params.detailed_report) {
-                if (ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
-                    LOG.debug("Query {} instance {} is marked done",
-                            DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
-                    executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
-                }
+            if (ctx.done) {
+                LOG.debug("Query {} fragment {} is marked done",
+                        DebugUtil.printId(queryId), ctx.profileFragmentId);
+                executionProfile.markOneFragmentDone(ctx.profileFragmentId);
             }
         } else if (enablePipelineEngine) {
             PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
-            if (!ctx.updateProfile(params, false)) {
+            if (!ctx.updateProfile(params)) {
                 return;
             }
 
@@ -2657,7 +2662,11 @@ public class Coordinator implements CoordInterface {
             long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
             boolean awaitRes = false;
             try {
-                awaitRes = executionProfile.awaitAllInstancesDone(waitTime);
+                if (enablePipelineXEngine) {
+                    awaitRes = executionProfile.awaitAllFragmentsDone(waitTime);
+                } else {
+                    awaitRes = executionProfile.awaitAllInstancesDone(waitTime);
+                }
             } catch (InterruptedException e) {
                 // Do nothing
             }
@@ -2700,7 +2709,11 @@ public class Coordinator implements CoordInterface {
     }
 
     public boolean isDone() {
-        return executionProfile.isAllInstancesDone();
+        if (enablePipelineXEngine) {
+            return executionProfile.isAllFragmentsDone();
+        } else {
+            return executionProfile.isAllInstancesDone();
+        }
     }
 
     // map from a BE host address to the per-node assigned scan ranges;
@@ -3092,9 +3105,13 @@ public class Coordinator implements CoordInterface {
         boolean initiated;
         volatile boolean done;
         boolean hasCanceled;
+        // use for pipeline
         Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
+        // use for pipelineX
+        List<RuntimeProfile> taskProfile;
+
+        boolean enablePipelineX;
         RuntimeProfile loadChannelProfile;
-        int cancelProgress = 0;
         int profileFragmentId;
         TNetworkAddress brpcAddress;
         TNetworkAddress address;
@@ -3103,16 +3120,18 @@ public class Coordinator implements CoordInterface {
         long profileReportProgress = 0;
         long beProcessEpoch = 0;
         private final int numInstances;
+        final ExecutionProfile executionProfile;
 
         public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
                 TPipelineFragmentParams rpcParams, Long backendId,
                 Map<TUniqueId, RuntimeProfile> fragmentInstancesMap,
-                RuntimeProfile loadChannelProfile) {
+                RuntimeProfile loadChannelProfile, boolean enablePipelineX, final ExecutionProfile executionProfile) {
             this.profileFragmentId = profileFragmentId;
             this.fragmentId = fragmentId;
             this.rpcParams = rpcParams;
             this.numInstances = rpcParams.local_params.size();
             this.fragmentInstancesMap = fragmentInstancesMap;
+            this.taskProfile = new ArrayList<RuntimeProfile>();
 
             this.initiated = false;
@@ -3125,12 +3144,27 @@ public class Coordinator implements CoordInterface {
 
             this.hasCanceled = false;
             this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
+            this.enablePipelineX = enablePipelineX;
+            this.executionProfile = executionProfile;
+        }
+
+        public Stream<RuntimeProfile> profileStream() {
+            if (enablePipelineX) {
+                return taskProfile.stream();
+            }
+            return fragmentInstancesMap.values().stream();
+        }
+
+        private void attachInstanceProfileToFragmentProfile() {
+            profileStream()
+                    .forEach(p -> executionProfile.addInstanceProfile(this.profileFragmentId, p));
         }
 
         /**
          * Some information common to all Fragments does not need to be sent repeatedly.
          * Therefore, when we confirm that a certain BE has accepted the information,
-         * we will delete the information in the subsequent Fragment to avoid repeated sending.
+         * we will delete the information in the subsequent Fragment to avoid repeated
+         * sending.
          * This information can be obtained from the cache of BE.
          */
         public void unsetFields() {
@@ -3144,29 +3178,31 @@ public class Coordinator implements CoordInterface {
 
         // update profile.
         // return true if profile is updated. Otherwise, return false.
-        public synchronized boolean updateProfile(TReportExecStatusParams params, boolean isPipelineX) {
-            if (isPipelineX) {
+        public synchronized boolean updateProfile(TReportExecStatusParams params) {
+            if (enablePipelineX) {
+                taskProfile.clear();
+                int pipelineIdx = 0;
                 for (TDetailedReportParams param : params.detailed_report) {
-                    RuntimeProfile profile = fragmentInstancesMap.get(param.fragment_instance_id);
-                    if (params.done && profile.getIsDone()) {
-                        continue;
-                    }
-
+                    String name = "Pipeline :" + pipelineIdx + " "
+                            + " (host=" + address + ")";
+                    RuntimeProfile profile = new RuntimeProfile(name);
+                    taskProfile.add(profile);
                     if (param.isSetProfile()) {
                         profile.update(param.profile);
                     }
-                    if (params.isSetLoadChannelProfile()) {
-                        loadChannelProfile.update(params.loadChannelProfile);
-                    }
                     if (params.done) {
                         profile.setIsDone(true);
-                        profileReportProgress++;
                     }
+                    pipelineIdx++;
                 }
-                if (profileReportProgress == numInstances) {
-                    this.done = true;
+                if (params.isSetLoadChannelProfile()) {
+                    loadChannelProfile.update(params.loadChannelProfile);
                 }
-                return true;
+                this.done = params.done;
+                if (this.done) {
+                    attachInstanceProfileToFragmentProfile();
+                }
+                return this.done;
             } else {
                 RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id);
                 if (params.done && profile.getIsDone()) {
@@ -3192,7 +3228,7 @@ public class Coordinator implements CoordInterface {
         }
 
         public synchronized void printProfile(StringBuilder builder) {
-            this.fragmentInstancesMap.values().stream().forEach(p -> {
+            this.profileStream().forEach(p -> {
                 p.computeTimeInProfile();
                 p.prettyPrint(builder, "");
             });
@@ -3200,23 +3236,41 @@ public class Coordinator implements CoordInterface {
 
         // cancel all fragment instances.
         // return true if cancel success. Otherwise, return false
-        public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
-            if (!this.initiated) {
-                LOG.warn("Query {}, ccancel before initiated", DebugUtil.printId(queryId));
+
+        private synchronized boolean cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
+            if (!this.hasCanceled) {
                 return false;
             }
-            // don't cancel if it is already finished
-            if (this.done) {
-                LOG.warn("Query {}, cancel after finished", 
DebugUtil.printId(queryId));
-                return false;
+            for (RuntimeProfile profile : taskProfile) {
+                profile.setIsCancel(true);
             }
-            if (this.hasCanceled) {
-                LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {},"
+                        + " fragment id={} query={}, reason: {}",
+                        this.initiated, this.done, this.hasCanceled, backend.getId(),
+                        this.profileFragmentId,
+                        DebugUtil.printId(queryId), cancelReason.name());
+            }
+            try {
+                try {
+                    BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
+                            this.profileFragmentId, queryId, cancelReason);
+                } catch (RpcException e) {
+                    LOG.warn("cancel plan fragment got an exception, address={}:{}", brpcAddress.getHostname(),
+                            brpcAddress.getPort());
+                    SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage());
+                }
+            } catch (Exception e) {
+                LOG.warn("caught an exception", e);
                 return false;
             }
+            return true;
+        }
+
+        private synchronized boolean cancelInstance(Types.PPlanFragmentCancelReason cancelReason) {
             for (TPipelineInstanceParams localParam : rpcParams.local_params) {
                 LOG.warn("cancelRemoteFragments initiated={} done={} hasCanceled={} backend:{},"
-                                + " fragment instance id={} query={}, reason: {}",
+                        + " fragment instance id={} query={}, reason: {}",
                         this.initiated, this.done, this.hasCanceled, backend.getId(),
                         DebugUtil.printId(localParam.fragment_instance_id),
                         DebugUtil.printId(queryId), cancelReason.name());
@@ -3244,14 +3298,35 @@ public class Coordinator implements CoordInterface {
             if (!this.hasCanceled) {
                 return false;
             }
-
             for (int i = 0; i < this.numInstances; i++) {
                 fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true);
             }
-            cancelProgress = numInstances;
             return true;
         }
 
+        /// TODO: refactor rpcParams
+        public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
+            if (!this.initiated) {
+                LOG.warn("Query {}, cancel before initiated", DebugUtil.printId(queryId));
+                return false;
+            }
+            // don't cancel if it is already finished
+            if (this.done) {
+                LOG.warn("Query {}, cancel after finished", DebugUtil.printId(queryId));
+                return false;
+            }
+            if (this.hasCanceled) {
+                LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId));
+                return false;
+            }
+
+            if (this.enablePipelineX) {
+                return cancelFragment(cancelReason);
+            } else {
+                return cancelInstance(cancelReason);
+            }
+        }
+
         public synchronized boolean computeTimeInProfile(int maxFragmentId) {
             if (this.profileFragmentId < 0 || this.profileFragmentId > maxFragmentId) {
                 LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId);
@@ -3843,7 +3918,7 @@ public class Coordinator implements CoordInterface {
     private void attachInstanceProfileToFragmentProfile() {
         if (enablePipelineEngine) {
             for (PipelineExecContext ctx : pipelineExecContexts.values()) {
-                ctx.fragmentInstancesMap.values().stream()
+                ctx.profileStream()
                         .forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p));
             }
         } else {
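
Aside: in pipelineX mode a single report covers every pipeline task of a fragment on one backend, so updateProfile above rebuilds taskProfile from scratch and takes done straight from the report, instead of counting finished instances. A trimmed-down model of that replace-on-report handling (types and names hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    // Each report is a full snapshot for one (fragment, backend) pair, so the
    // stored profiles are replaced rather than merged, and "done" is taken
    // straight from the report.
    class FragmentReportState {
        private final List<String> pipelineProfiles = new ArrayList<>();
        private boolean done = false;

        synchronized boolean onReport(List<String> snapshot, boolean reportDone) {
            pipelineProfiles.clear();
            pipelineProfiles.addAll(snapshot);
            done = reportDone;
            return done; // caller counts the fragment down only once it is done
        }
    }
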
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 02245c83ced..52350d805bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -240,6 +240,24 @@ public class BackendServiceProxy {
         }
     }
 
+    public Future<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
+            int fragmentId, TUniqueId queryId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
+        final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
+                .newBuilder()
+                .setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build())
+                .setCancelReason(cancelReason)
+                .setFragmentId(fragmentId)
+                .setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build();
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.cancelPlanFragmentAsync(pRequest);
+        } catch (Throwable e) {
+            LOG.warn("Cancel plan fragment catch a exception, address={}:{}", 
address.getHostname(), address.getPort(),
+                    e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
     public Future<InternalService.PFetchDataResult> fetchDataAsync(
             TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
         try {
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index ec3714d618a..46e2e194f06 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -229,6 +229,8 @@ message PExecPlanFragmentResult {
 message PCancelPlanFragmentRequest {
     required PUniqueId finst_id = 1;
     optional PPlanFragmentCancelReason cancel_reason = 2;
+    optional PUniqueId query_id = 3;
+    optional int32 fragment_id = 4;
 };
 
 message PCancelPlanFragmentResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

