This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b9dacc700a4 [refactor](recursive-cte) Replace in-place PFC reset with 
full recreation between recursion rounds (#61130)
b9dacc700a4 is described below

commit b9dacc700a47149fe4cfa7dc4708224ed2ccc7fa
Author: Pxl <[email protected]>
AuthorDate: Fri Mar 27 11:08:14 2026 +0800

    [refactor](recursive-cte) Replace in-place PFC reset with full recreation 
between recursion rounds (#61130)
    
    This pull request introduces significant improvements to the recursive
    Common Table Expression (CTE) pipeline execution and fragment lifecycle
    management, focusing on safe and deadlock-free fragment shutdown,
    runtime filter tracking, and clearer orchestration of recursive fragment
    reruns. The changes add detailed documentation, refactor fragment
    shutdown logic to prevent deadlocks, and enhance runtime filter
    management for recursive CTEs.
    
    **Recursive CTE fragment rerun lifecycle and runtime filter
    management:**
    
    * Added a comprehensive comment and state transition diagram to
    `RecCTESourceOperatorX`, clarifying the orchestration of recursive CTE
    fragment reruns, including the four major rerun opcodes and their role
    in fragment destruction, recreation, and cleanup.
    * Updated rerun fragment opcode naming for clarity (e.g., `wait` →
    `WAIT_FOR_DESTROY`, `close` → `FINAL_CLOSE`) and ensured correct usage
    throughout the recursive process logic.
    
    **Deadlock prevention and safe fragment shutdown:**
    
    * Refactored `PipelineFragmentContext` shutdown logic to avoid ABBA
    deadlocks: moved calls to `remove_pipeline_context()` outside of
    `_task_mutex` locks, and updated `_close_fragment_instance()` to return
    a flag indicating whether removal is needed. This pattern is now consistently
    used in `submit()`, `decrement_running_task()`, and `cancel()`.
    
[[1]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548R170-R198)
    
[[2]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L179-L185)
    
[[3]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548R1796-R1805)
    
[[4]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L1805-R1843)
    
[[5]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L1853-R1886)
    
[[6]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548R1899-R1909)
    * Removed unnecessary runtime filter waiting logic from task
    preparation, simplifying task state management.
    
    **Runtime filter tracking for recursive CTEs:**
    
    * Added a method to collect all runtime filter IDs registered by a
    fragment's tasks (`get_deregister_runtime_filter()`), facilitating
    correct deregistration and preventing filter leaks during recursive
    reruns. This is now used in the partitioned hash join sink operator and
    recursive CTE orchestration.
    
[[1]](diffhunk://#diff-71e403ca9f968c4129c7073f832b0467d8c3543acb3fc87a1f15dd97aaf8199eR481-R484)
    
[[2]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L2075-R2134)
    * Ensured that runtime filter IDs from inner (spill) runtime states are
    merged into the parent state for proper deregistration during recursive
    reruns.
    
    **Other improvements:**
    
    * Improved debug output for `PipelineFragmentContext` by including
    fragment ID and recursion stage in the debug string.
    * Adjusted `send_report()` logic to avoid unnecessary retries and to
    safely close fragments when the query finishes successfully.
    
[[1]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L1911-R1948)
    
[[2]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548R1958-R1961)
    * Cleaned up unused or redundant code, such as the `wait_close`,
    `set_to_rerun`, and `rebuild` methods.
    
    These changes collectively make recursive CTE execution more robust,
    more maintainable, and easier to reason about, especially in complex
    distributed query execution scenarios.
---
 .../partitioned_hash_join_sink_operator.cpp        |   4 +
 be/src/exec/operator/rec_cte_source_operator.h     | 122 +++++++++++++-
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 170 +++++++++++---------
 be/src/exec/pipeline/pipeline_fragment_context.h   |  50 +++++-
 be/src/exec/runtime_filter/runtime_filter.cpp      |   6 +-
 be/src/exec/runtime_filter/runtime_filter.h        |   7 +
 be/src/exec/runtime_filter/runtime_filter_mgr.cpp  |  57 ++++++-
 be/src/exec/runtime_filter/runtime_filter_mgr.h    |  21 ++-
 .../runtime_filter/runtime_filter_producer.cpp     |  10 ++
 be/src/exec/scan/scanner_context.cpp               |   6 -
 be/src/runtime/fragment_mgr.cpp                    | 177 +++++++++++++++++----
 be/src/runtime/fragment_mgr.h                      |  28 +++-
 be/src/runtime/query_context.cpp                   |   8 +-
 be/src/runtime/query_context.h                     |   2 +-
 be/src/runtime/runtime_state.cpp                   |  49 ++++--
 be/src/runtime/runtime_state.h                     |  12 +-
 be/src/runtime/task_execution_context.cpp          |  14 +-
 be/src/runtime/task_execution_context.h            |  22 +--
 be/src/service/internal_service.cpp                |  13 +-
 .../runtime_filter/runtime_filter_mgr_test.cpp     |  10 +-
 gensrc/proto/internal_service.proto                |  13 +-
 .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy |  18 +++
 22 files changed, 609 insertions(+), 210 deletions(-)

diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp 
b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
index c75db33799b..dacceabb59c 100644
--- a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
@@ -478,6 +478,10 @@ Status 
PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState*
     /// Set these two values after all the work is ready.
     _shared_state->_inner_shared_state = std::move(inner_shared_state);
     _shared_state->_inner_runtime_state = std::move(inner_runtime_state);
+    // The inner (spill) runtime state registers its own runtime filters. 
Merge those IDs
+    // into the parent state so they are tracked for deregistration during 
recursive CTE rerun.
+    state->merge_register_runtime_filter(
+            
_shared_state->_inner_runtime_state->get_deregister_runtime_filter());
     return Status::OK();
 }
 
diff --git a/be/src/exec/operator/rec_cte_source_operator.h 
b/be/src/exec/operator/rec_cte_source_operator.h
index 84a1f81f6b2..ba01829d80c 100644
--- a/be/src/exec/operator/rec_cte_source_operator.h
+++ b/be/src/exec/operator/rec_cte_source_operator.h
@@ -58,6 +58,119 @@ private:
     std::shared_ptr<Dependency> _anchor_dependency = nullptr;
 };
 
+// RecCTESourceOperatorX drives the recursive CTE fragment rerun lifecycle.
+//
+// It orchestrates child fragment destruction and recreation for each 
recursion round
+// via the rerun_fragment RPC (defined in internal_service.proto). The 4 
opcodes are:
+//
+//   wait_for_destroy (=1) : notify old PFC to close, async wait for tasks to 
finish
+//   rebuild          (=2) : increment stage, deregister old runtime filters, 
create new PFC
+//   submit           (=3) : submit new PFC pipeline tasks for execution
+//   final_close      (=4) : last round cleanup, send final report, destroy 
fragment
+//
+// State Transition Diagram
+// ========================
+//
+//   ┌────────────────────────────────────┐
+//   │          Query Start (FE)          │
+//   │  FE sets need_notify_close = true  │
+//   │  on recursive-side child fragments │
+//   └──────────────┬─────────────────────┘
+//                  │
+//                  ▼
+//   ┌────────────────────────────────────┐
+//   │      Initial Registration (BE)     │
+//   │  FragmentMgr saves params in       │
+//   │  _rerunnable_params_map            │
+//   │  PFC created, prepared & submitted │
+//   └──────────────┬─────────────────────┘
+//                  │
+//                  ▼
+//   ┌────────────────────────────────────┐
+//   │        Fragment Running            │
+//   │  PFC executing pipeline tasks      │◄─────────────────────────────┐
+//   │  _need_notify_close = true         │                              │
+//   └──────────────┬─────────────────────┘                              │
+//                  │                                                    │
+//                  │ tasks complete, but PFC does NOT                   │
+//                  │ self-remove (blocked by _need_notify_close)        │
+//                  ▼                                                    │
+//   ┌────────────────────────────────────┐                              │
+//   │  RecCTESourceOperatorX::get_block()│                              │
+//   │  ready_to_return? round < max?     │                              │
+//   └───────┬─────────────────┬──────────┘                              │
+//           │                 │                                         │
+//   more rounds remain   no more rounds                                │
+//           │                 │                                         │
+//           ▼                 │                                         │
+//   ┌──────────────────┐     │                                         │
+//   │_recursive_process│     │                                         │
+//   └───────┬──────────┘     │                                         │
+//           │                │                                         │
+//           ▼                │                                         │
+//   ┌────────────────────────────────────┐                              │
+//   │ Step 1: wait_for_destroy (=1)      │                              │
+//   │  • collect deregister RF IDs       │                              │
+//   │  • store brpc closure guard        │                              │
+//   │  • notify_close() on old PFC       │                              │
+//   │    → _need_notify_close = false    │                              │
+//   │    → old PFC begins shutdown       │                              │
+//   └──────────────┬─────────────────────┘                              │
+//                  │                                                    │
+//                  ▼                                                    │
+//   ┌────────────────────────────────────┐                              │
+//   │ Step 1.5: reset_global_rf          │                              │
+//   │  • reset global runtime filters    │                              │
+//   │    on merge coordinator            │                              │
+//   └──────────────┬─────────────────────┘                              │
+//                  │                                                    │
+//                  ▼                                                    │
+//   ┌────────────────────────────────────┐                              │
+//   │ Step 2: rebuild (=2)               │                              │
+//   │  • increment recursion stage       │                              │
+//   │  • deregister old runtime filters  │                              │
+//   │  • create NEW PFC from saved params│                              │
+//   │  • prepare() new PFC              │                              │
+//   │  • insert into _pipeline_map       │                              │
+//   └──────────────┬─────────────────────┘                              │
+//                  │                                                    │
+//                  ▼                                                    │
+//   ┌────────────────────────────────────┐                              │
+//   │ Step 3: submit (=3)                │                              │
+//   │  • find new PFC in pipeline_map    │                              │
+//   │  • call fragment_ctx->submit()     │                              │
+//   │  • pipeline tasks start running    │                              │
+//   └──────────────┬─────────────────────┘                              │
+//                  │                                                    │
+//                  ▼                                                    │
+//   ┌────────────────────────────────────┐                              │
+//   │ Step 4: send_data_to_targets       │                              │
+//   │  • transmit_rec_cte_block() to     │                              │
+//   │    RecursiveCteScanNode targets    │                              │
+//   │  • new recursion round begins      │                              │
+//   └──────────────┬─────────────────────┘                              │
+//                  │                                                    │
+//                  └──── loop back to Fragment Running ─────────────────┘
+//
+//           (when no more rounds remain)
+//                  │
+//                  ▼
+//   ┌────────────────────────────────────┐
+//   │ final_close (=4)                   │
+//   │  • listen_wait_close(guard,        │
+//   │    need_send_report = true)        │
+//   │  • notify_close() on PFC           │
+//   │  • send final status report        │
+//   │  • clean up completely             │
+//   └──────────────┬─────────────────────┘
+//                  │
+//                  ▼
+//   ┌────────────────────────────────────┐
+//   │       Fragment Destroyed           │
+//   │  rerunnable_params removed on      │
+//   │  query end                         │
+//   └────────────────────────────────────┘
+//
 class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
 public:
     using Base = OperatorX<RecCTESourceLocalState>;
@@ -135,17 +248,16 @@ private:
                                               "RecursiveRound", TUnit::UNIT);
             
