This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cf28b101547 [Bug](runtime-filter) fix some rf error problems (#37273)
cf28b101547 is described below

commit cf28b1015473683313b895752a93dcd1e6bf729e
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Jul 4 19:31:50 2024 +0800

    [Bug](runtime-filter) fix some rf error problems (#37273)
    
    ## Proposed changes
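    1. Ignore the runtime filter (rf) when the rf manager on the merge backend has
    already been released: send_filter_size now replies END_OF_FILE when the query
    context is gone, and the sender marks the filter as ignored instead of failing.
    2. In send_filter_size, acquire the rf merge controller only after the
    query_ctx has been acquired.
    3. Enlarge the RPC timeout for sync_filter_size/apply_filterv2 to
    min(3600, execution_timeout) seconds.
    4. Log the rf's debug string when an RPC fails or returns an error status.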
---
 be/src/exprs/runtime_filter.cpp             | 17 +++++++++++++----
 be/src/exprs/runtime_filter_slots.h         |  9 +++++++++
 be/src/pipeline/pipeline_fragment_context.h |  3 ---
 be/src/runtime/fragment_mgr.cpp             |  9 +++++----
 be/src/runtime/runtime_filter_mgr.cpp       |  2 ++
 5 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2dcfc97b096..e69ff714d32 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1033,25 +1033,34 @@ Status IRuntimeFilter::publish(bool publish_local) {
 class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
                                                   
DummyBrpcCallback<PSendFilterSizeResponse>> {
     std::shared_ptr<pipeline::Dependency> _dependency;
+    IRuntimeFilter* _filter;
     using Base =
             AutoReleaseClosure<PSendFilterSizeRequest, 
DummyBrpcCallback<PSendFilterSizeResponse>>;
     ENABLE_FACTORY_CREATOR(SyncSizeClosure);
 
     void _process_if_rpc_failed() override {
         ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+        LOG(WARNING) << "sync filter size meet rpc error, filter=" << 
_filter->debug_string();
         Base::_process_if_rpc_failed();
     }
 
     void _process_if_meet_error_status(const Status& status) override {
         ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
-        Base::_process_if_meet_error_status(status);
+        if (status.is<ErrorCode::END_OF_FILE>()) {
+            // rf merger backend may finished before rf's send_filter_size, we 
just ignore filter in this case.
+            _filter->set_ignored();
+        } else {
+            LOG(WARNING) << "sync filter size meet error status, filter="
+                         << _filter->debug_string();
+            Base::_process_if_meet_error_status(status);
+        }
     }
 
 public:
     SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
                     
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
-                    std::shared_ptr<pipeline::Dependency> dependency)
-            : Base(req, callback), _dependency(std::move(dependency)) {}
+                    std::shared_ptr<pipeline::Dependency> dependency, 
IRuntimeFilter* filter)
+            : Base(req, callback), _dependency(std::move(dependency)), 
_filter(filter) {}
 };
 
 Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t 
local_filter_size) {
@@ -1094,7 +1103,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
 
     auto request = std::make_shared<PSendFilterSizeRequest>();
     auto callback = 
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
-    auto closure = SyncSizeClosure::create_unique(request, callback, 
_dependency);
+    auto closure = SyncSizeClosure::create_unique(request, callback, 
_dependency, this);
     auto* pquery_id = request->mutable_query_id();
     pquery_id->set_hi(_state->query_id.hi());
     pquery_id->set_lo(_state->query_id.lo());
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 0bf8a33f9f2..ebda4b56fcc 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -71,6 +71,9 @@ public:
         // process ignore duplicate IN_FILTER
         std::unordered_set<int> has_in_filter;
         for (auto* filter : _runtime_filters) {
+            if (filter->get_ignored()) {
+                continue;
+            }
             if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
                 continue;
             }
@@ -83,6 +86,9 @@ public:
 
         // process ignore filter when it has IN_FILTER on same expr, and init 
bloom filter size
         for (auto* filter : _runtime_filters) {
+            if (filter->get_ignored()) {
+                continue;
+            }
             if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
                 !has_in_filter.contains(filter->expr_order())) {
                 continue;
@@ -95,6 +101,9 @@ public:
     Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
         // process IN_OR_BLOOM_FILTER's real type
         for (auto* filter : _runtime_filters) {
+            if (filter->get_ignored()) {
+                continue;
+            }
             if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                 get_real_size(filter, local_hash_table_size) > 
state->runtime_filter_max_in_num()) {
                 RETURN_IF_ERROR(filter->change_to_bloom_filter());
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 94dd96731c2..3b6c73dbef4 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -249,9 +249,6 @@ private:
 
     bool _need_local_merge = false;
 
-    // It is used to manage the lifecycle of RuntimeFilterMergeController
-    std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> 
_merge_controller_handlers;
-
     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context
 #ifdef __clang__
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fe7f0d13c2b..a23095e78bd 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1082,8 +1082,6 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
 
 Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
     UniqueId queryid = request->query_id();
-    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
-    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
 
     std::shared_ptr<QueryContext> query_ctx;
     {
@@ -1094,10 +1092,13 @@ Status FragmentMgr::send_filter_size(const 
PSendFilterSizeRequest* request) {
         if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
             query_ctx = q_ctx;
         } else {
-            return Status::InvalidArgument("Query context (query-id: {}) not 
found",
-                                           queryid.to_string());
+            return Status::EndOfFile("Query context (query-id: {}) not found, 
maybe finished",
+                                     queryid.to_string());
         }
     }
+
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
     auto merge_status = filter_controller->send_filter_size(request);
     return merge_status;
 }
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index c9812508446..0e5b37c8ffa 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -341,6 +341,7 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
             auto* pquery_id = closure->request_->mutable_query_id();
             pquery_id->set_hi(_state->query_id.hi());
             pquery_id->set_lo(_state->query_id.lo());
+            closure->cntl_->set_timeout_ms(std::min(3600, 
_state->execution_timeout) * 1000);
 
             closure->request_->set_filter_id(filter_id);
             closure->request_->set_filter_size(cnt_val->global_size);
@@ -453,6 +454,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             if (has_attachment) {
                 
closure->cntl_->request_attachment().append(request_attachment);
             }
+            closure->cntl_->set_timeout_ms(std::min(3600, 
_state->execution_timeout) * 1000);
             // set fragment-id
             for (auto& target_fragment_instance_id : 
target.target_fragment_instance_ids) {
                 PUniqueId* cur_id = 
closure->request_->add_fragment_instance_ids();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to