This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 846fe83152c [Chore](runtime-filter) add rpc error msg to 
RuntimeFilterContext (#43517) (#44622) (#44719)
846fe83152c is described below

commit 846fe83152cf8610d742dbfae6d3b6df5819040f
Author: Pxl <x...@selectdb.com>
AuthorDate: Thu Nov 28 16:46:27 2024 +0800

    [Chore](runtime-filter) add rpc error msg to RuntimeFilterContext (#43517) 
(#44622) (#44719)
    
    pick from #43517
---
 be/src/exprs/runtime_filter.cpp                   | 37 ++++++++++++++---------
 be/src/vec/runtime/shared_hash_table_controller.h |  1 +
 2 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index b2817e291ca..d7bef524f01 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -22,7 +22,6 @@
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
-#include <stddef.h>
 
 #include <algorithm>
 // IWYU pragma: no_include <bits/chrono.h>
@@ -1038,23 +1037,34 @@ class SyncSizeClosure : public 
AutoReleaseClosure<PSendFilterSizeRequest,
     std::shared_ptr<pipeline::Dependency> _dependency;
     RuntimeFilterContextSPtr _rf_context;
     std::string _rf_debug_info;
+
     using Base =
             AutoReleaseClosure<PSendFilterSizeRequest, 
DummyBrpcCallback<PSendFilterSizeResponse>>;
     ENABLE_FACTORY_CREATOR(SyncSizeClosure);
 
     void _process_if_rpc_failed() override {
-        ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
-        LOG(WARNING) << "sync filter size meet rpc error, filter=" << 
_rf_debug_info;
+        Defer defer {[&]() { 
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
+        auto ctx = _rf_context;
+        if (!ctx) {
+            return;
+        }
+
+        ctx->err_msg = cntl_->ErrorText();
         Base::_process_if_rpc_failed();
     }
 
     void _process_if_meet_error_status(const Status& status) override {
-        ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+        Defer defer {[&]() { 
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
+        auto ctx = _rf_context;
+        if (!ctx) {
+            return;
+        }
+
         if (status.is<ErrorCode::END_OF_FILE>()) {
             // rf merger backend may finished before rf's send_filter_size, we 
just ignore filter in this case.
-            _rf_context->ignored = true;
+            ctx->ignored = true;
         } else {
-            LOG(WARNING) << "sync filter size meet error status, filter=" << 
_rf_debug_info;
+            ctx->err_msg = status.to_string();
             Base::_process_if_meet_error_status(status);
         }
     }
@@ -1063,11 +1073,8 @@ public:
     SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
                     
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
                     std::shared_ptr<pipeline::Dependency> dependency,
-                    RuntimeFilterContextSPtr rf_context, std::string_view 
rf_debug_info)
-            : Base(req, callback),
-              _dependency(std::move(dependency)),
-              _rf_context(rf_context),
-              _rf_debug_info(rf_debug_info) {}
+                    RuntimeFilterContextSPtr rf_context)
+            : Base(req, callback), _dependency(std::move(dependency)), 
_rf_context(rf_context) {}
 };
 
 Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t 
local_filter_size) {
@@ -1111,8 +1118,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
     auto callback = 
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
     // IRuntimeFilter maybe deconstructed before the rpc finished, so that 
could not use
     // a raw pointer in closure. Has to use the context's shared ptr.
-    auto closure = SyncSizeClosure::create_unique(request, callback, 
_dependency,
-                                                  _wrapper->_context, 
this->debug_string());
+    auto closure =
+            SyncSizeClosure::create_unique(request, callback, _dependency, 
_wrapper->_context);
     auto* pquery_id = request->mutable_query_id();
     pquery_id->set_hi(_state->query_id.hi());
     pquery_id->set_lo(_state->query_id.lo());
@@ -1576,9 +1583,9 @@ void 
IRuntimeFilter::update_runtime_filter_type_to_profile() {
 std::string IRuntimeFilter::debug_string() const {
     return fmt::format(
             "RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, 
is_broadcast: {}, "
-            "build_bf_cardinality: {}",
+            "build_bf_cardinality: {}, error_msg: {}",
             _filter_id, to_string(_runtime_filter_type), _need_local_merge, 
_is_broadcast_join,
-            _wrapper->get_build_bf_cardinality());
+            _wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
 }
 
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index 4581bb762e8..aba441f282a 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -46,6 +46,7 @@ struct RuntimeFilterContext {
     std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
     std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
     bool ignored = false;
+    std::string err_msg;
 };
 
 using RuntimeFilterContextSPtr = std::shared_ptr<RuntimeFilterContext>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to