round_counter->set(int64_t(get_local_state(state)._shared_state->current_round));
 
-            RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::close));
+            RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::FINAL_CLOSE));
         }
         return Status::OK();
     }
 
     Status _recursive_process(RuntimeState* state, size_t last_round_offset) 
const {
-        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::wait));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::WAIT_FOR_DESTROY));
         RETURN_IF_ERROR(_send_reset_global_rf(state));
-        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::release));
-        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::rebuild));
-        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::submit));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::REBUILD));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::SUBMIT));
         
RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(
                 state, last_round_offset));
         return Status::OK();
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index ebb56df51d0..64f1bbf8c30 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -169,6 +169,35 @@ bool PipelineFragmentContext::is_timeout(timespec now) 
const {
     return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
 }
 
+// notify_close() transitions the PFC from "waiting for external close 
notification" to
+// "self-managed close". For recursive CTE fragments, the old PFC is kept 
alive until
+// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
+// Returns true if all tasks have already closed (i.e., the PFC can be safely 
destroyed).
+bool PipelineFragmentContext::notify_close() {
+    bool all_closed = false;
+    bool need_remove = false;
+    {
+        std::lock_guard<std::mutex> l(_task_mutex);
+        if (_closed_tasks >= _total_tasks) {
+            if (_need_notify_close) {
+                // Fragment was cancelled and waiting for notify to close.
+                // Record that we need to remove from fragment mgr, but do it
+                // after releasing _task_mutex to avoid ABBA deadlock with
+                // dump_pipeline_tasks() (which acquires _pipeline_map lock
+                // first, then _task_mutex via debug_string()).
+                need_remove = true;
+            }
+            all_closed = true;
+        }
+        // make fragment release by self after cancel
+        _need_notify_close = false;
+    }
+    if (need_remove) {
+        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+    }
+    return all_closed;
+}
+
 // Must not add lock in this method. Because it will call query ctx cancel. And
 // QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's 
running
 // Method like exchange sink buffer will call query ctx cancel. If we add lock 
