This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d86daef5a8f [Chore](query) add _query_ctx_map_delay_delete (#59262)
d86daef5a8f is described below

commit d86daef5a8f5490d028cc3bcd0fee10ef4172759
Author: Pxl <[email protected]>
AuthorDate: Sun Jan 4 11:28:57 2026 +0800

    [Chore](query) add _query_ctx_map_delay_delete (#59262)
    
    This pull request introduces a delayed deletion mechanism for
    `QueryContext` objects in the fragment manager to ensure that runtime
    filter merging works correctly after a query reaches its end-of-stream
    (EOS). The main changes involve adding a new map to retain query
    contexts temporarily, modifying the lifecycle management of query
    contexts, and introducing utility methods to support this behavior.
    
    **Query Context Lifecycle Management:**
    
    * Added a new member `_query_ctx_map_delay_delete` to `FragmentMgr` to
    keep `QueryContext` objects alive for a short period after query EOS,
    ensuring that the runtime filter coordinator can complete its work.
    * Modified `FragmentMgr::remove_query_context` to erase entries from
    `_query_ctx_map_delay_delete` as part of the cleanup process.
    * Updated the `QueryContext` destructor to call `remove_query_context`
    only if the fragment manager is available, preventing issues during unit
    tests.
    
    **Runtime Filter Merge Coordination:**
    
    * `FragmentMgr::_get_or_create_query_ctx` now inserts the
    `QueryContext` into `_query_ctx_map_delay_delete` if the runtime filter
    merge handler is not empty, ensuring the context is retained for
    merging.
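    * Added an `empty()` method to `RuntimeFilterMergeControllerEntity` to
    check if the filter map is empty, which is used to determine if delayed
    deletion is needed.
    * Removed `RuntimeFilterMergeControllerEntity::release_undone_filters`
    and its call in the `QueryContext` destructor.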
---
 be/src/runtime/fragment_mgr.cpp              |  4 ++++
 be/src/runtime/fragment_mgr.h                |  3 +++
 be/src/runtime/query_context.cpp             |  8 ++++----
 be/src/runtime_filter/runtime_filter_mgr.cpp | 24 ------------------------
 be/src/runtime_filter/runtime_filter_mgr.h   |  5 ++++-
 5 files changed, 15 insertions(+), 29 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fed6c3931fe..8ceb04aa909 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -670,6 +670,7 @@ void 
FragmentMgr::remove_pipeline_context(std::pair<TUniqueId, int> key) {
 }
 
 void FragmentMgr::remove_query_context(const TUniqueId& key) {
+    _query_ctx_map_delay_delete.erase(key);
 #ifndef BE_TEST
     _query_ctx_map.erase(key);
 #endif
@@ -768,6 +769,9 @@ Status FragmentMgr::_get_or_create_query_ctx(const 
TPipelineFragmentParams& para
 
                                 
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
                                         info.runtime_filter_params);
+                                if (!handler->empty()) {
+                                    
_query_ctx_map_delay_delete.insert(query_id, query_ctx);
+                                }
                             }
                             if (info.__isset.topn_filter_descs) {
                                 
query_ctx->init_runtime_predicates(info.topn_filter_descs);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 826bca81901..63cb4dc0808 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -209,6 +209,9 @@ private:
 
     // query id -> QueryContext
     ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> 
_query_ctx_map;
+    // keep query ctx do not delete immediately to make rf coordinator merge 
filter work well after query eos
+    ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryContext>, 
QueryContext>
+            _query_ctx_map_delay_delete;
 
     CountDownLatch _stop_background_threads_latch;
     std::shared_ptr<Thread> _cancel_thread;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2b142f9a9f0..ddf76988fe4 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -231,13 +231,13 @@ QueryContext::~QueryContext() {
     _runtime_predicates.clear();
     file_scan_range_params_map.clear();
     obj_pool.clear();
-    if (_merge_controller_handler) {
-        _merge_controller_handler->release_undone_filters(this);
-    }
     _merge_controller_handler.reset();
 
     DorisMetrics::instance()->query_ctx_cnt->increment(-1);
-    
ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);
+    // fragment_mgr is nullptr in unittest
+    if (ExecEnv::GetInstance()->fragment_mgr()) {
+        
ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);
+    }
     // the only one msg shows query's end. any other msg should append to it 
if need.
     LOG_INFO("Query {} deconstructed, mem_tracker: {}", 
print_id(this->_query_id), mem_tracker_msg);
 }
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 48645ffdb8e..b3c1cfa6c93 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -433,30 +433,6 @@ Status 
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
     return st;
 }
 
-void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* 
query_ctx) {
-    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
-    for (auto& [filter_id, ctx] : _filter_map) {
-        if (!ctx.done && !ctx.targetv2_info.empty()) {
-            {
-                std::lock_guard<std::mutex> l(ctx.mtx);
-                ctx.merger->set_wrapper_state_and_ready_to_apply(
-                        RuntimeFilterWrapper::State::DISABLED,
-                        "rf coordinator's query context released before 
runtime filter is ready to "
-                        "apply");
-            }
-            auto st = _send_rf_to_target(ctx, std::weak_ptr<QueryContext> {}, 
0,
-                                         
UniqueId(query_ctx->query_id()).to_proto(),
-                                         query_ctx->execution_timeout());
-            if (!st.ok()) {
-                LOG(WARNING)
-                        << "Failed to send runtime filter to target before 
query done. filter_id:"
-                        << filter_id << " " << ctx.merger->debug_string() << " 
reason:" << st;
-            }
-        }
-    }
-    _filter_map.clear();
-}
-
 std::string RuntimeFilterMergeControllerEntity::debug_string() {
     std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
     std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index 160babf278d..2dcea3f7a4a 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -150,7 +150,10 @@ public:
 
     std::string debug_string();
 
-    void release_undone_filters(QueryContext* query_ctx);
+    bool empty() {
+        std::shared_lock<std::shared_mutex> read_lock(_filter_map_mutex);
+        return _filter_map.empty();
+    }
 
 private:
     Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to