This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new cf28b101547 [Bug](runtime-filter) fix some rf error problems (#37273) cf28b101547 is described below commit cf28b1015473683313b895752a93dcd1e6bf729e Author: Pxl <pxl...@qq.com> AuthorDate: Thu Jul 4 19:31:50 2024 +0800 [Bug](runtime-filter) fix some rf error problems (#37273) ## Proposed changes 1. ignore rf when rf-mgr released 2. move acquire rf controller to after acquire query_ctx on send_filter_size 3. enlarge timeout limit on sync_filter_size/apply_filterv2 4. logout rf's debug string when rpc meet error --- be/src/exprs/runtime_filter.cpp | 17 +++++++++++++---- be/src/exprs/runtime_filter_slots.h | 9 +++++++++ be/src/pipeline/pipeline_fragment_context.h | 3 --- be/src/runtime/fragment_mgr.cpp | 9 +++++---- be/src/runtime/runtime_filter_mgr.cpp | 2 ++ 5 files changed, 29 insertions(+), 11 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 2dcfc97b096..e69ff714d32 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1033,25 +1033,34 @@ Status IRuntimeFilter::publish(bool publish_local) { class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>> { std::shared_ptr<pipeline::Dependency> _dependency; + IRuntimeFilter* _filter; using Base = AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>; ENABLE_FACTORY_CREATOR(SyncSizeClosure); void _process_if_rpc_failed() override { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); + LOG(WARNING) << "sync filter size meet rpc error, filter=" << _filter->debug_string(); Base::_process_if_rpc_failed(); } void _process_if_meet_error_status(const Status& status) override { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); - Base::_process_if_meet_error_status(status); + if (status.is<ErrorCode::END_OF_FILE>()) { + // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case. + _filter->set_ignored(); + } else { + LOG(WARNING) << "sync filter size meet error status, filter=" + << _filter->debug_string(); + Base::_process_if_meet_error_status(status); + } } public: SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req, std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback, - std::shared_ptr<pipeline::Dependency> dependency) - : Base(req, callback), _dependency(std::move(dependency)) {} + std::shared_ptr<pipeline::Dependency> dependency, IRuntimeFilter* filter) + : Base(req, callback), _dependency(std::move(dependency)), _filter(filter) {} }; Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { @@ -1094,7 +1103,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt auto request = std::make_shared<PSendFilterSizeRequest>(); auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared(); - auto closure = SyncSizeClosure::create_unique(request, callback, _dependency); + auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, this); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 0bf8a33f9f2..ebda4b56fcc 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -71,6 +71,9 @@ public: // process ignore duplicate IN_FILTER std::unordered_set<int> has_in_filter; for (auto* filter : _runtime_filters) { + if (filter->get_ignored()) { + continue; + } if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) { continue; } @@ -83,6 +86,9 @@ public: // process ignore filter when it has IN_FILTER on same expr, and init bloom filter size for (auto* filter : _runtime_filters) { + if (filter->get_ignored()) { + continue; + } if (filter->get_real_type() == RuntimeFilterType::IN_FILTER || !has_in_filter.contains(filter->expr_order())) { continue; @@ -95,6 +101,9 @@ public: Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto* filter : _runtime_filters) { + if (filter->get_ignored()) { + continue; + } if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER && get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) { RETURN_IF_ERROR(filter->change_to_bloom_filter()); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 94dd96731c2..3b6c73dbef4 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -249,9 +249,6 @@ private: bool _need_local_merge = false; - // It is used to manage the lifecycle of RuntimeFilterMergeController - std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> _merge_controller_handlers; - // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both // of it in pipeline task not the fragment_context #ifdef __clang__ diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index fe7f0d13c2b..a23095e78bd 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1082,8 +1082,6 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { UniqueId queryid = request->query_id(); - std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; - RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); std::shared_ptr<QueryContext> query_ctx; { @@ -1094,10 +1092,13 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { query_ctx = q_ctx; } else { - return Status::InvalidArgument("Query context (query-id: {}) not found", - queryid.to_string()); + return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished", + queryid.to_string()); } } + + std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; + RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); auto merge_status = filter_controller->send_filter_size(request); return merge_status; } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index c9812508446..0e5b37c8ffa 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -341,6 +341,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz auto* pquery_id = closure->request_->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); + closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000); closure->request_->set_filter_id(filter_id); closure->request_->set_filter_size(cnt_val->global_size); @@ -453,6 +454,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ if (has_attachment) { closure->cntl_->request_attachment().append(request_attachment); } + closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000); // set fragment-id for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) { PUniqueId* cur_id = closure->request_->add_fragment_instance_ids(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org