here
@@ -178,12 +207,8 @@ void PipelineFragmentContext::cancel(const Status reason) {
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
             .tag("reason", reason.to_string());
-    {
-        std::lock_guard<std::mutex> l(_task_mutex);
-        if (_closed_tasks >= _total_tasks) {
-            // All tasks in this PipelineXFragmentContext already closed.
-            return;
-        }
+    if (notify_close()) {
+        return;
     }
     // Timeout is a special error code, we need print current stack to debug 
timeout issue.
     if (reason.is<ErrorCode::TIMEOUT>()) {
@@ -419,10 +444,6 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks_for_instance(
 
                 
task_runtime_state->set_task_execution_context(shared_from_this());
                 task_runtime_state->set_be_number(local_params.backend_num);
-                if (_need_notify_close) {
-                    // rec cte require child rf to wait infinitely to make 
sure all rpc done
-                    task_runtime_state->set_force_make_rf_wait_infinite();
-                }
 
                 if (_params.__isset.backend_id) {
                     task_runtime_state->set_backend_id(_params.backend_id);
@@ -1795,9 +1816,16 @@ Status PipelineFragmentContext::submit() {
         }
     }
     if (!st.ok()) {
-        std::lock_guard<std::mutex> l(_task_mutex);
-        if (_closed_tasks >= _total_tasks) {
-            _close_fragment_instance();
+        bool need_remove = false;
+        {
+            std::lock_guard<std::mutex> l(_task_mutex);
+            if (_closed_tasks >= _total_tasks) {
+                need_remove = _close_fragment_instance();
+            }
+        }
+        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA 
deadlock.
+        if (need_remove) {
+            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
         }
         return Status::InternalError("Submit pipeline failed. err = {}, BE: 
{}", st.to_string(),
                                      BackendOptions::get_localhost());
@@ -1825,14 +1853,17 @@ void PipelineFragmentContext::print_profile(const 
std::string& extra_info) {
 }
 // If all pipeline tasks binded to the fragment instance are finished, then we 
could
 // close the fragment instance.
-void PipelineFragmentContext::_close_fragment_instance() {
+// Returns true if the caller should call remove_pipeline_context() **after** 
releasing
+// _task_mutex. We must not call remove_pipeline_context() here because it 
acquires
+// _pipeline_map's shard lock, and this function is called while _task_mutex 
is held.
+// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock 
with
+// dump_pipeline_tasks(), which acquires _pipeline_map first and then 
_task_mutex
+// (via debug_string()).
+bool PipelineFragmentContext::_close_fragment_instance() {
     if (_is_fragment_instance_closed) {
-        return;
+        return false;
     }
-    Defer defer_op {[&]() {
-        _is_fragment_instance_closed = true;
-        _notify_cv.notify_all();
-    }};
+    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
     
_fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     if (!_need_notify_close) {
         auto st = send_report(true);
@@ -1873,10 +1904,9 @@ void PipelineFragmentContext::_close_fragment_instance() 
{
                                          
collect_realtime_load_channel_profile());
     }
 
-    if (!_need_notify_close) {
-        // all submitted tasks done
-        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
-    }
+    // Return whether the caller needs to remove from the pipeline map.
+    // The caller must do this after releasing _task_mutex.
+    return !_need_notify_close;
 }
 
 void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
@@ -1889,10 +1919,17 @@ void 
PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
             }
         }
     }
-    std::lock_guard<std::mutex> l(_task_mutex);
-    ++_closed_tasks;
-    if (_closed_tasks >= _total_tasks) {
-        _close_fragment_instance();
+    bool need_remove = false;
+    {
+        std::lock_guard<std::mutex> l(_task_mutex);
+        ++_closed_tasks;
+        if (_closed_tasks >= _total_tasks) {
+            need_remove = _close_fragment_instance();
+        }
+    }
+    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA 
deadlock.
+    if (need_remove) {
+        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
     }
 }
 
@@ -1931,7 +1968,7 @@ Status PipelineFragmentContext::send_report(bool done) {
     // Load will set _is_report_success to true because load wants to know
     // the process.
     if (!_is_report_success && done && exec_status.ok()) {
-        return Status::NeedSendAgain("");
+        return Status::OK();
     }
 
     // If both _is_report_success and _is_report_on_cancel are false,
@@ -1941,6 +1978,10 @@ Status PipelineFragmentContext::send_report(bool done) {
     // When limit is reached the fragment is also cancelled, but 
_is_report_on_cancel will
     // be set to false, to avoid sending fault report to FE.
     if (!_is_report_success && !_is_report_on_cancel) {
+        if (done) {
+            // if done is true, which means the query is finished 
successfully, we can safely close the fragment instance without sending report 
to FE, and just return OK status here.
+            return Status::OK();
+        }
         return Status::NeedSendAgain("");
     }
 
@@ -2018,9 +2059,8 @@ std::string PipelineFragmentContext::debug_string() {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "PipelineFragmentContext Info: _closed_tasks={}, 
_total_tasks={}, "
-                   "need_notify_close={}, 
has_task_execution_ctx_ref_count={}\n",
-                   _closed_tasks, _total_tasks, _need_notify_close,
-                   _has_task_execution_ctx_ref_count);
+                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
+                   _closed_tasks, _total_tasks, _need_notify_close, 
_fragment_id, _rec_cte_stage);
     for (size_t j = 0; j < _tasks.size(); j++) {
         fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
         for (size_t i = 0; i < _tasks[j].size(); i++) {
@@ -2095,54 +2135,26 @@ 
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
     return load_channel_profile;
 }
 
-Status PipelineFragmentContext::wait_close(bool close) {
-    if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) {
-        return Status::InternalError("stream load do not support reset");
-    }
-    if (!_need_notify_close) {
-        return Status::InternalError("_need_notify_close is false, do not 
support reset");
-    }
-
-    {
-        std::unique_lock<std::mutex> lock(_task_mutex);
-        while (!(_is_fragment_instance_closed.load() && 
!_has_task_execution_ctx_ref_count)) {
-            if (_query_ctx->is_cancelled()) {
-                return Status::Cancelled("Query has been cancelled");
-            }
-            _notify_cv.wait_for(lock, std::chrono::seconds(1));
-        }
-    }
-
-    if (close) {
-        auto st = send_report(true);
-        if (!st) {
-            LOG(WARNING) << fmt::format("Failed to send report for query {}, 
fragment {}: {}",
-                                        print_id(_query_id), _fragment_id, 
st.to_string());
-        }
-        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
-    }
-    return Status::OK();
-}
-
-Status PipelineFragmentContext::set_to_rerun() {
-    {
-        std::lock_guard<std::mutex> l(_task_mutex);
-        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
-        for (auto& tasks : _tasks) {
-            for (const auto& task : tasks) {
-                task.first->runtime_state()->reset_to_rerun();
-            }
-        }
-    }
-    _release_resource();
-    _runtime_state->reset_to_rerun();
-    return Status::OK();
-}
-
-Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
-    _submitted = false;
-    _is_fragment_instance_closed = false;
-    return _build_and_prepare_full_pipeline(thread_pool);
+// Collect runtime filter IDs registered by all tasks in this PFC.
+// Used during recursive CTE stage transitions to know which filters to 
deregister
+// before creating the new PFC for the next recursion round.
+// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
+// Thread safety: safe because _tasks is structurally immutable after 
prepare() —
+// the vector sizes do not change, and individual RuntimeState filter sets are
+// written only during open() which has completed by the time we reach rerun.
+std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
+    std::set<int> result;
+    for (const auto& _task : _tasks) {
+        for (const auto& task : _task) {
+            auto set = 
task.first->runtime_state()->get_deregister_runtime_filter();
+            result.merge(set);
+        }
+    }
+    if (_runtime_state) {
+        auto set = _runtime_state->get_deregister_runtime_filter();
+        result.merge(set);
+    }
+    return result;
 }
 
 void PipelineFragmentContext::_release_resource() {
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h 
b/be/src/exec/pipeline/pipeline_fragment_context.h
index e523e711898..6372737c0e7 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.h
+++ b/be/src/exec/pipeline/pipeline_fragment_context.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <brpc/closure_guard.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/types.pb.h>
 
@@ -26,6 +27,7 @@
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -89,10 +91,15 @@ public:
 
     void cancel(const Status reason);
 
+    bool notify_close();
+
     TUniqueId get_query_id() const { return _query_id; }
 
     [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
 
+    uint32_t rec_cte_stage() const { return _rec_cte_stage; }
+    void set_rec_cte_stage(uint32_t stage) { _rec_cte_stage = stage; }
+
     void decrement_running_task(PipelineId pipeline_id);
 
     Status send_report(bool);
@@ -126,11 +133,28 @@ public:
     std::string get_load_error_url();
     std::string get_first_error_msg();
 
-    Status wait_close(bool close);
-    Status rebuild(ThreadPool* thread_pool);
-    Status set_to_rerun();
+    std::set<int> get_deregister_runtime_filter() const;
 
-    bool need_notify_close() const { return _need_notify_close; }
+    // Store the brpc ClosureGuard so the RPC response is deferred until this 
PFC is destroyed.
+    // When need_send_report_on_destruction is true (final_close), send the 
report immediately
+    // and do not store the guard (let it fire on return to complete the RPC).
+    //
+    // Thread safety: This method is NOT thread-safe. It reads/writes 
_wait_close_guard without
+    // synchronization. Currently it is only called from rerun_fragment() 
which is invoked
+    // sequentially by RecCTESourceOperatorX (a serial operator) — one opcode 
at a time per
+    // fragment. Do NOT call this concurrently from multiple threads.
+    Status listen_wait_close(const std::shared_ptr<brpc::ClosureGuard>& guard,
+                             bool need_send_report_on_destruction) {
+        if (_wait_close_guard) {
+            return Status::InternalError("Already listening wait close");
+        }
+        if (need_send_report_on_destruction) {
+            return send_report(true);
+        } else {
+            _wait_close_guard = guard;
+        }
+        return Status::OK();
+    }
 
 private:
     void _release_resource();
@@ -183,7 +207,11 @@ private:
     Status _build_pipeline_tasks_for_instance(
             int instance_idx,
             const std::vector<std::shared_ptr<RuntimeProfile>>& 
pipeline_id_to_profile);
-    void _close_fragment_instance();
+    // Close the fragment instance and return true if the caller should call
+    // remove_pipeline_context() **after** releasing _task_mutex. This avoids
+    // holding _task_mutex while acquiring _pipeline_map's shard lock, which
+    // would create an ABBA deadlock with dump_pipeline_tasks().
+    bool _close_fragment_instance();
     void _init_next_report_time();
 
     // Id of this query
@@ -339,6 +367,16 @@ private:
     TPipelineFragmentParams _params;
     int32_t _parallel_instances = 0;
 
-    bool _need_notify_close = false;
+    std::atomic<bool> _need_notify_close = false;
+    // Holds the brpc ClosureGuard for async wait-close during recursive CTE 
rerun.
+    // When the PFC finishes closing and is destroyed, the shared_ptr 
destructor fires
+    // the ClosureGuard, which completes the brpc response to the 
RecCTESourceOperatorX.
+    // Only written by listen_wait_close() from a single rerun_fragment RPC 
thread.
+    std::shared_ptr<brpc::ClosureGuard> _wait_close_guard = nullptr;
+
+    // The recursion round number for recursive CTE fragments.
+    // Incremented each time the fragment is rebuilt via 
rerun_fragment(rebuild).
+    // Used to stamp runtime filter RPCs so stale messages from old rounds are 
discarded.
+    uint32_t _rec_cte_stage = 0;
 };
 } // namespace doris
diff --git a/be/src/exec/runtime_filter/runtime_filter.cpp 
b/be/src/exec/runtime_filter/runtime_filter.cpp
index 9a2ac39edc5..8ee920d3c33 100644
--- a/be/src/exec/runtime_filter/runtime_filter.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter.cpp
@@ -35,6 +35,7 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, 
const TNetworkAddress
     }
 
     auto merge_filter_request = std::make_shared<PMergeFilterRequest>();
+    merge_filter_request->set_stage(_stage);
     auto merge_filter_callback = 
DummyBrpcCallback<PMergeFilterResponse>::create_shared();
     auto merge_filter_closure =
             AutoReleaseClosure<PMergeFilterRequest, 
DummyBrpcCallback<PMergeFilterResponse>>::
@@ -126,8 +127,9 @@ Status RuntimeFilter::_init_with_desc(const 
TRuntimeFilterDesc* desc,
 }
 
 std::string RuntimeFilter::_debug_string() const {
-    return fmt::format("{}, mode: {}", _wrapper ? _wrapper->debug_string() : 
"<null wrapper>",
-                       _has_remote_target ? "GLOBAL" : "LOCAL");
+    return fmt::format("{}, mode: {}, stage: {}",
+                       _wrapper ? _wrapper->debug_string() : "<null wrapper>",
+                       _has_remote_target ? "GLOBAL" : "LOCAL", _stage);
 }
 
 void RuntimeFilter::_check_wrapper_state(
diff --git a/be/src/exec/runtime_filter/runtime_filter.h 
b/be/src/exec/runtime_filter/runtime_filter.h
index f53042cc120..a79cdf225e6 100644
--- a/be/src/exec/runtime_filter/runtime_filter.h
+++ b/be/src/exec/runtime_filter/runtime_filter.h
@@ -44,6 +44,9 @@ public:
 
     bool has_remote_target() const { return _has_remote_target; }
 
+    uint32_t stage() const { return _stage; }
+    void set_stage(uint32_t stage) { _stage = stage; }
+
     template <class T>
     Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) {
         std::unique_lock<std::recursive_mutex> l(_rmtx);
@@ -120,6 +123,10 @@ protected:
     // _wrapper is a runtime filter function wrapper
     std::shared_ptr<RuntimeFilterWrapper> _wrapper;
 
+    // The recursion round number for recursive CTE.
+    // Stamped onto outgoing RPC requests so stale messages from old rounds 
are discarded.
+    uint32_t _stage = 0;
+
     // will apply to remote node
     const bool _has_remote_target;
 
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
index 7f4d7ef0f1c..e0695cb2440 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
@@ -25,6 +25,7 @@
 #include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
 
+#include <mutex>
 #include <ostream>
 #include <string>
 #include <utility>
@@ -103,11 +104,21 @@ Status LocalMergeContext::register_producer(const 
QueryContext* query_ctx,
                                             const TRuntimeFilterDesc* desc,
                                             
std::shared_ptr<RuntimeFilterProducer> producer) {
     std::lock_guard<std::mutex> l(mtx);
+    if (producer->stage() > stage) {
+        // New recursive CTE round: discard stale merger and producers from
+        // the previous round and recreate the merger for the new round.
+        merger.reset();
+        producers.clear();
+        stage = producer->stage();
+    }
     if (!merger) {
         RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger));
     }
     producers.emplace_back(producer);
     merger->set_expected_producer_num(cast_set<int>(producers.size()));
+    // Sync the local merger's stage from the producer so that outgoing merge 
RPCs
+    // (via _push_to_remote) carry the correct recursive CTE round number.
+    merger->set_stage(producer->stage());
     return Status::OK();
 }
 
@@ -120,10 +131,10 @@ Status 
RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
     std::lock_guard<std::mutex> l(_lock);
     auto iter = _local_merge_map.find(filter_id);
     if (iter == _local_merge_map.end()) {
-        return Status::InternalError(
-                "get_local_merge_producer_filters meet unknown filter: {}, 
role: "
-                "LOCAL_MERGE_PRODUCER.",
-                filter_id);
+        // Filter may have been removed during a recursive CTE stage reset.
+        // Return OK with nullptr to let the caller skip gracefully.
+        *local_merge_filters = nullptr;
+        return Status::OK();
     }
     *local_merge_filters = &iter->second;
     if (!iter->second.merger) {
@@ -236,6 +247,13 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
     }
     auto& cnt_val = iter->second;
     std::unique_lock<std::mutex> l(iter->second.mtx);
+    // Discard stale-stage runtime filter size requests from old recursive CTE 
rounds.
+    // Each round increments the stage counter; only messages matching the 
current stage
+    // should be processed. This prevents old PFC's runtime filters from 
corrupting
+    // the merge state of the new round's filters.
+    if (request->stage() != iter->second.stage) {
+        return Status::OK();
+    }
     cnt_val.source_addrs.push_back(request->source_addr());
 
     Status st = Status::OK();
@@ -253,9 +271,12 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
                 continue;
             }
 
+            auto sync_request = std::make_shared<PSyncFilterSizeRequest>();
+            sync_request->set_stage(iter->second.stage);
+
             auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
                                               
DummyBrpcCallback<PSyncFilterSizeResponse>>::
-                    create_unique(std::make_shared<PSyncFilterSizeRequest>(),
+                    create_unique(sync_request,
                                   
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
 
             auto* pquery_id = closure->request_->mutable_query_id();
@@ -281,6 +302,10 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
 Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* 
request) {
     LocalMergeContext* local_merge_filters = nullptr;
     RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), 
&local_merge_filters));
+    if (local_merge_filters == nullptr) {
+        // Filter was removed during a recursive CTE stage reset; discard 
stale request.
+        return Status::OK();
+    }
     for (auto producer : local_merge_filters->producers) {
         producer->set_synced_size(request->filter_size());
     }
@@ -326,6 +351,14 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
     bool is_ready = false;
     {
         std::lock_guard<std::mutex> l(iter->second.mtx);
+        // Discard stale-stage merge requests from old recursive CTE rounds.
+        if (request->stage() != iter->second.stage) {
+            return Status::OK();
+        }
+        if (cnt_val.merger == nullptr) {
+            return Status::InternalError("Merger is null for filter id {}",
+                                         std::to_string(request->filter_id()));
+        }
         // Skip the other broadcast join runtime filter
         if (cnt_val.arrive_id.size() == 1 && 
cnt_val.runtime_filter_desc.is_broadcast_join) {
             return Status::OK();
@@ -372,6 +405,8 @@ Status 
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
     butil::IOBuf request_attachment;
 
     PPublishFilterRequestV2 apply_request;
+    apply_request.set_stage(cnt_val.stage);
+
     // serialize filter
     void* data = nullptr;
     int len = 0;
@@ -440,13 +475,22 @@ Status 
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
     return st;
 }
 
+// Reset merge context for the next recursive CTE round.
+// Recreates the merger to clear accumulated state, preserving expected 
producer count.
+// Increments the stage counter so stale merge/size RPCs from old rounds are 
discarded.
 Status GlobalMergeContext::reset(QueryContext* query_ctx) {
+    std::unique_lock<std::mutex> lock(mtx);
+    // Merger must exist: reset() is only called on fully initialized merge 
contexts.
+    DORIS_CHECK(merger);
     int producer_size = merger->get_expected_producer_num();
     RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, 
&runtime_filter_desc, &merger));
     merger->set_expected_producer_num(producer_size);
     arrive_id.clear();
     source_addrs.clear();
     done = false;
+    stage++;
+    // Keep the Merger's own stage in sync for consistent debug output.
+    merger->set_stage(stage);
     return Status::OK();
 }
 
