This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d86daef5a8f [Chore](query) add _query_ctx_map_delay_delete (#59262)
d86daef5a8f is described below
commit d86daef5a8f5490d028cc3bcd0fee10ef4172759
Author: Pxl <[email protected]>
AuthorDate: Sun Jan 4 11:28:57 2026 +0800
[Chore](query) add _query_ctx_map_delay_delete (#59262)
This pull request introduces a delayed deletion mechanism for
`QueryContext` objects in the fragment manager to ensure that runtime
filter merging works correctly after a query reaches its end-of-stream
(EOS). The main changes involve adding a new map to retain query
contexts temporarily, modifying the lifecycle management of query
contexts, and introducing utility methods to support this behavior.
**Query Context Lifecycle Management:**
* Added a new member `_query_ctx_map_delay_delete` to `FragmentMgr` to
keep `QueryContext` objects alive for a short period after query EOS,
ensuring that the runtime filter coordinator can complete its work.
* Modified `FragmentMgr::remove_query_context` to erase entries from
`_query_ctx_map_delay_delete` as part of the cleanup process.
* Updated the `QueryContext` destructor to call `remove_query_context`
only if the fragment manager is non-null, since it is `nullptr` in unit
tests.
**Runtime Filter Merge Coordination:**
* `FragmentMgr::_get_or_create_query_ctx` now inserts the
`QueryContext` into `_query_ctx_map_delay_delete` if the runtime filter
merge handler is not empty, ensuring the context is retained for
merging.
* Added an `empty()` method to `RuntimeFilterMergeControllerEntity` to
check if the filter map is empty, which is used to determine if delayed
deletion is needed.
---
be/src/runtime/fragment_mgr.cpp | 4 ++++
be/src/runtime/fragment_mgr.h | 3 +++
be/src/runtime/query_context.cpp | 8 ++++----
be/src/runtime_filter/runtime_filter_mgr.cpp | 24 ------------------------
be/src/runtime_filter/runtime_filter_mgr.h | 5 ++++-
5 files changed, 15 insertions(+), 29 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fed6c3931fe..8ceb04aa909 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -670,6 +670,7 @@ void
FragmentMgr::remove_pipeline_context(std::pair<TUniqueId, int> key) {
}
void FragmentMgr::remove_query_context(const TUniqueId& key) {
+ _query_ctx_map_delay_delete.erase(key);
#ifndef BE_TEST
_query_ctx_map.erase(key);
#endif
@@ -768,6 +769,9 @@ Status FragmentMgr::_get_or_create_query_ctx(const
TPipelineFragmentParams& para
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
info.runtime_filter_params);
+ if (!handler->empty()) {
+
_query_ctx_map_delay_delete.insert(query_id, query_ctx);
+ }
}
if (info.__isset.topn_filter_descs) {
query_ctx->init_runtime_predicates(info.topn_filter_descs);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 826bca81901..63cb4dc0808 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -209,6 +209,9 @@ private:
// query id -> QueryContext
ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext>
_query_ctx_map;
+ // keep query ctx do not delete immediately to make rf coordinator merge
filter work well after query eos
+ ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryContext>,
QueryContext>
+ _query_ctx_map_delay_delete;
CountDownLatch _stop_background_threads_latch;
std::shared_ptr<Thread> _cancel_thread;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2b142f9a9f0..ddf76988fe4 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -231,13 +231,13 @@ QueryContext::~QueryContext() {
_runtime_predicates.clear();
file_scan_range_params_map.clear();
obj_pool.clear();
- if (_merge_controller_handler) {
- _merge_controller_handler->release_undone_filters(this);
- }
_merge_controller_handler.reset();
DorisMetrics::instance()->query_ctx_cnt->increment(-1);
-
ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);
+ // fragment_mgr is nullptr in unittest
+ if (ExecEnv::GetInstance()->fragment_mgr()) {
+
ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);
+ }
// the only one msg shows query's end. any other msg should append to it
if need.
LOG_INFO("Query {} deconstructed, mem_tracker: {}",
print_id(this->_query_id), mem_tracker_msg);
}
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 48645ffdb8e..b3c1cfa6c93 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -433,30 +433,6 @@ Status
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
return st;
}
-void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext*
query_ctx) {
- std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
- for (auto& [filter_id, ctx] : _filter_map) {
- if (!ctx.done && !ctx.targetv2_info.empty()) {
- {
- std::lock_guard<std::mutex> l(ctx.mtx);
- ctx.merger->set_wrapper_state_and_ready_to_apply(
- RuntimeFilterWrapper::State::DISABLED,
- "rf coordinator's query context released before
runtime filter is ready to "
- "apply");
- }
- auto st = _send_rf_to_target(ctx, std::weak_ptr<QueryContext> {},
0,
-
UniqueId(query_ctx->query_id()).to_proto(),
- query_ctx->execution_timeout());
- if (!st.ok()) {
- LOG(WARNING)
- << "Failed to send runtime filter to target before
query done. filter_id:"
- << filter_id << " " << ctx.merger->debug_string() << "
reason:" << st;
- }
- }
- }
- _filter_map.clear();
-}
-
std::string RuntimeFilterMergeControllerEntity::debug_string() {
std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h
b/be/src/runtime_filter/runtime_filter_mgr.h
index 160babf278d..2dcea3f7a4a 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -150,7 +150,10 @@ public:
std::string debug_string();
- void release_undone_filters(QueryContext* query_ctx);
+ bool empty() {
+ std::shared_lock<std::shared_mutex> read_lock(_filter_map_mutex);
+ return _filter_map.empty();
+ }
private:
Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]