This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 846fe83152c [Chore](runtime-filter) add rpc error msg to RuntimeFilterContext (#43517) (#44622) (#44719) 846fe83152c is described below commit 846fe83152cf8610d742dbfae6d3b6df5819040f Author: Pxl <x...@selectdb.com> AuthorDate: Thu Nov 28 16:46:27 2024 +0800 [Chore](runtime-filter) add rpc error msg to RuntimeFilterContext (#43517) (#44622) (#44719) pick from #43517 --- be/src/exprs/runtime_filter.cpp | 37 ++++++++++++++--------- be/src/vec/runtime/shared_hash_table_controller.h | 1 + 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index b2817e291ca..d7bef524f01 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -22,7 +22,6 @@ #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/internal_service.pb.h> -#include <stddef.h> #include <algorithm> // IWYU pragma: no_include <bits/chrono.h> @@ -1038,23 +1037,34 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest, std::shared_ptr<pipeline::Dependency> _dependency; RuntimeFilterContextSPtr _rf_context; std::string _rf_debug_info; + using Base = AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>; ENABLE_FACTORY_CREATOR(SyncSizeClosure); void _process_if_rpc_failed() override { - ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); - LOG(WARNING) << "sync filter size meet rpc error, filter=" << _rf_debug_info; + Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; + auto ctx = _rf_context; + if (!ctx) { + return; + } + + ctx->err_msg = cntl_->ErrorText(); Base::_process_if_rpc_failed(); } void _process_if_meet_error_status(const Status& status) override { - ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); + Defer defer {[&]() { 
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; + auto ctx = _rf_context; + if (!ctx) { + return; + } + if (status.is<ErrorCode::END_OF_FILE>()) { // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case. - _rf_context->ignored = true; + ctx->ignored = true; } else { - LOG(WARNING) << "sync filter size meet error status, filter=" << _rf_debug_info; + ctx->err_msg = status.to_string(); Base::_process_if_meet_error_status(status); } } @@ -1063,11 +1073,8 @@ public: SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req, std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback, std::shared_ptr<pipeline::Dependency> dependency, - RuntimeFilterContextSPtr rf_context, std::string_view rf_debug_info) - : Base(req, callback), - _dependency(std::move(dependency)), - _rf_context(rf_context), - _rf_debug_info(rf_debug_info) {} + RuntimeFilterContextSPtr rf_context) + : Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {} }; Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { @@ -1111,8 +1118,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared(); // IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use // a raw pointer in closure. Has to use the context's shared ptr. 
- auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, - _wrapper->_context, this->debug_string()); + auto closure = + SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); @@ -1576,9 +1583,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { std::string IRuntimeFilter::debug_string() const { return fmt::format( "RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, " - "build_bf_cardinality: {}", + "build_bf_cardinality: {}, error_msg: {}", _filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join, - _wrapper->get_build_bf_cardinality()); + _wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg); } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 4581bb762e8..aba441f282a 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -46,6 +46,7 @@ struct RuntimeFilterContext { std::shared_ptr<BloomFilterFuncBase> bloom_filter_func; std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func; bool ignored = false; + std::string err_msg; }; using RuntimeFilterContextSPtr = std::shared_ptr<RuntimeFilterContext>; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org