@@ -467,7 +511,8 @@ std::string 
RuntimeFilterMergeControllerEntity::debug_string() {
     std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
     std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
     for (const auto& [filter_id, ctx] : _filter_map) {
-        result += fmt::format("{}\n", ctx.merger->debug_string());
+        result += fmt::format("filter_id: {}, stage: {}, {}\n", filter_id, 
ctx.stage,
+                              ctx.merger->debug_string());
     }
     return result;
 }
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h 
b/be/src/exec/runtime_filter/runtime_filter_mgr.h
index 5fc0e381264..9959cb567a4 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h
@@ -58,6 +58,9 @@ struct LocalMergeContext {
     std::mutex mtx;
     std::shared_ptr<RuntimeFilterMerger> merger;
     std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
+    // Tracks the recursive CTE round.  When a producer from a newer round
+    // registers, the context is reset (merger recreated, old producers 
dropped).
+    uint32_t stage = 0;
 
     Status register_producer(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc,
                              std::shared_ptr<RuntimeFilterProducer> producer);
@@ -72,6 +75,10 @@ struct GlobalMergeContext {
     std::vector<PNetworkAddress> source_addrs;
     std::atomic<bool> done = false;
 
+    // Represents the round number of the recursive CTE.
+    // If a lower-stage RF arrives at a higher-stage context, the RF is simply discarded.
+    uint32_t stage = 0;
+
     Status reset(QueryContext* query_ctx);
 };
 
