This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 226dbd06e4b [Feature](runtime-filter) support rf on set operator: be part (#49679) 226dbd06e4b is described below commit 226dbd06e4b5153220d8864a407de263ddc426a0 Author: Pxl <x...@selectdb.com> AuthorDate: Thu Apr 3 15:22:07 2025 +0800 [Feature](runtime-filter) support rf on set operator: be part (#49679) ### What problem does this PR solve? support rf on set operator: be part ```sql create table tx ( k1 int null, k2 int not null, k3 bigint null, k4 varchar(100) null ) duplicate key (k1) properties("replication_num" = "1"); insert into tx select e1,e1,e1,e1 from (select 1 k1) as t lateral view explode_numbers(x) tmp1 as e1; select k1 from t1000 intersect select k1 from t10000000; 10.03 sec -> 0.13 sec select k1 from t1000 intersect select k1 from t1000000; 1.17 sec -> 0.13 sec select k1 from t10000 intersect select k1 from t100000; 0.23 sec -> 0.18 sec select k1 from t10000 intersect select * from( select k1 from t100000 where k1 > 10000)t; 0.15 sec -> 0.16 sec select k1 from t10000 intersect select * from( select k1 from t100000 where k1 < 10000)t; 0.13 sec -> 0.18 sec ``` part of https://github.com/apache/doris/pull/49573 ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [x] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here.
eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/pipeline/exec/set_sink_operator.cpp | 76 ++++++++++++++++------ be/src/pipeline/exec/set_sink_operator.h | 21 ++++-- be/src/runtime/fragment_mgr.cpp | 4 +- .../runtime_filter_producer_helper_set.h | 59 +++++++++++++++++ 4 files changed, 134 insertions(+), 26 deletions(-) diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 4faeb975ef9..8d28fe1e369 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -26,6 +26,49 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" +uint64_t get_hash_table_size(const auto& hash_table_variant) { + uint64_t hash_table_size = 0; + std::visit( + [&](auto&& arg) { + using HashTableCtxType = std::decay_t<decltype(arg)>; + if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { + hash_table_size = arg.hash_table->size(); + } + }, + hash_table_variant); + return hash_table_size; +} + +template <bool is_intersect> +Status SetSinkLocalState<is_intersect>::terminate(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + if (_terminated) { + return Status::OK(); + } + RETURN_IF_ERROR(_runtime_filter_producer_helper->terminate(state)); + return Base::terminate(state); +} + +template <bool is_intersect> +Status SetSinkLocalState<is_intersect>::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } + + if (!_terminated && _runtime_filter_producer_helper && !state->is_cancelled()) { + try { + RETURN_IF_ERROR(_runtime_filter_producer_helper->process( + state, &_shared_state->build_block, + get_hash_table_size(_shared_state->hash_table_variants->method_variant))); + } catch (Exception& e) { 
+ return Status::InternalError( + "rf process meet error: {}, _terminated: {}, _finish_dependency: {}", + e.to_string(), _terminated, _finish_dependency->debug_string()); + } + } + return Base::close(state, exec_status); +} + template <bool is_intersect> Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { @@ -57,23 +100,15 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo local_state._mutable_block.clear(); if (eos) { - if constexpr (is_intersect) { - valid_element_in_hash_tbl = 0; - } else { - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t<decltype(arg)>; - if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - valid_element_in_hash_tbl = arg.hash_table->size(); - } - }, - local_state._shared_state->hash_table_variants->method_variant); - } + uint64_t hash_table_size = get_hash_table_size( + local_state._shared_state->hash_table_variants->method_variant); + valid_element_in_hash_tbl = is_intersect ? 
0 : hash_table_size; + local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1] ->set_ready(); - if (_child_quantity == 1) { - local_state._dependency->set_ready_to_read(); - } + RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size( + state, hash_table_size, local_state._finish_dependency)); + local_state._eos = true; } } return Status::OK(); @@ -113,16 +148,16 @@ template <bool is_intersect> Status SetSinkOperatorX<is_intersect>::_extract_build_column( SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) { - std::vector<int> result_locs(_child_exprs.size(), -1); + std::vector<int> result_locs(local_state._child_exprs.size(), -1); bool is_all_const = true; - for (size_t i = 0; i < _child_exprs.size(); ++i) { - RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_locs[i])); + for (size_t i = 0; i < local_state._child_exprs.size(); ++i) { + RETURN_IF_ERROR(local_state._child_exprs[i]->execute(&block, &result_locs[i])); is_all_const &= is_column_const(*block.get_by_position(result_locs[i]).column); } rows = is_all_const ? 
1 : rows; - for (size_t i = 0; i < _child_exprs.size(); ++i) { + for (size_t i = 0; i < local_state._child_exprs.size(); ++i) { size_t result_col_id = result_locs[i]; if (is_all_const) { @@ -175,6 +210,9 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs)); + _runtime_filter_producer_helper = std::make_shared<RuntimeFilterProducerHelperSet>(profile()); + RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs, + parent._runtime_filter_descs)); return Status::OK(); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index b2795c23a5b..aadfc7ee6d2 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -17,10 +17,8 @@ #pragma once -#include <stdint.h> - -#include "olap/olap_common.h" #include "operator.h" +#include "runtime_filter/runtime_filter_producer_helper_set.h" namespace doris { #include "common/compile_check_begin.h" @@ -46,6 +44,8 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; + Status terminate(RuntimeState* state) override; + Status close(RuntimeState* state, Status exec_status) override; private: friend class SetSinkOperatorX<is_intersect>; @@ -57,6 +57,9 @@ private: RuntimeProfile::Counter* _merge_block_timer = nullptr; RuntimeProfile::Counter* _build_timer = nullptr; + + std::shared_ptr<RuntimeFilterProducerHelperSet> _runtime_filter_producer_helper; + std::shared_ptr<CountedFinishDependency> _finish_dependency; }; template <bool is_intersect> @@ -71,14 +74,17 @@ public: SetSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : Base(sink_id, tnode.node_id, dest_id), - _cur_child_id(child_id), _child_quantity(tnode.node_type == TPlanNodeType::type::INTERSECT_NODE ? 
tnode.intersect_node.result_expr_lists.size() : tnode.except_node.result_expr_lists.size()), _is_colocate(is_intersect ? tnode.intersect_node.is_colocate : tnode.except_node.is_colocate), _partition_exprs(is_intersect ? tnode.intersect_node.result_expr_lists[child_id] - : tnode.except_node.result_expr_lists[child_id]) {} + : tnode.except_node.result_expr_lists[child_id]), + _runtime_filter_descs(tnode.runtime_filters) { + DCHECK_EQ(child_id, _cur_child_id); + DCHECK_GT(_child_quantity, 1); + } #ifdef BE_TEST SetSinkOperatorX(int _child_quantity) @@ -87,6 +93,7 @@ public: _is_colocate(false), _partition_exprs() {} #endif + ~SetSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TDataSink", @@ -115,13 +122,15 @@ private: vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows); - const int _cur_child_id; + const int _cur_child_id = 0; const size_t _child_quantity; // every child has its result expr list vectorized::VExprContextSPtrs _child_exprs; const bool _is_colocate; const std::vector<TExpr> _partition_exprs; using OperatorBase::_child; + + const std::vector<TRuntimeFilterDesc> _runtime_filter_descs; }; #include "common/compile_check_end.h" diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 3d81bee17d5..f7980e41c91 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1343,7 +1343,9 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, query_id.__set_lo(queryid.lo); if (auto q_ctx = get_query_ctx(query_id)) { SCOPED_ATTACH_TASK(q_ctx.get()); - std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; + if (!q_ctx->get_merge_controller_handler()) { + return Status::InternalError("Merge filter failed: Merge controller handler is null"); + } return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data); } else { return Status::EndOfFile( diff --git 
a/be/src/runtime_filter/runtime_filter_producer_helper_set.h b/be/src/runtime_filter/runtime_filter_producer_helper_set.h new file mode 100644 index 00000000000..2e4e5bfe86a --- /dev/null +++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "common/status.h" +#include "pipeline/pipeline_task.h" +#include "runtime/runtime_state.h" +#include "runtime_filter/runtime_filter.h" +#include "runtime_filter/runtime_filter_mgr.h" +#include "runtime_filter/runtime_filter_producer_helper.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { +#include "common/compile_check_begin.h" +// this class used in set sink node +class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper { +public: + ~RuntimeFilterProducerHelperSet() override = default; + + RuntimeFilterProducerHelperSet(RuntimeProfile* profile) + : RuntimeFilterProducerHelper(profile, true, false) {} + + Status process(RuntimeState* state, const vectorized::Block* block, uint64_t cardinality) { + if (_skip_runtime_filters_process) { + return Status::OK(); + } + + RETURN_IF_ERROR(_init_filters(state, cardinality)); + if (cardinality != 0) { + RETURN_IF_ERROR(_insert(block, 0)); + } + + for (const auto& filter : _producers) { + filter->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); + } + + RETURN_IF_ERROR(_publish(state)); + return Status::OK(); + } +}; +#include "common/compile_check_end.h" +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org