@@ -104,13 +111,15 @@ public:
 
     std::string debug_string();
 
-    void remove_filters(const std::set<int32_t>& filter_ids) {
+    void remove_filter(int32_t filter_id) {
         std::lock_guard<std::mutex> l(_lock);
-        for (const auto& id : filter_ids) {
-            _consumer_map.erase(id);
-            _local_merge_map.erase(id);
-            _producer_id_set.erase(id);
-        }
+        _consumer_map.erase(filter_id);
+        // NOTE: _local_merge_map is NOT erased here.  It is reset lazily in
+        // LocalMergeContext::register_producer when a producer from a newer
+        // recursive CTE round registers.  Erasing eagerly here would race with
+        // multi-fragment REBUILD: a consumer-only fragment's remove_filter 
could
+        // delete the entry that the producer fragment just re-registered.
+        _producer_id_set.erase(filter_id);
     }
 
 private:
diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp 
b/be/src/exec/runtime_filter/runtime_filter_producer.cpp
index 76e4ad0a0c2..4fbd1704288 100644
--- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp
@@ -57,6 +57,10 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, 
bool build_hash_table
         LocalMergeContext* context = nullptr;
         
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
                 _wrapper->filter_id(), &context));
+        if (context == nullptr) {
+            // Filter was removed during a recursive CTE stage reset; this 
producer is stale.
+            return Status::OK();
+        }
         std::lock_guard l(context->mtx);
         RETURN_IF_ERROR(context->merger->merge_from(this));
         if (context->merger->ready()) {
@@ -170,6 +174,10 @@ Status RuntimeFilterProducer::send_size(RuntimeState* 
state, uint64_t local_filt
         LocalMergeContext* merger_context = nullptr;
         
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
                 _wrapper->filter_id(), &merger_context));
+        if (merger_context == nullptr) {
+            // Filter was removed during a recursive CTE stage reset; this 
producer is stale.
+            return Status::OK();
+        }
         std::lock_guard merger_lock(merger_context->mtx);
         if (merger_context->merger->add_rf_size(local_filter_size)) {
             if (!_has_remote_target) {
@@ -199,6 +207,8 @@ Status RuntimeFilterProducer::send_size(RuntimeState* 
state, uint64_t local_filt
     }
 
     auto request = std::make_shared<PSendFilterSizeRequest>();
+    request->set_stage(_stage);
+
     auto callback = 
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
     // RuntimeFilter maybe deconstructed before the rpc finished, so that 
could not use
     // a raw pointer in closure. Has to use the context's shared ptr.
diff --git a/be/src/exec/scan/scanner_context.cpp 
b/be/src/exec/scan/scanner_context.cpp
index d62b6880ecd..f35c6e1732a 100644
--- a/be/src/exec/scan/scanner_context.cpp
+++ b/be/src/exec/scan/scanner_context.cpp
@@ -100,9 +100,6 @@ ScannerContext::ScannerContext(RuntimeState* state, 
ScanLocalStateBase* local_st
     }
     _dependency = dependency;
     DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
-    if (auto ctx = task_exec_ctx(); ctx) {
-        ctx->ref_task_execution_ctx();
-    }
 }
 
 int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
@@ -218,9 +215,6 @@ ScannerContext::~ScannerContext() {
         }
         _task_handle = nullptr;
     }
-    if (auto ctx = task_exec_ctx(); ctx) {
-        ctx->unref_task_execution_ctx();
-    }
 }
 
 BlockUPtr ScannerContext::get_free_block(bool force) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 10c4a7a96b6..55377524ec2 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -280,13 +280,11 @@ Status ConcurrentContextMap<Key, Value, 
ValueType>::apply_if_not_exists(
 }
 
 template <typename Key, typename Value, typename ValueType>
-void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
+bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
     auto id = get_map_id(query_id, _internal_map.size());
-    {
-        std::unique_lock lock(*_internal_map[id].first);
-        auto& map = _internal_map[id].second;
-        map.erase(query_id);
-    }
+    std::unique_lock lock(*_internal_map[id].first);
+    auto& map = _internal_map[id].second;
+    return map.erase(query_id) != 0;
 }
 
 template <typename Key, typename Value, typename ValueType>
@@ -344,6 +342,10 @@ void FragmentMgr::stop() {
     // destructred and remove it from _query_ctx_map_delay_delete which is 
destructring. it's UB.
     _query_ctx_map_delay_delete.clear();
     _pipeline_map.clear();
+    {
+        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+        _rerunnable_params_map.clear();
+    }
 }
 
 std::string FragmentMgr::to_http_path(const std::string& file_name) {
@@ -676,16 +678,28 @@ Status FragmentMgr::start_query_execution(const 
PExecPlanFragmentStartRequest* r
 }
 
 void FragmentMgr::remove_pipeline_context(std::pair<TUniqueId, int> key) {
-    int64_t now = duration_cast<std::chrono::milliseconds>(
-                          std::chrono::system_clock::now().time_since_epoch())
-                          .count();
-    g_fragment_executing_count << -1;
-    g_fragment_last_active_time.set_value(now);
-
-    _pipeline_map.erase(key);
+    if (_pipeline_map.erase(key)) {
+        int64_t now = duration_cast<std::chrono::milliseconds>(
+                              
std::chrono::system_clock::now().time_since_epoch())
+                              .count();
+        g_fragment_executing_count << -1;
+        g_fragment_last_active_time.set_value(now);
+    }
 }
 
 void FragmentMgr::remove_query_context(const TUniqueId& key) {
+    // Clean up any saved rerunnable params for this query to avoid memory 
leaks.
+    // This covers both cancel and normal destruction paths.
+    {
+        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+        for (auto it = _rerunnable_params_map.begin(); it != 
_rerunnable_params_map.end();) {
+            if (it->first.first == key) {
+                it = _rerunnable_params_map.erase(it);
+            } else {
+                ++it;
+            }
+        }
+    }
     _query_ctx_map_delay_delete.erase(key);
 #ifndef BE_TEST
     _query_ctx_map.erase(key);
@@ -922,6 +936,19 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         _pipeline_map.insert({params.query_id, params.fragment_id}, context);
     }
 
+    // Save params for recursive CTE child fragments so we can recreate the 
PFC later.
+    // For recursive CTE, the child fragment needs to be destroyed and rebuilt 
between rounds,
+    // so we save the original params here and use them in 
rerun_fragment(rebuild).
+    if (params.__isset.need_notify_close && params.need_notify_close) {
+        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+        _rerunnable_params_map[{params.query_id, params.fragment_id}] = {
+                .deregister_runtime_filter_ids = {},
+                .params = params,
+                .parent = parent,
+                .finish_callback = cb,
+                .query_ctx = query_ctx};
+    }
+
     if (!params.__isset.need_wait_execution_trigger || 
!params.need_wait_execution_trigger) {
         query_ctx->set_ready_to_execute_only();
     }
@@ -1337,6 +1364,10 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
 
         // 2. create the filter wrapper to replace or ignore/disable the 
target filters
         if (!filters.empty()) {
+            // Discard stale-stage requests from old recursive CTE rounds.
+            if (filters[0]->stage() != request->stage()) {
+                return Status::OK();
+            }
             RETURN_IF_ERROR(filters[0]->assign(*request, attach_data));
             std::ranges::for_each(filters, [&](auto& filter) { 
filter->signal(filters[0].get()); });
         }
@@ -1462,35 +1493,112 @@ Status FragmentMgr::transmit_rec_cte_block(
     }
 }
 
-Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+// Orchestrates the recursive CTE fragment lifecycle through 4 phases:
+//
+// wait_for_destroy: collect deregister RF IDs, store brpc closure, trigger 
old PFC close
+// rebuild: increment stage, deregister old RFs, create+prepare new PFC from 
saved params
+// submit: submit the new PFC's pipeline tasks for execution
+// final_close: async wait for close, send final report, clean up (last round 
only)
+//
+// The brpc ClosureGuard is stored in the PFC so the RPC response is deferred 
until
+// the PFC is fully destroyed. This gives the caller (RecCTESourceOperatorX) a
+// synchronization point to know when the old PFC has finished all its tasks.
+Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& 
guard,
+                                   const TUniqueId& query_id, int fragment_id,
                                    PRerunFragmentParams_Opcode stage) {
-    if (auto q_ctx = get_query_ctx(query_id)) {
-        SCOPED_ATTACH_TASK(q_ctx.get());
-        auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+    if (stage == PRerunFragmentParams::WAIT_FOR_DESTROY ||
+        stage == PRerunFragmentParams::FINAL_CLOSE) {
+        auto fragment_ctx = _pipeline_map.find({query_id, fragment_id});
         if (!fragment_ctx) {
             return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
-                                    print_id(query_id), fragment);
+                                    print_id(query_id), fragment_id);
         }
 
-        if (stage == PRerunFragmentParams::wait) {
-            return fragment_ctx->wait_close(false);
-        } else if (stage == PRerunFragmentParams::release) {
-            return fragment_ctx->set_to_rerun();
-        } else if (stage == PRerunFragmentParams::rebuild) {
-            return fragment_ctx->rebuild(_thread_pool.get());
-        } else if (stage == PRerunFragmentParams::submit) {
-            return fragment_ctx->submit();
-        } else if (stage == PRerunFragmentParams::close) {
-            return fragment_ctx->wait_close(true);
-        } else {
-            return Status::InvalidArgument("Unknown rerun fragment opcode: 
{}", stage);
+        if (stage == PRerunFragmentParams::WAIT_FOR_DESTROY) {
+            std::unique_lock<std::mutex> lk(_rerunnable_params_lock);
+            auto it = _rerunnable_params_map.find({query_id, fragment_id});
+            if (it == _rerunnable_params_map.end()) {
+                lk.unlock();
+                auto st = fragment_ctx->listen_wait_close(guard, true);
+                if (!st.ok()) {
+                    LOG(WARNING) << fmt::format(
+                            "wait_for_destroy fragment context (query-id: {}, 
fragment-id: "
+                            "{}) failed: {}",
+                            print_id(query_id), fragment_id, st.to_string());
+                }
+                return Status::NotFound(
+                        "Rerunnable params (query-id: {}, fragment-id: {}) not 
found",
+                        print_id(query_id), fragment_id);
+            }
+
+            it->second.deregister_runtime_filter_ids.merge(
+                    fragment_ctx->get_deregister_runtime_filter());
         }
+
+        auto* query_ctx = fragment_ctx->get_query_ctx();
+        SCOPED_ATTACH_TASK(query_ctx);
+        RETURN_IF_ERROR(
+                fragment_ctx->listen_wait_close(guard, stage == 
PRerunFragmentParams::FINAL_CLOSE));
+        fragment_ctx->notify_close();
+        return Status::OK();
+    } else if (stage == PRerunFragmentParams::REBUILD) {
+        auto q_ctx = get_query_ctx(query_id);
+        if (!q_ctx) {
+            return Status::NotFound(
+                    "rerun_fragment: Query context (query-id: {}) not found, 
maybe finished",
+                    print_id(query_id));
+        }
+        SCOPED_ATTACH_TASK(q_ctx.get());
+        RerunableFragmentInfo info;
+        {
+            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+            auto it = _rerunnable_params_map.find({query_id, fragment_id});
+            if (it == _rerunnable_params_map.end()) {
+                return Status::NotFound("rebuild (query-id: {}, fragment-id: 
{}) not found",
+                                        print_id(query_id), fragment_id);
+            }
+            it->second.stage++;
+            // Deregister old runtime filters so new ones can be registered in 
the new PFC.
+            for (int32_t filter_id : it->second.deregister_runtime_filter_ids) 
{
+                q_ctx->runtime_filter_mgr()->remove_filter(filter_id);
+            }
+            info = it->second;
+        }
+
+        auto context = std::make_shared<PipelineFragmentContext>(
+                q_ctx->query_id(), info.params, q_ctx, _exec_env, 
info.finish_callback,
+                [this](const ReportStatusRequest& req, auto&& ctx) {
+                    return this->trigger_pipeline_context_report(req, 
std::move(ctx));
+                });
+        // Propagate the recursion stage so that runtime filters created by 
this PFC
+        // carry the correct stage number.
+        context->set_rec_cte_stage(info.stage);
+
+        Status prepare_st = Status::OK();
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = 
context->prepare(_thread_pool.get()),
+                                         prepare_st);
+        if (!prepare_st.ok()) {
+            q_ctx->cancel(prepare_st, info.params.fragment_id);
+            return prepare_st;
+        }
+
+        // Insert new PFC into _pipeline_map (old one was removed)
+        _pipeline_map.insert({info.params.query_id, info.params.fragment_id}, 
context);
+
+        // Update QueryContext mapping (must support overwrite)
+        q_ctx->set_pipeline_context(info.params.fragment_id, context);
+        return Status::OK();
+
+    } else if (stage == PRerunFragmentParams::SUBMIT) {
+        auto fragment_ctx = _pipeline_map.find({query_id, fragment_id});
+        if (!fragment_ctx) {
+            return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
+                                    print_id(query_id), fragment_id);
+        }
+        return fragment_ctx->submit();
     } else {
-        return Status::NotFound(
-                "reset_fragment: Query context (query-id: {}) not found, maybe 
finished",
-                print_id(query_id));
+        return Status::InvalidArgument("Unknown rerun fragment opcode: {}", 
stage);
     }
-    return Status::OK();
 }
 
 Status FragmentMgr::reset_global_rf(const TUniqueId& query_id,
@@ -1503,7 +1611,6 @@ Status FragmentMgr::reset_global_rf(const TUniqueId& 
query_id,
                 "reset_fragment: Query context (query-id: {}) not found, maybe 
finished",
                 print_id(query_id));
     }
-    return Status::OK();
 }
 
 #include "common/compile_check_end.h"
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c9462d79010..6a306b60eda 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <brpc/closure_guard.h>
 #include <gen_cpp/FrontendService_types.h>
 #include <gen_cpp/QueryPlanExtra_types.h>
 #include <gen_cpp/Types_types.h>
@@ -25,8 +26,10 @@
 #include <cstdint>
 #include <functional>
 #include <iosfwd>
+#include <map>
 #include <memory>
 #include <mutex>
+#include <set>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -74,7 +77,7 @@ public:
     Value find(const Key& query_id);
     void insert(const Key& query_id, std::shared_ptr<ValueType>);
     void clear();
-    void erase(const Key& query_id);
+    bool erase(const Key& query_id);
     size_t num_items() const {
         size_t n = 0;
         for (auto& pair : _internal_map) {
@@ -189,7 +192,8 @@ public:
                                   const 
google::protobuf::RepeatedPtrField<PBlock>& pblocks,
                                   bool eos);
 
-    Status rerun_fragment(const TUniqueId& query_id, int fragment,
+    Status rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& guard,
+                          const TUniqueId& query_id, int fragment,
                           PRerunFragmentParams_Opcode stage);
 
     Status reset_global_rf(const TUniqueId& query_id,
@@ -230,6 +234,26 @@ private:
                          PipelineFragmentContext>
             _pipeline_map;
 
+    // Saved params and callback for rerunnable (recursive CTE) fragments.
+    // Only populated when need_notify_close == true during exec_plan_fragment.
+    // Lifecycle: created in exec_plan_fragment(), used in 
rerun_fragment(rebuild)
+    // to recreate PFC with fresh state, cleaned up in remove_query_context().
+    struct RerunableFragmentInfo {
+        // Runtime filter IDs registered by the old PFC, collected during 
wait_for_destroy.
+        // These are deregistered from the RuntimeFilterMgr before the new PFC 
is created.
+        std::set<int> deregister_runtime_filter_ids;
+        // Original params from FE, used to recreate the PFC each round.
+        TPipelineFragmentParams params;
+        TPipelineFragmentParamsList parent;
+        FinishCallback finish_callback;
+        // Hold query_ctx to prevent it from being destroyed while rerunnable 
fragments exist.
+        std::shared_ptr<QueryContext> query_ctx;
+        // Monotonically increasing stage counter, stamps runtime filter RPCs.
+        uint32_t stage = 0;
+    };
+    std::mutex _rerunnable_params_lock;
+    std::map<std::pair<TUniqueId, int>, RerunableFragmentInfo> 
_rerunnable_params_map;
+
     // query id -> QueryContext
     ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> 
_query_ctx_map;
     // keep query ctx do not delete immediately to make rf coordinator merge 
filter work well after query eos
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 3081ae70464..24831a1f485 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -388,7 +388,9 @@ std::string QueryContext::print_all_pipeline_context() {
 void QueryContext::set_pipeline_context(const int fragment_id,
                                         
std::shared_ptr<PipelineFragmentContext> pip_ctx) {
     std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
-    _fragment_id_to_pipeline_ctx.insert({fragment_id, pip_ctx});
+    // Use insert_or_assign instead of insert to support overwriting old 
entries
+    // when recursive CTE recreates PipelineFragmentContext between rounds.
+    _fragment_id_to_pipeline_ctx.insert_or_assign(fragment_id, pip_ctx);
 }
 
 doris::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
@@ -472,10 +474,6 @@ QueryContext::_collect_realtime_query_profile() {
                 continue;
             }
 
-            if (fragment_ctx->need_notify_close()) {
-                continue;
-            }
-
             auto profile = fragment_ctx->collect_realtime_profile();
 
             if (profile.empty()) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index aa1746ed8b0..2cf8ec76e98 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -411,7 +411,7 @@ public:
     timespec get_query_arrival_timestamp() const { return 
this->_query_arrival_timestamp; }
     QuerySource get_query_source() const { return this->_query_source; }
 
-    const TQueryOptions get_query_options() const { return _query_options; }
+    TQueryOptions get_query_options() const { return _query_options; }
 };
 
 } // namespace doris
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index dcd00b49996..a6f8319476d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -37,8 +37,11 @@
 #include "common/status.h"
 #include "core/value/vdatetime_value.h"
 #include "exec/operator/operator.h"
+#include "exec/pipeline/pipeline_fragment_context.h"
 #include "exec/pipeline/pipeline_task.h"
+#include "exec/runtime_filter/runtime_filter_consumer.h"
 #include "exec/runtime_filter/runtime_filter_mgr.h"
+#include "exec/runtime_filter/runtime_filter_producer.h"
 #include "io/fs/s3_file_system.h"
 #include "load/load_path_mgr.h"
 #include "runtime/exec_env.h"
@@ -184,10 +187,17 @@ RuntimeState::~RuntimeState() {
     if (_error_log_file != nullptr && _error_log_file->is_open()) {
         _error_log_file->close();
     }
-
     _obj_pool->clear();
 }
 
+const std::set<int>& RuntimeState::get_deregister_runtime_filter() const {
+    return _registered_runtime_filter_ids;
+}
+
+void RuntimeState::merge_register_runtime_filter(const std::set<int>& 
runtime_filter_ids) {
+    _registered_runtime_filter_ids.insert(runtime_filter_ids.begin(), 
runtime_filter_ids.end());
+}
+
 Status RuntimeState::init(const TUniqueId& fragment_instance_id, const 
TQueryOptions& query_options,
                           const TQueryGlobals& query_globals, ExecEnv* 
exec_env) {
     _fragment_instance_id = fragment_instance_id;
@@ -473,6 +483,18 @@ Status RuntimeState::register_producer_runtime_filter(
     // When RF is published, consumers in both global and local RF mgr will be 
found.
     
RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(_query_ctx,
 desc,
                                                                          
producer_filter));
+    // Stamp the producer with the current recursive CTE stage so that 
outgoing merge RPCs
+    // carry the correct round number and stale messages from old rounds are 
discarded.
+    // PFC must still be alive: this runs inside a pipeline task, so the 
execution context
+    // cannot have expired yet.
+    // In unit-test scenarios the task execution context is never set (no 
PipelineFragmentContext
+    // exists), so skip the stage stamping — the default stage (0) is correct.
+    if (task_execution_context_inited()) {
+        auto pfc = std::static_pointer_cast<PipelineFragmentContext>(
+                get_task_execution_context().lock());
+        DORIS_CHECK(pfc);
+        (*producer_filter)->set_stage(pfc->rec_cte_stage());
+    }
     
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter(
             _query_ctx, desc, *producer_filter));
     return Status::OK();
@@ -484,7 +506,20 @@ Status RuntimeState::register_consumer_runtime_filter(
     _registered_runtime_filter_ids.insert(desc.filter_id);
     bool need_merge = desc.has_remote_targets || need_local_merge;
     RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : 
local_runtime_filter_mgr();
-    return mgr->register_consumer_filter(this, desc, node_id, consumer_filter);
+    RETURN_IF_ERROR(mgr->register_consumer_filter(this, desc, node_id, 
consumer_filter));
+    // Stamp the consumer with the current recursive CTE stage so that 
incoming publish RPCs
+    // from old rounds are detected and discarded.
+    // PFC must still be alive: this runs inside a pipeline task, so the 
execution context
+    // cannot have expired yet.
+    // In unit-test scenarios the task execution context is never set (no 
PipelineFragmentContext
+    // exists), so skip the stage stamping — the default stage (0) is correct.
+    if (task_execution_context_inited()) {
+        auto pfc = std::static_pointer_cast<PipelineFragmentContext>(
+                get_task_execution_context().lock());
+        DORIS_CHECK(pfc);
+        (*consumer_filter)->set_stage(pfc->rec_cte_stage());
+    }
+    return Status::OK();
 }
 
 bool RuntimeState::is_nereids() const {
@@ -496,16 +531,6 @@ std::vector<std::shared_ptr<RuntimeProfile>> 
RuntimeState::pipeline_id_to_profil
     return _pipeline_id_to_profile;
 }
 
-void RuntimeState::reset_to_rerun() {
-    if (local_runtime_filter_mgr()) {
-        
local_runtime_filter_mgr()->remove_filters(_registered_runtime_filter_ids);
-        
global_runtime_filter_mgr()->remove_filters(_registered_runtime_filter_ids);
-        _registered_runtime_filter_ids.clear();
-    }
-    std::unique_lock lc(_pipeline_profile_lock);
-    _pipeline_id_to_profile.clear();
-}
-
 std::vector<std::shared_ptr<RuntimeProfile>> 
RuntimeState::build_pipeline_profile(
         std::size_t pipeline_size) {
     std::unique_lock lc(_pipeline_profile_lock);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e44062b6888..5445cfb1d03 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -644,6 +644,8 @@ public:
         _task_execution_context = context;
     }
 
+    bool task_execution_context_inited() const { return 
_task_execution_context_inited; }
+
     std::weak_ptr<TaskExecutionContext> get_task_execution_context() {
         CHECK(_task_execution_context_inited)
                 << "_task_execution_context_inited == false, the ctx is not 
inited";
@@ -814,17 +816,15 @@ public:
                                       _query_options.hnsw_bounded_queue, 
_query_options.ivf_nprobe);
     }
 
-    void reset_to_rerun();
-
-    void set_force_make_rf_wait_infinite() {
-        _query_options.__set_runtime_filter_wait_infinitely(true);
-    }
-
     bool runtime_filter_wait_infinitely() const {
         return _query_options.__isset.runtime_filter_wait_infinitely &&
                _query_options.runtime_filter_wait_infinitely;
     }
 
+    const std::set<int>& get_deregister_runtime_filter() const;
+
+    void merge_register_runtime_filter(const std::set<int>& 
runtime_filter_ids);
+
 private:
     Status create_error_log_file();
 
diff --git a/be/src/runtime/task_execution_context.cpp 
b/be/src/runtime/task_execution_context.cpp
index 91ed5813510..2f44dde7c2c 100644
--- a/be/src/runtime/task_execution_context.cpp
+++ b/be/src/runtime/task_execution_context.cpp
@@ -19,22 +19,14 @@
 
 #include <glog/logging.h>
 
-#include <condition_variable>
-
 namespace doris {
-void TaskExecutionContext::ref_task_execution_ctx() {
-    ++_has_task_execution_ctx_ref_count;
-}
 
-void TaskExecutionContext::unref_task_execution_ctx() {
-    --_has_task_execution_ctx_ref_count;
-    if (_has_task_execution_ctx_ref_count == 0) {
-        _notify_cv.notify_all();
-    }
-}
+TaskExecutionContext ::TaskExecutionContext() = default;
+TaskExecutionContext ::~TaskExecutionContext() = default;
 
 HasTaskExecutionCtx::HasTaskExecutionCtx(RuntimeState* state)
         : task_exec_ctx_(state->get_task_execution_context()) {}
 
 HasTaskExecutionCtx::~HasTaskExecutionCtx() = default;
+
 } // namespace doris
diff --git a/be/src/runtime/task_execution_context.h 
b/be/src/runtime/task_execution_context.h
index f84bcc0ceb1..9b90e6ae95d 100644
--- a/be/src/runtime/task_execution_context.h
+++ b/be/src/runtime/task_execution_context.h
@@ -17,8 +17,6 @@
 
 #pragma once
 
-#include <atomic>
-#include <condition_variable>
 #include <memory>
 
 #include "runtime/runtime_state.h"
@@ -27,22 +25,14 @@ namespace doris {
 
 class RuntimeState;
 
-// This class act as a super class of all context like things such as
-// plan fragment executor or pipelinefragmentcontext or 
pipelinexfragmentcontext
+// Base class for execution contexts (e.g. PipelineFragmentContext).
+//
+// For recursive CTE, the PFC (which inherits from this class) is held by 
external threads
+// (scanner threads, brpc callbacks, etc.) via weak_ptr<TaskExecutionContext>.
 class TaskExecutionContext : public 
std::enable_shared_from_this<TaskExecutionContext> {
 public:
-    TaskExecutionContext() = default;
-    virtual ~TaskExecutionContext() = default;
-
-    void ref_task_execution_ctx();
-
-    void unref_task_execution_ctx();
-
-    int has_task_execution_ctx_ref_count() const { return 
_has_task_execution_ctx_ref_count; }
-
-protected:
-    std::atomic<int> _has_task_execution_ctx_ref_count = 0;
-    std::condition_variable _notify_cv;
+    TaskExecutionContext();
+    virtual ~TaskExecutionContext();
 };
 
 using TaskExecutionContextSPtr = std::shared_ptr<TaskExecutionContext>;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 2f2250f265d..e4ac5b79bca 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1699,10 +1699,15 @@ void 
PInternalService::rerun_fragment(google::protobuf::RpcController* controlle
                                       PRerunFragmentResult* response,
                                       google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        auto st =
-                
_exec_env->fragment_mgr()->rerun_fragment(UniqueId(request->query_id()).to_thrift(),
-                                                          
request->fragment_id(), request->stage());
+        // Use shared_ptr<ClosureGuard> so we can transfer ownership to the 
PFC.
+        // For wait_for_destroy/final_close, the guard is stored in the PFC 
and the RPC
+        // response is deferred until the PFC is fully destroyed. For 
rebuild/submit,
+        // the guard fires immediately when this lambda returns.
+        std::shared_ptr<brpc::ClosureGuard> closure_guard =
+                std::make_shared<brpc::ClosureGuard>(done);
+        auto st = _exec_env->fragment_mgr()->rerun_fragment(
+                closure_guard, UniqueId(request->query_id()).to_thrift(), 
request->fragment_id(),
+                request->stage());
         st.to_protobuf(response->mutable_status());
     });
     if (!ret) {
diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
index b80d682dba1..d6ccc080961 100644
--- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
@@ -104,9 +104,13 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
         EXPECT_NE(producer_filter, nullptr);
 
         LocalMergeContext* local_merge_filters = nullptr;
-        EXPECT_FALSE(global_runtime_filter_mgr
-                             ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
-                             .ok());
+        // filter_id not yet registered: global mgr returns OK with nullptr
+        // (graceful skip for recursive CTE stage reset).
+        EXPECT_TRUE(global_runtime_filter_mgr
+                            ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
+                            .ok());
+        EXPECT_EQ(local_merge_filters, nullptr);
+        // local mgr always returns error (not supported)
         EXPECT_FALSE(local_runtime_filter_mgr
                              ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
                              .ok());
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 23a47080e07..50274c17627 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -75,11 +75,10 @@ message PTransmitRecCTEBlockResult {
 
 message PRerunFragmentParams {
     enum Opcode {
-    wait = 1;    // wait fragment execute done
-    release = 2; // release current round's resource
-    rebuild = 3; // rebuild next round pipeline tasks
-    submit = 4;  // submit tasks to execute
-    close = 5;   // close fragment
+    WAIT_FOR_DESTROY = 1;     // deregister RF, destroy old PFC, async wait 
for tasks to close via brpc closure
+    REBUILD = 2;              // rebuild next round pipeline tasks
+    SUBMIT = 3;               // submit tasks to execute
+    FINAL_CLOSE = 4;          // async wait for tasks to close, send report, 
and clean up (last round)
     }
     optional PUniqueId query_id = 1;
     optional int32 fragment_id = 2;
@@ -615,6 +614,7 @@ message PSendFilterSizeRequest {
     required PUniqueId query_id = 2;
     required PNetworkAddress source_addr = 3;
     required uint64 filter_size = 4;
+    optional uint32 stage = 5;
 };
 
 message PSendFilterSizeResponse {
@@ -625,6 +625,7 @@ message PSyncFilterSizeRequest {
     required int32 filter_id = 1;
     required PUniqueId query_id = 2;
     required uint64 filter_size = 3;
+    optional uint32 stage = 5;
 };
 
 message PSyncFilterSizeResponse {
@@ -646,6 +647,7 @@ message PMergeFilterRequest {
     optional bool ignored = 12;
     optional uint64 local_merge_time = 13;
     optional bool disabled = 14;
+    optional uint32 stage = 15;
 };
 
 message PMergeFilterResponse {
@@ -667,6 +669,7 @@ message PPublishFilterRequestV2 {
     repeated int32 fragment_ids = 12; // deprecated
     optional uint64 local_merge_time = 13;
     optional bool disabled = 14;
+    optional uint32 stage = 15;
 };
 
 message PPublishFilterResponse {
diff --git 
a/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
 
b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
index a01ab03db34..22d7be8ef08 100644
--- 
a/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
+++ 
b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
@@ -118,6 +118,24 @@ suite ("rec_cte_from_ck_doc") {
 
     // test global rf
     sql "set enable_runtime_filter_prune = false;"
+    sql "set runtime_filter_wait_infinitely = false;"
+    test {
+        sql """
+        WITH RECURSIVE search_graph AS (
+            SELECT c_from, c_to, label FROM graph g
+            UNION ALL
+            SELECT g.c_from, g.c_to, g.label
+            FROM graph g join [shuffle] search_graph sg
+            on g.c_from = sg.c_to
+        )
+        SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to;
+        """
+        exception "ABORTED"
+    }
+
+    // test global rf
+    sql "set enable_runtime_filter_prune = false;"
+    sql "set runtime_filter_wait_infinitely = true;"
     test {
         sql """
         WITH RECURSIVE search_graph AS (


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to