This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refactor_rf by this push: new 9fb4944c72e [refactor](runtime filter) Refine consumer (#48198) 9fb4944c72e is described below commit 9fb4944c72e6664ee7441497f5ea10682e5aa293 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Fri Feb 21 23:39:13 2025 +0800 [refactor](runtime filter) Refine consumer (#48198) --- .../exec/multi_cast_data_stream_source.cpp | 4 +- be/src/pipeline/exec/scan_operator.cpp | 2 +- be/src/pipeline/exec/scan_operator.h | 4 +- be/src/runtime/fragment_mgr.cpp | 141 ++++------- be/src/runtime/fragment_mgr.h | 2 - be/src/runtime/query_context.h | 3 + be/src/runtime/runtime_state.cpp | 4 +- be/src/runtime_filter/role/consumer.cpp | 152 ++++++++++- be/src/runtime_filter/role/consumer.h | 41 ++- be/src/runtime_filter/role/merger.h | 7 +- be/src/runtime_filter/role/producer.cpp | 23 +- be/src/runtime_filter/role/producer.h | 5 +- be/src/runtime_filter/role/runtime_filter.cpp | 2 +- be/src/runtime_filter/role/runtime_filter.h | 16 +- be/src/runtime_filter/runtime_filter_helper.cpp | 26 +- be/src/runtime_filter/runtime_filter_helper.h | 28 +-- be/src/runtime_filter/runtime_filter_mgr.cpp | 37 +-- be/src/runtime_filter/runtime_filter_mgr.h | 89 +------ be/src/runtime_filter/runtime_filter_slots.cpp | 18 -- be/src/runtime_filter/runtime_filter_wrapper.cpp | 280 ++++++--------------- be/src/runtime_filter/runtime_filter_wrapper.h | 132 ++++------ be/src/vec/exec/scan/vscanner.cpp | 3 +- be/src/vec/exprs/vbitmap_predicate.cpp | 2 +- be/src/vec/exprs/vbitmap_predicate.h | 2 +- be/src/vec/exprs/vbloom_predicate.cpp | 3 +- be/src/vec/exprs/vbloom_predicate.h | 2 +- gensrc/proto/internal_service.proto | 2 +- gensrc/thrift/PlanNodes.thrift | 2 +- 28 files changed, 409 insertions(+), 623 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 796eb6345e1..ae35a1aebf9 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -29,7 +29,7 @@ MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime OperatorXBase* parent) : Base(state, parent), _helper(static_cast<Parent*>(parent)->dest_id_from_sink(), parent->runtime_filter_descs(), - static_cast<Parent*>(parent)->_multi_cast_output_row_descriptor, _conjuncts) {} + static_cast<Parent*>(parent)->_multi_cast_output_row_descriptor) {} Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); @@ -52,7 +52,7 @@ Status MultiCastDataStreamSourceLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); - RETURN_IF_ERROR(_helper.acquire_runtime_filter()); + RETURN_IF_ERROR(_helper.acquire_runtime_filter(_conjuncts)); auto& p = _parent->cast<Parent>(); _output_expr_contexts.resize(p._output_expr_contexts.size()); for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 3b0a276fcfe..535db749d9d 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -97,7 +97,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) { RETURN_IF_ERROR( p._common_expr_ctxs_push_down[i]->clone(state, _common_expr_ctxs_push_down[i])); } - RETURN_IF_ERROR(_helper.acquire_runtime_filter()); + RETURN_IF_ERROR(_helper.acquire_runtime_filter(_conjuncts)); _stale_expr_ctxs.resize(p._stale_expr_ctxs.size()); for (size_t i = 0; i < _stale_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._stale_expr_ctxs[i]->clone(state, _stale_expr_ctxs[i])); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 0fc866679c0..6cd3da930de 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -66,8 +66,8 @@ class ScanLocalStateBase : public PipelineXLocalState<> { public: ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState<>(state, parent), - _helper(parent->node_id(), parent->runtime_filter_descs(), parent->row_descriptor(), - _conjuncts) {} + _helper(parent->node_id(), parent->runtime_filter_descs(), parent->row_descriptor()) { + } ~ScanLocalStateBase() override = default; [[nodiscard]] virtual bool should_run_serial() const = 0; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 9dc0febc892..b2780938d38 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -860,11 +860,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed", { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); }); - std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; - RETURN_IF_ERROR(_runtimefilter_controller.add_entity( - params.local_params[0], params.query_id, params.query_options, &handler, - RuntimeFilterParamsContext::create(context->get_runtime_state()))); - if (handler) { + if (params.local_params[0].__isset.runtime_filter_params && + params.local_params[0].runtime_filter_params.rid_to_runtime_filter.size() > 0) { + auto handler = std::make_shared<RuntimeFilterMergeControllerEntity>( + RuntimeFilterParamsContext::create(context->get_runtime_state())); + RETURN_IF_ERROR(handler->init(params.query_id, params.local_params[0].runtime_filter_params, + params.query_options)); query_ctx->set_merge_controller_handler(handler); } @@ -1285,109 +1286,75 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, butil::IOBufAsZeroCopyInputStream* attach_data) { - std::shared_ptr<pipeline::PipelineFragmentContext> pip_context; - - RuntimeFilterMgr* runtime_filter_mgr = nullptr; - - const auto& fragment_ids = request->fragment_ids(); - { - for (auto fragment_id : fragment_ids) { - pip_context = - _pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id}); - if (pip_context == nullptr) { - continue; - } - - DCHECK(pip_context != nullptr); - runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); - break; + UniqueId queryid = request->query_id(); + TUniqueId query_id; + query_id.__set_hi(queryid.hi); + query_id.__set_lo(queryid.lo); + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + RuntimeFilterMgr* runtime_filter_mgr = q_ctx->runtime_filter_mgr(); + DCHECK(runtime_filter_mgr != nullptr); + + // 1. get the target filters + std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters = + runtime_filter_mgr->get_consume_filters(request->filter_id()); + + // 2. create the filter wrapper to replace or ignore the target filters + if (!filters.empty()) { + RETURN_IF_ERROR(filters[0]->assign(*request, attach_data)); + std::ranges::for_each(filters, [&](auto& filter) { filter->signal(filters[0].get()); }); } - } - - if (runtime_filter_mgr == nullptr) { + } else { // all instance finished - return Status::OK(); - } - - SCOPED_ATTACH_TASK(pip_context->get_query_ctx()); - // 1. get the target filters - std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters = - runtime_filter_mgr->get_consume_filters(request->filter_id()); - - // 2. create the filter wrapper to replace or ignore the target filters - if (!filters.empty()) { - RETURN_IF_ERROR(filters[0]->assign_data_into_wrapper(*request, attach_data)); - - std::ranges::for_each(filters, [&](auto& filter) { filter->signal(filters[0].get()); }); } - return Status::OK(); } Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { UniqueId queryid = request->query_id(); - - std::shared_ptr<QueryContext> query_ctx; - { - TUniqueId query_id; - query_id.__set_hi(queryid.hi); - query_id.__set_lo(queryid.lo); - if (auto q_ctx = get_query_ctx(query_id)) { - query_ctx = q_ctx; - } else { - return Status::EndOfFile( - "Send filter size failed: Query context (query-id: {}) not found, maybe " - "finished", - queryid.to_string()); - } + TUniqueId query_id; + query_id.__set_hi(queryid.hi); + query_id.__set_lo(queryid.lo); + if (auto q_ctx = get_query_ctx(query_id)) { + return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request); + } else { + return Status::EndOfFile( + "Send filter size failed: Query context (query-id: {}) not found, maybe " + "finished", + queryid.to_string()); } - - std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; - RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); - auto merge_status = filter_controller->send_filter_size(query_ctx, request); - return merge_status; } Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { UniqueId queryid = request->query_id(); - std::shared_ptr<QueryContext> query_ctx; - { - TUniqueId query_id; - query_id.__set_hi(queryid.hi); - query_id.__set_lo(queryid.lo); - if (auto q_ctx = get_query_ctx(query_id)) { - query_ctx = q_ctx; - } else { - return Status::EndOfFile( - "Sync filter size failed: Query context (query-id: {}) already finished", - queryid.to_string()); - } + TUniqueId query_id; + query_id.__set_hi(queryid.hi); + query_id.__set_lo(queryid.lo); + if (auto q_ctx = get_query_ctx(query_id)) { + return q_ctx->runtime_filter_mgr()->sync_filter_size(request); + } else { + return Status::EndOfFile( + "Sync filter size failed: Query context (query-id: {}) already finished", + queryid.to_string()); } - return query_ctx->runtime_filter_mgr()->sync_filter_size(request); } Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data) { UniqueId queryid = request->query_id(); - std::shared_ptr<QueryContext> query_ctx; - { - TUniqueId query_id; - query_id.__set_hi(queryid.hi); - query_id.__set_lo(queryid.lo); - if (auto q_ctx = get_query_ctx(query_id)) { - query_ctx = q_ctx; - } else { - return Status::EndOfFile( - "Merge filter size failed: Query context (query-id: {}) already finished", - queryid.to_string()); - } + TUniqueId query_id; + query_id.__set_hi(queryid.hi); + query_id.__set_lo(queryid.lo); + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; + return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data); + } else { + return Status::EndOfFile( + "Merge filter size failed: Query context (query-id: {}) already finished", + queryid.to_string()); } - SCOPED_ATTACH_TASK(query_ctx.get()); - std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; - RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); - auto merge_status = filter_controller->merge(query_ctx, request, attach_data); - return merge_status; } void FragmentMgr::get_runtime_query_info( diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 8aaa572c041..60b13348ad4 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -218,8 +218,6 @@ private: std::shared_ptr<MetricEntity> _entity; UIntGauge* timeout_canceled_fragment_count = nullptr; - - RuntimeFilterMergeController _runtimefilter_controller; }; uint64_t get_fragment_executing_count(); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 7056c8bd7c5..abe6b0208af 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -259,6 +259,9 @@ public: std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) { _merge_controller_handler = handler; } + std::shared_ptr<RuntimeFilterMergeControllerEntity> get_merge_controller_handler() const { + return _merge_controller_handler; + } bool is_nereids() const { return _is_nereids; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 012113114f1..7d7d1ee2b3b 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -496,8 +496,8 @@ Status RuntimeState::register_producer_runtime_filter( // When RF is published, consumers in both global and local RF mgr will be found. RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter( desc, query_options(), producer_filter, parent_profile)); - RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_filter(desc, query_options(), - *producer_filter)); + RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter( + desc, query_options(), *producer_filter)); return Status::OK(); } diff --git a/be/src/runtime_filter/role/consumer.cpp b/be/src/runtime_filter/role/consumer.cpp index 97f07702b79..d150dd41d1c 100644 --- a/be/src/runtime_filter/role/consumer.cpp +++ b/be/src/runtime_filter/role/consumer.cpp @@ -17,10 +17,15 @@ #include "runtime_filter/role/consumer.h" +#include "exprs/create_predicate_function.h" +#include "vec/exprs/vbitmap_predicate.h" +#include "vec/exprs/vbloom_predicate.h" +#include "vec/exprs/vdirect_in_predicate.h" +#include "vec/exprs/vexpr_context.h" + namespace doris { Status RuntimeFilterConsumer::_apply_ready_expr( - std::list<vectorized::VExprContextSPtr>& probe_ctxs, std::vector<vectorized::VRuntimeFilterPtr>& push_exprs) { _check_state({State::READY}); _set_state(State::APPLIED); @@ -32,12 +37,12 @@ Status RuntimeFilterConsumer::_apply_ready_expr( } auto origin_size = push_exprs.size(); - RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); + RETURN_IF_ERROR(_get_push_exprs(push_exprs, _probe_expr)); // The runtime filter is pushed down, adding filtering information. auto* expr_filtered_rows_counter = - ADD_COUNTER(_excution_profile, "ExprFilteredRows", TUnit::UNIT); - auto* expr_input_rows_counter = ADD_COUNTER(_excution_profile, "ExprInputRows", TUnit::UNIT); - auto* always_true_counter = ADD_COUNTER(_excution_profile, "AlwaysTruePassRows", TUnit::UNIT); + ADD_COUNTER(_execution_profile, "ExprFilteredRows", TUnit::UNIT); + auto* expr_input_rows_counter = ADD_COUNTER(_execution_profile, "ExprInputRows", TUnit::UNIT); + auto* always_true_counter = ADD_COUNTER(_execution_profile, "AlwaysTruePassRows", TUnit::UNIT); for (auto i = origin_size; i < push_exprs.size(); i++) { push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter, always_true_counter); @@ -45,10 +50,9 @@ Status RuntimeFilterConsumer::_apply_ready_expr( return Status::OK(); } -Status RuntimeFilterConsumer::acquire_expr(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VRuntimeFilterPtr>& push_exprs) { +Status RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& push_exprs) { if (_rf_state == State::READY) { - RETURN_IF_ERROR(_apply_ready_expr(probe_ctxs, push_exprs)); + RETURN_IF_ERROR(_apply_ready_expr(push_exprs)); } if (_rf_state != State::APPLIED && _rf_state != State::TIMEOUT) { _check_state({State::NOT_READY}); @@ -73,11 +77,139 @@ void RuntimeFilterConsumer::signal(RuntimeFilter* other) { } std::shared_ptr<pipeline::RuntimeFilterTimer> RuntimeFilterConsumer::create_filter_timer( - std::shared_ptr<pipeline::RuntimeFilterDependency> dependencie) { + std::shared_ptr<pipeline::RuntimeFilterDependency> dependencies) { auto timer = std::make_shared<pipeline::RuntimeFilterTimer>(_registration_time, - _rf_wait_time_ms, dependencie); + _rf_wait_time_ms, dependencies); _filter_timer.push_back(timer); return timer; } +Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFilterPtr>& container, + const TExpr& probe_expr) { + // TODO: `VExprContextSPtr` is not need, we should just create an expr. + vectorized::VExprContextSPtr probe_ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, probe_ctx)); + + auto real_filter_type = _wrapper->get_real_type(); + bool null_aware = _wrapper->contain_null(); + switch (real_filter_type) { + case RuntimeFilterType::IN_FILTER: { + TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); + type_desc.__set_is_nullable(false); + TExprNode node; + node.__set_type(type_desc); + node.__set_node_type(null_aware ? TExprNodeType::NULL_AWARE_IN_PRED + : TExprNodeType::IN_PRED); + node.in_predicate.__set_is_not_in(false); + node.__set_opcode(TExprOpcode::FILTER_IN); + node.__set_is_nullable(false); + auto in_pred = vectorized::VDirectInPredicate::create_shared(node, _wrapper->hybrid_set()); + in_pred->add_child(probe_ctx->root()); + auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared( + node, in_pred, get_in_list_ignore_thredhold(_wrapper->hybrid_set()->size()), + null_aware); + container.push_back(wrapper); + break; + } + case RuntimeFilterType::MIN_FILTER: { + // create min filter + vectorized::VExprSPtr min_pred; + TExprNode min_pred_node; + RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::GE, min_pred, + &min_pred_node, null_aware)); + vectorized::VExprSPtr min_literal; + RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), + _wrapper->minmax_func()->get_min(), min_literal)); + min_pred->add_child(probe_ctx->root()); + min_pred->add_child(min_literal); + container.push_back(vectorized::VRuntimeFilterWrapper::create_shared( + min_pred_node, min_pred, get_comparison_ignore_thredhold())); + break; + } + case RuntimeFilterType::MAX_FILTER: { + vectorized::VExprSPtr max_pred; + // create max filter + TExprNode max_pred_node; + RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred, + &max_pred_node, null_aware)); + vectorized::VExprSPtr max_literal; + RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), + _wrapper->minmax_func()->get_max(), max_literal)); + max_pred->add_child(probe_ctx->root()); + max_pred->add_child(max_literal); + container.push_back(vectorized::VRuntimeFilterWrapper::create_shared( + max_pred_node, max_pred, get_comparison_ignore_thredhold())); + break; + } + case RuntimeFilterType::MINMAX_FILTER: { + vectorized::VExprSPtr max_pred; + // create max filter + TExprNode max_pred_node; + RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred, + &max_pred_node, null_aware)); + vectorized::VExprSPtr max_literal; + RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), + _wrapper->minmax_func()->get_max(), max_literal)); + max_pred->add_child(probe_ctx->root()); + max_pred->add_child(max_literal); + container.push_back(vectorized::VRuntimeFilterWrapper::create_shared( + max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware)); + + vectorized::VExprContextSPtr new_probe_ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx)); + + // create min filter + vectorized::VExprSPtr min_pred; + TExprNode min_pred_node; + RETURN_IF_ERROR(create_vbin_predicate(new_probe_ctx->root()->type(), TExprOpcode::GE, + min_pred, &min_pred_node, null_aware)); + vectorized::VExprSPtr min_literal; + RETURN_IF_ERROR(create_literal(new_probe_ctx->root()->type(), + _wrapper->minmax_func()->get_min(), min_literal)); + min_pred->add_child(new_probe_ctx->root()); + min_pred->add_child(min_literal); + container.push_back(vectorized::VRuntimeFilterWrapper::create_shared( + min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware)); + break; + } + case RuntimeFilterType::BLOOM_FILTER: { + // create a bloom filter + TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); + type_desc.__set_is_nullable(false); + TExprNode node; + node.__set_type(type_desc); + node.__set_node_type(TExprNodeType::BLOOM_PRED); + node.__set_opcode(TExprOpcode::RT_FILTER); + node.__set_is_nullable(false); + auto bloom_pred = vectorized::VBloomPredicate::create_shared(node); + bloom_pred->set_filter(_wrapper->bloom_filter_func()); + bloom_pred->add_child(probe_ctx->root()); + auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared( + node, bloom_pred, get_bloom_filter_ignore_thredhold()); + container.push_back(wrapper); + break; + } + case RuntimeFilterType::BITMAP_FILTER: { + // create a bitmap filter + TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); + type_desc.__set_is_nullable(false); + TExprNode node; + node.__set_type(type_desc); + node.__set_node_type(TExprNodeType::BITMAP_PRED); + node.__set_opcode(TExprOpcode::RT_FILTER); + node.__set_is_nullable(false); + auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node); + bitmap_pred->set_filter(_wrapper->bitmap_filter_func()); + bitmap_pred->add_child(probe_ctx->root()); + auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, bitmap_pred, 0); + container.push_back(wrapper); + break; + } + default: + DCHECK(false); + break; + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime_filter/role/consumer.h b/be/src/runtime_filter/role/consumer.h index 8b5a490baf1..c2926864c39 100644 --- a/be/src/runtime_filter/role/consumer.h +++ b/be/src/runtime_filter/role/consumer.h @@ -26,6 +26,12 @@ namespace doris { class RuntimeFilterConsumer : public RuntimeFilter { public: + enum class State { + NOT_READY, + READY, + TIMEOUT, + APPLIED, + }; static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* res, RuntimeProfile* parent_profile) { @@ -36,28 +42,20 @@ public: return Status::OK(); } - int node_id() const { return _node_id; } - + // Published by producer. void signal(RuntimeFilter* other); std::shared_ptr<pipeline::RuntimeFilterTimer> create_filter_timer( - std::shared_ptr<pipeline::RuntimeFilterDependency> dependencie); + std::shared_ptr<pipeline::RuntimeFilterDependency> dependencies); - Status acquire_expr(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VRuntimeFilterPtr>& push_exprs); + // Called after `State` is ready (e.g. signaled) + Status acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& push_exprs); std::string debug_string() const override { return fmt::format("Consumer: ({}, state: {})", _debug_string(), to_string(_rf_state)); } - bool applied() { return _rf_state == State::APPLIED; } - - enum class State { - NOT_READY, - READY, - TIMEOUT, - APPLIED, - }; + bool is_applied() { return _rf_state == State::APPLIED; } static std::string to_string(const State& state) { switch (state) { @@ -78,11 +76,10 @@ private: RuntimeFilterConsumer(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, int node_id, RuntimeProfile* parent_profile) : RuntimeFilter(state, desc), - _node_id(node_id), _probe_expr(desc->planId_to_target_expr.find(node_id)->second), _profile(new RuntimeProfile(fmt::format("RF{}", desc->filter_id))), _storage_profile(new RuntimeProfile(fmt::format("Storage", desc->filter_id))), - _excution_profile(new RuntimeProfile(fmt::format("Execution", desc->filter_id))), + _execution_profile(new RuntimeProfile(fmt::format("Execution", desc->filter_id))), _registration_time(MonotonicMillis()), _rf_state(State::NOT_READY) { // If bitmap filter is not applied, it will cause the query result to be incorrect @@ -93,13 +90,13 @@ private: parent_profile->add_child(_profile.get(), true, nullptr); _profile->add_child(_storage_profile.get(), true, nullptr); - _profile->add_child(_excution_profile.get(), true, nullptr); + _profile->add_child(_execution_profile.get(), true, nullptr); _wait_timer = ADD_TIMER(_profile, "WaitTime"); } - Status _apply_ready_expr(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VRuntimeFilterPtr>& push_exprs); - + Status _apply_ready_expr(std::vector<vectorized::VRuntimeFilterPtr>& push_exprs); + Status _get_push_exprs(std::vector<vectorized::VRuntimeFilterPtr>& container, + const TExpr& probe_expr); void _check_state(std::vector<State> assumed_states) { if (!check_state_impl<RuntimeFilterConsumer>(_rf_state, assumed_states)) { throw Exception(ErrorCode::INTERNAL_ERROR, @@ -113,15 +110,13 @@ private: _profile->add_info_string("Info", debug_string()); } - int _node_id; - TExpr _probe_expr; std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer; std::unique_ptr<RuntimeProfile> _profile; - std::unique_ptr<RuntimeProfile> _storage_profile; // for storage layer stats - std::unique_ptr<RuntimeProfile> _excution_profile; // for execution layer stats + std::unique_ptr<RuntimeProfile> _storage_profile; // for storage layer stats + std::unique_ptr<RuntimeProfile> _execution_profile; // for execution layer stats RuntimeProfile::Counter* _wait_timer = nullptr; int32_t _rf_wait_time_ms; diff --git a/be/src/runtime_filter/role/merger.h b/be/src/runtime_filter/role/merger.h index 627f6760afc..32308b5feda 100644 --- a/be/src/runtime_filter/role/merger.h +++ b/be/src/runtime_filter/role/merger.h @@ -60,8 +60,9 @@ public: bool add_rf_size(uint64_t size) { _received_rf_size_num++; + _received_sum_size += size; DCHECK_GE(_expected_producer_num, _received_rf_size_num) << debug_string(); - return (_received_rf_size_num == _expected_producer_num); + return _received_rf_size_num == _expected_producer_num; } uint64_t get_received_sum_size() const { return _received_sum_size; } @@ -90,10 +91,10 @@ private: std::atomic<State> _rf_state; int _expected_producer_num = 0; - std::atomic_int _received_producer_num = 0; + int _received_producer_num = 0; uint64_t _received_sum_size = 0; - std::atomic_int _received_rf_size_num = 0; + int _received_rf_size_num = 0; friend class RuntimeFilterProducer; }; diff --git a/be/src/runtime_filter/role/producer.cpp b/be/src/runtime_filter/role/producer.cpp index d1c2286b41c..1cb44cb0501 100644 --- a/be/src/runtime_filter/role/producer.cpp +++ b/be/src/runtime_filter/role/producer.cpp @@ -147,7 +147,7 @@ Status RuntimeFilterProducer::send_size( // two case we need do local merge: // 1. has remote target // 2. has local target and has global consumer (means target scan has local shuffle) - if (_has_remote_target || + if (!_has_local_target || !_state->global_runtime_filter_mgr()->get_consume_filters(_wrapper->filter_id()).empty()) { LocalMergeContext* local_merge_filters = nullptr; RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( @@ -224,26 +224,7 @@ void RuntimeFilterProducer::set_synced_size(uint64_t global_size) { } Status RuntimeFilterProducer::init(size_t local_size) { - size_t real_size = _synced_size != -1 ? _synced_size : local_size; - if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && - real_size > _wrapper->max_in_num()) { - RETURN_IF_ERROR(_wrapper->change_to_bloom_filter()); - } - - if (_wrapper->get_real_type() == RuntimeFilterType::BLOOM_FILTER) { - RETURN_IF_ERROR(_wrapper->init_bloom_filter(real_size)); - } - if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER && - real_size > _wrapper->max_in_num()) { - set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED, - "reach max in num"); - } - if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER && !_callback.empty()) { - for (auto& call : _callback) { - call(); - } - } - return Status::OK(); + return _wrapper->init(_synced_size != -1 ? _synced_size : local_size); } } // namespace doris diff --git a/be/src/runtime_filter/role/producer.h b/be/src/runtime_filter/role/producer.h index ae1b3708457..b71ac1e86dd 100644 --- a/be/src/runtime_filter/role/producer.h +++ b/be/src/runtime_filter/role/producer.h @@ -29,7 +29,6 @@ namespace doris { */ class RuntimeFilterProducer : public RuntimeFilter { public: - using Callback = std::function<void()>; enum class State { WAITING_FOR_SEND_SIZE = 0, WAITING_FOR_SYNCED_SIZE = 1, @@ -61,7 +60,7 @@ public: return; } _check_state({State::WAITING_FOR_DATA}); - _wrapper->insert_batch(column, start); + _wrapper->insert(column, start); } Status publish(RuntimeState* state, bool build_hash_table); std::string debug_string() const override { @@ -70,7 +69,6 @@ public: _dependency ? _dependency->debug_string() : "none", _synced_size); } - void with_callback(Callback& callback) { _callback.emplace_back(callback); } int expr_order() const { return _expr_order; } void set_synced_size(uint64_t global_size); void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State state, @@ -145,7 +143,6 @@ private: std::atomic<State> _rf_state; std::unique_ptr<RuntimeProfile> _profile; - std::vector<Callback> _callback; }; } // namespace doris diff --git a/be/src/runtime_filter/role/runtime_filter.cpp b/be/src/runtime_filter/role/runtime_filter.cpp index 6289afd5d76..f5b902df7e2 100644 --- a/be/src/runtime_filter/role/runtime_filter.cpp +++ b/be/src/runtime_filter/role/runtime_filter.cpp @@ -103,7 +103,7 @@ Status RuntimeFilter::_init_with_desc(const TRuntimeFilterDesc* desc, params.enable_fixed_len_to_uint32_v2 = options->__isset.enable_fixed_len_to_uint32_v2 && options->enable_fixed_len_to_uint32_v2; if (_runtime_filter_type == RuntimeFilterType::BITMAP_FILTER) { - if (_has_remote_target) { + if (!_has_local_target) { return Status::InternalError("bitmap filter do not support remote target"); } if (!build_ctx->root()->type().is_bitmap_type()) { diff --git a/be/src/runtime_filter/role/runtime_filter.h b/be/src/runtime_filter/role/runtime_filter.h index 41623fbc74d..f4cb1f10301 100644 --- a/be/src/runtime_filter/role/runtime_filter.h +++ b/be/src/runtime_filter/role/runtime_filter.h @@ -44,8 +44,8 @@ public: bool has_local_target() const { return _has_local_target; } template <class T> - Status assign_data_into_wrapper(const T& request, butil::IOBufAsZeroCopyInputStream* data) { - return _wrapper->assign_data(request, data); + Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) { + return _wrapper->assign(request, data); } template <class T> @@ -68,10 +68,8 @@ public: auto in_filter = request->mutable_in_filter(); _to_protobuf(in_filter); } else if (real_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) { - _wrapper->get_bloom_filter_desc((char**)data, len); DCHECK(data != nullptr); - request->mutable_bloom_filter()->set_filter_length(*len); - request->mutable_bloom_filter()->set_always_true(false); + _to_protobuf(request->mutable_bloom_filter(), (char**)data, len); } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER || real_runtime_filter_type == RuntimeFilterType::MIN_FILTER || real_runtime_filter_type == RuntimeFilterType::MAX_FILTER) { @@ -88,10 +86,9 @@ public: protected: RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc) : _state(state), - _has_remote_target(desc->has_remote_targets), _has_local_target(desc->has_local_targets), _runtime_filter_type(get_runtime_filter_type(desc)) { - DCHECK_NE(_has_remote_target, _has_local_target); + DCHECK_NE(desc->has_remote_targets, _has_local_target); } Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options); @@ -100,6 +97,9 @@ protected: void _to_protobuf(T* filter) { _wrapper->_to_protobuf(filter); } + void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length) { + _wrapper->_to_protobuf(filter, data, filter_length); + } Status _push_to_remote(RuntimeState* state, const TNetworkAddress* addr); @@ -118,8 +118,6 @@ protected: // _wrapper is a runtime filter function wrapper std::shared_ptr<RuntimeFilterWrapper> _wrapper; - // will apply to remote node - bool _has_remote_target; // will apply to local node bool _has_local_target; diff --git a/be/src/runtime_filter/runtime_filter_helper.cpp b/be/src/runtime_filter/runtime_filter_helper.cpp index c2851ba7e71..85c718c74f6 100644 --- a/be/src/runtime_filter/runtime_filter_helper.cpp +++ b/be/src/runtime_filter/runtime_filter_helper.cpp @@ -24,12 +24,10 @@ namespace doris::pipeline { RuntimeFilterHelper::RuntimeFilterHelper(const int32_t _node_id, const std::vector<TRuntimeFilterDesc>& runtime_filters, - const RowDescriptor& row_descriptor, - vectorized::VExprContextSPtrs& conjuncts) + const RowDescriptor& row_descriptor) : _node_id(_node_id), _runtime_filter_descs(runtime_filters), _row_descriptor_ref(row_descriptor), - _conjuncts_ref(conjuncts), _profile(new RuntimeProfile("RuntimeFilterHelper")) { _blocked_by_rf = std::make_shared<std::atomic_bool>(false); } @@ -85,22 +83,23 @@ void RuntimeFilterHelper::init_runtime_filter_dependency( } } -Status RuntimeFilterHelper::acquire_runtime_filter() { +Status RuntimeFilterHelper::acquire_runtime_filter(vectorized::VExprContextSPtrs& conjuncts) { SCOPED_TIMER(_acquire_runtime_filter_timer); std::vector<vectorized::VRuntimeFilterPtr> vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { - RETURN_IF_ERROR(_consumers[i]->acquire_expr(_probe_ctxs, vexprs)); + RETURN_IF_ERROR(_consumers[i]->acquire_expr(vexprs)); - if (!_consumers[i]->applied()) { + if (!_consumers[i]->is_applied()) { _is_all_rf_applied = false; } } - RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs)); + RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs, conjuncts)); return Status::OK(); } Status RuntimeFilterHelper::_append_rf_into_conjuncts( - const std::vector<vectorized::VRuntimeFilterPtr>& vexprs) { + const std::vector<vectorized::VRuntimeFilterPtr>& vexprs, + vectorized::VExprContextSPtrs& conjuncts) { if (vexprs.empty()) { return Status::OK(); } @@ -109,13 +108,14 @@ Status RuntimeFilterHelper::_append_rf_into_conjuncts( vectorized::VExprContextSPtr conjunct = vectorized::VExprContext::create_shared(expr); RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref)); RETURN_IF_ERROR(conjunct->open(_state)); - _conjuncts_ref.emplace_back(conjunct); + conjuncts.emplace_back(conjunct); } return Status::OK(); } -Status RuntimeFilterHelper::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { +Status RuntimeFilterHelper::try_append_late_arrival_runtime_filter( + int* arrived_rf_num, vectorized::VExprContextSPtrs& conjuncts) { if (_is_all_rf_applied) { *arrived_rf_num = _runtime_filter_descs.size(); return Status::OK(); @@ -133,12 +133,12 @@ Status RuntimeFilterHelper::try_append_late_arrival_runtime_filter(int* arrived_ std::vector<vectorized::VRuntimeFilterPtr> exprs; int current_arrived_rf_num = 0; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { - RETURN_IF_ERROR(_consumers[i]->acquire_expr(_probe_ctxs, exprs)); - current_arrived_rf_num += _consumers[i]->applied(); + RETURN_IF_ERROR(_consumers[i]->acquire_expr(exprs)); + current_arrived_rf_num += _consumers[i]->is_applied(); } // 2. Append unapplied runtime filters to _conjuncts if (!exprs.empty()) { - RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs)); + RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs, conjuncts)); } if (current_arrived_rf_num == _runtime_filter_descs.size()) { _is_all_rf_applied = true; diff --git a/be/src/runtime_filter/runtime_filter_helper.h b/be/src/runtime_filter/runtime_filter_helper.h index fa1a421de7b..c9fafd58b0b 100644 --- a/be/src/runtime_filter/runtime_filter_helper.h +++ b/be/src/runtime_filter/runtime_filter_helper.h @@ -28,32 +28,31 @@ class RuntimeFilterHelper { public: RuntimeFilterHelper(const int32_t node_id, const std::vector<TRuntimeFilterDesc>& runtime_filters, - const RowDescriptor& row_descriptor, - vectorized::VExprContextSPtrs& conjuncts); + const RowDescriptor& row_descriptor); ~RuntimeFilterHelper() = default; Status init(RuntimeState* state, RuntimeProfile* profile, bool need_local_merge); - - // Try to append late arrived runtime filters. - // Return num of filters which are applied already. - Status try_append_late_arrival_runtime_filter(int* arrived_rf_num); + // Get all arrived runtime filters at Open phase which will be push down to storage. + // Called by Operator. + Status acquire_runtime_filter(vectorized::VExprContextSPtrs& conjuncts); + // The un-arrival filters will be checked every time the scanner is scheduled. + // And once new runtime filters arrived, we will use it to do operator's filtering. + // Called by Scanner. + Status try_append_late_arrival_runtime_filter(int* arrived_rf_num, + vectorized::VExprContextSPtrs& conjuncts); void init_runtime_filter_dependency( std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>& runtime_filter_dependencies, const int id, const int node_id, const std::string& name); - // Get all arrived runtime filters at Open phase. - Status acquire_runtime_filter(); - - bool is_all_rf_applied() const { return _is_all_rf_applied; } - private: // Register and get all runtime filters at Init phase. Status _register_runtime_filter(bool need_local_merge); // Append late-arrival runtime filters to the vconjunct_ctx. - Status _append_rf_into_conjuncts(const std::vector<vectorized::VRuntimeFilterPtr>& vexprs); + Status _append_rf_into_conjuncts(const std::vector<vectorized::VRuntimeFilterPtr>& vexprs, + vectorized::VExprContextSPtrs& conjuncts); std::vector<std::shared_ptr<RuntimeFilterConsumer>> _consumers; std::mutex _rf_locks; @@ -61,18 +60,13 @@ private: int32_t _node_id; std::vector<TRuntimeFilterDesc> _runtime_filter_descs; - std::list<vectorized::VExprContextSPtr> _probe_ctxs; - const RowDescriptor& _row_descriptor_ref; - vectorized::VExprContextSPtrs& _conjuncts_ref; - // True means all runtime filters are applied to scanners bool _is_all_rf_applied = true; std::shared_ptr<std::atomic_bool> _blocked_by_rf; RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr; - std::unique_ptr<RuntimeProfile> _profile; }; diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index ba222c22b94..d8ebeeacb9f 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -88,7 +88,7 @@ Status RuntimeFilterMgr::register_consumer_filter( return Status::OK(); } -Status RuntimeFilterMgr::register_local_merger_filter( +Status RuntimeFilterMgr::register_local_merger_producer_filter( const TRuntimeFilterDesc& desc, const TQueryOptions& options, std::shared_ptr<RuntimeFilterProducer> producer_filter) { DCHECK(_is_global); @@ -137,15 +137,14 @@ Status RuntimeFilterMgr::register_producer_filter( DCHECK(!_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; - std::lock_guard<std::mutex> l(_lock); - auto iter = _producer_map.find(key); - DCHECK(_state != nullptr); - if (iter != _producer_map.end()) { - return Status::InvalidArgument("filter has registed"); + + std::lock_guard<std::mutex> l(_lock); + if (_producer_id_set.contains(key)) { + return Status::InvalidArgument("filter {} has been registered", key); } RETURN_IF_ERROR(RuntimeFilterProducer::create(_state, &desc, producer_filter, parent_profile)); - _producer_map.emplace(key, *producer_filter); + _producer_id_set.insert(key); return Status::OK(); } @@ -235,6 +234,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptr<QueryC cnt_val.source_addrs.push_back(request->source_addr()); Status st = Status::OK(); + // After all runtime filters' size are collected, we should send response to all producers. if (cnt_val.merger->add_rf_size(request->filter_size())) { auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ? std::weak_ptr<QueryContext> {} : query_ctx; @@ -311,7 +311,7 @@ Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que RETURN_IF_ERROR(RuntimeFilterProducer::create(_state, &cnt_val.runtime_filter_desc, &tmp_filter, nullptr)); - RETURN_IF_ERROR(tmp_filter->assign_data_into_wrapper(*request, attach_data)); + RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data)); RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get())); @@ -395,27 +395,6 @@ Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que return st; } -Status RuntimeFilterMergeController::acquire( - UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) { - uint32_t shard = _get_controller_shard_idx(query_id); - std::lock_guard<std::mutex> guard(_controller_mutex[shard]); - auto iter = _filter_controller_map[shard].find(query_id); - if (iter == _filter_controller_map[shard].end()) { - return Status::InvalidArgument("not found entity, query-id:{}", query_id.to_string()); - } - *handle = _filter_controller_map[shard][query_id].lock(); - if (*handle == nullptr) { - return Status::InvalidArgument("entity is closed"); - } - return Status::OK(); -} - -void RuntimeFilterMergeController::remove_entity(UniqueId query_id) { - uint32_t shard = _get_controller_shard_idx(query_id); - std::lock_guard<std::mutex> guard(_controller_mutex[shard]); - _filter_controller_map[shard].erase(query_id); -} - RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) { RuntimeFilterParamsContext* params = state->get_query_ctx()->obj_pool.add(new RuntimeFilterParamsContext()); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 507e6ad085b..e0b53409c59 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -81,38 +81,27 @@ public: ~RuntimeFilterMgr(); + // get/set consumer std::vector<std::shared_ptr<RuntimeFilterConsumer>> get_consume_filters(int filter_id); - - std::shared_ptr<RuntimeFilterProducer> try_get_product_filter(const int filter_id) { - std::lock_guard<std::mutex> l(_lock); - auto iter = _producer_map.find(filter_id); - if (iter == _producer_map.end()) { - return nullptr; - } - return iter->second; - } - - // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer_filter, bool need_local_merge, RuntimeProfile* parent_profile); - Status register_local_merger_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, - std::shared_ptr<RuntimeFilterProducer> producer_filter); - + // get/set local-merge producer + Status register_local_merger_producer_filter( + const TRuntimeFilterDesc& desc, const TQueryOptions& options, + std::shared_ptr<RuntimeFilterProducer> producer_filter); Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters); + // Create local producer. This producer is hold by RuntimeFilterSlots. Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, std::shared_ptr<RuntimeFilterProducer>* producer_filter, RuntimeProfile* parent_profile); // update filter by remote void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); - Status get_merge_addr(TNetworkAddress* addr); - Status sync_filter_size(const PSyncFilterSizeRequest* request); private: @@ -132,7 +121,7 @@ private: // use filter_id as key // key: "filter-id" std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>> _consumer_map; - std::map<int32_t, std::shared_ptr<RuntimeFilterProducer>> _producer_map; + std::set<int32_t> _producer_id_set; std::map<int32_t, LocalMergeContext> _local_merge_map; RuntimeFilterParamsContext* _state = nullptr; @@ -182,68 +171,4 @@ private: RuntimeFilterParamsContext* _state = nullptr; }; -// RuntimeFilterMergeController has a map query-id -> entity -class RuntimeFilterMergeController { -public: - RuntimeFilterMergeController() = default; - ~RuntimeFilterMergeController() = default; - - // thread safe - // add a query-id -> entity - // If a query-id -> entity already exists - // add_entity will return a exists entity - Status add_entity(const auto& params, UniqueId query_id, const TQueryOptions& query_options, - std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, - RuntimeFilterParamsContext* state) { - if (!params.__isset.runtime_filter_params || - params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { - return Status::OK(); - } - - // TODO: why we need string, direct use UniqueId - uint32_t shard = _get_controller_shard_idx(query_id); - std::lock_guard<std::mutex> guard(_controller_mutex[shard]); - auto iter = _filter_controller_map[shard].find(query_id); - if (iter == _filter_controller_map[shard].end()) { - *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>( - new RuntimeFilterMergeControllerEntity(state), - [this](RuntimeFilterMergeControllerEntity* entity) { - remove_entity(entity->query_id()); - delete entity; - }); - _filter_controller_map[shard][query_id] = *handle; - const TRuntimeFilterParams& filter_params = params.runtime_filter_params; - RETURN_IF_ERROR(handle->get()->init(query_id, filter_params, query_options)); - } else { - *handle = _filter_controller_map[shard][query_id].lock(); - } - return Status::OK(); - } - - // thread safe - // increase a reference count - // if a query-id is not exist - // Status.not_ok will be returned and a empty ptr will returned by *handle - Status acquire(UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle); - - // thread safe - // remove a entity by query-id - // remove_entity will be called automatically by entity when entity is destroyed - void remove_entity(UniqueId query_id); - - static const int kShardNum = 128; - -private: - uint32_t _get_controller_shard_idx(UniqueId& query_id) { - return (uint32_t)query_id.hi % kShardNum; - } - - std::mutex _controller_mutex[kShardNum]; - // We store the weak pointer here. - // When the external object is destroyed, we need to clear this record - using FilterControllerMap = - std::unordered_map<UniqueId, std::weak_ptr<RuntimeFilterMergeControllerEntity>>; - // str(query-id) -> entity - FilterControllerMap _filter_controller_map[kShardNum]; -}; } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_slots.cpp b/be/src/runtime_filter/runtime_filter_slots.cpp index 5031c59a04b..26fb7eca46f 100644 --- a/be/src/runtime_filter/runtime_filter_slots.cpp +++ b/be/src/runtime_filter/runtime_filter_slots.cpp @@ -27,27 +27,9 @@ namespace doris { Status RuntimeFilterSlots::init(RuntimeState* state, const std::vector<TRuntimeFilterDesc>& runtime_filter_descs) { _runtime_filters.resize(runtime_filter_descs.size()); - std::unordered_map<int, RuntimeFilterProducer*> id_to_in_filter; for (size_t i = 0; i < runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->register_producer_runtime_filter( runtime_filter_descs[i], &_runtime_filters[i], _profile.get())); - if (runtime_filter_descs[i].type == TRuntimeFilterType::IN) { - id_to_in_filter.insert({runtime_filter_descs[i].expr_order, _runtime_filters[i].get()}); - } else if (runtime_filter_descs[i].type == TRuntimeFilterType::IN_OR_BLOOM && - !id_to_in_filter.contains(runtime_filter_descs[i].expr_order)) { - id_to_in_filter.insert({runtime_filter_descs[i].expr_order, _runtime_filters[i].get()}); - } - } - for (size_t i = 0; i < runtime_filter_descs.size(); i++) { - if (id_to_in_filter.contains(_runtime_filters[i]->expr_order()) && - _runtime_filters[i].get() != id_to_in_filter[_runtime_filters[i]->expr_order()]) { - RuntimeFilterProducer::Callback callback = - [&, filter = _runtime_filters[i].get()]() -> void { - filter->set_wrapper_state_and_ready_to_publish( - RuntimeFilterWrapper::State::DISABLED, "exist in_filter"); - }; - id_to_in_filter[_runtime_filters[i]->expr_order()]->with_callback(callback); - } } return Status::OK(); } diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp index f688af21a4c..c81e04758f2 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.cpp +++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp @@ -18,17 +18,12 @@ #include "runtime_filter/runtime_filter_wrapper.h" #include "exprs/create_predicate_function.h" -#include "vec/exprs/vbitmap_predicate.h" -#include "vec/exprs/vbloom_predicate.h" -#include "vec/exprs/vdirect_in_predicate.h" -#include "vec/exprs/vexpr_context.h" namespace doris { RuntimeFilterWrapper::RuntimeFilterWrapper(const RuntimeFilterParams* params) : RuntimeFilterWrapper(params->column_return_type, params->filter_type, params->filter_id, - State::UNINITED) { - _max_in_num = params->max_in_num; + State::UNINITED, params->max_in_num) { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { _hybrid_set.reset(create_set(_column_return_type)); @@ -75,150 +70,14 @@ RuntimeFilterWrapper::RuntimeFilterWrapper(const RuntimeFilterParams* params) } } -Status RuntimeFilterWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VRuntimeFilterPtr>& container, - const TExpr& probe_expr) { - vectorized::VExprContextSPtr probe_ctx; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, probe_ctx)); - probe_ctxs.push_back(probe_ctx); - DCHECK(probe_ctx->root()->type().type == _column_return_type || - (is_string_type(probe_ctx->root()->type().type) && - is_string_type(_column_return_type)) || - _filter_type == RuntimeFilterType::BITMAP_FILTER) - << " prob_expr->root()->type().type: " << int(probe_ctx->root()->type().type) - << " _column_return_type: " << int(_column_return_type) - << " _filter_type: " << filter_type_to_string(_filter_type); - - auto real_filter_type = get_real_type(); - bool null_aware = contain_null(); - switch (real_filter_type) { - case RuntimeFilterType::IN_FILTER: { - TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); - type_desc.__set_is_nullable(false); - TExprNode node; - node.__set_type(type_desc); - node.__set_node_type(null_aware ? TExprNodeType::NULL_AWARE_IN_PRED - : TExprNodeType::IN_PRED); - node.in_predicate.__set_is_not_in(false); - node.__set_opcode(TExprOpcode::FILTER_IN); - node.__set_is_nullable(false); - auto in_pred = vectorized::VDirectInPredicate::create_shared(node, _hybrid_set); - in_pred->add_child(probe_ctx->root()); - auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared( - node, in_pred, get_in_list_ignore_thredhold(_hybrid_set->size()), null_aware); - container.push_back(wrapper); - break; - } - case RuntimeFilterType::MIN_FILTER: { - // create min filter - vectorized::VExprSPtr min_pred; - TExprNode min_pred_node; - RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::GE, min_pred, - &min_pred_node, null_aware)); - vectorized::VExprSPtr min_literal; - RETURN_IF_ERROR( - create_literal(probe_ctx->root()->type(), _minmax_func->get_min(), min_literal)); - min_pred->add_child(probe_ctx->root()); - min_pred->add_child(min_literal); - container.push_back(vectorized::VRuntimeFilterWrapper::create_shared( - min_pred_node, min_pred, get_comparison_ignore_thredhold())); - break; - } - case RuntimeFilterType::MAX_FILTER: { - vectorized::VExprSPtr max_pred; - // create max filter - TExprNode max_pred_node; - RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred, - &max_pred_node, null_aware)); - vectorized::VExprSPtr max_literal; - RETURN_IF_ERROR( - create_literal(probe_ctx->root()->type(), _minmax_func->get_max(), max_literal)); - max_pred->add_child(probe_ctx->root()); - max_pred->add_child(max_literal); - container.push_back(vectorized::VRuntimeFilterWrapper::create_shared( - max_pred_node, max_pred, get_comparison_ignore_thredhold())); - break; - } - case RuntimeFilterType::MINMAX_FILTER: { - vectorized::VExprSPtr max_pred; - // create max filter - TExprNode max_pred_node; - RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred, - &max_pred_node, null_aware)); - vectorized::VExprSPtr max_literal; - RETURN_IF_ERROR( - create_literal(probe_ctx->root()->type(), _minmax_func->get_max(), max_literal)); - max_pred->add_child(probe_ctx->root()); - max_pred->add_child(max_literal); - container.push_back(vectorized::VRuntimeFilterWrapper::create_shared( - max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware)); - - vectorized::VExprContextSPtr new_probe_ctx; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx)); - probe_ctxs.push_back(new_probe_ctx); - - // create min filter - vectorized::VExprSPtr min_pred; - TExprNode min_pred_node; - RETURN_IF_ERROR(create_vbin_predicate(new_probe_ctx->root()->type(), TExprOpcode::GE, - min_pred, &min_pred_node, null_aware)); - vectorized::VExprSPtr min_literal; - RETURN_IF_ERROR(create_literal(new_probe_ctx->root()->type(), _minmax_func->get_min(), - min_literal)); - min_pred->add_child(new_probe_ctx->root()); - min_pred->add_child(min_literal); - container.push_back(vectorized::VRuntimeFilterWrapper::create_shared( - min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware)); - break; - } - case RuntimeFilterType::BLOOM_FILTER: { - // create a bloom filter - TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); - type_desc.__set_is_nullable(false); - TExprNode node; - node.__set_type(type_desc); - node.__set_node_type(TExprNodeType::BLOOM_PRED); - node.__set_opcode(TExprOpcode::RT_FILTER); - node.__set_is_nullable(false); - auto bloom_pred = vectorized::VBloomPredicate::create_shared(node); - bloom_pred->set_filter(_bloom_filter_func); - bloom_pred->add_child(probe_ctx->root()); - auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared( - node, bloom_pred, get_bloom_filter_ignore_thredhold()); - container.push_back(wrapper); - break; - } - case RuntimeFilterType::BITMAP_FILTER: { - // create a bitmap filter - TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); - type_desc.__set_is_nullable(false); - TExprNode node; - node.__set_type(type_desc); - node.__set_node_type(TExprNodeType::BITMAP_PRED); - node.__set_opcode(TExprOpcode::RT_FILTER); - node.__set_is_nullable(false); - auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node); - bitmap_pred->set_filter(_bitmap_filter_func); - bitmap_pred->add_child(probe_ctx->root()); - auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, bitmap_pred, 0); - container.push_back(wrapper); - break; - } - default: - DCHECK(false); - break; - } - return Status::OK(); -} - -Status RuntimeFilterWrapper::change_to_bloom_filter() { +Status RuntimeFilterWrapper::_change_to_bloom_filter() { if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) { return Status::InternalError("Can not change to bloom filter, {}", debug_string()); } BloomFilterFuncBase* bf = _bloom_filter_func.get(); if (bf != nullptr) { - insert_to_bloom_filter(bf); + _insert(bf); } else if (_hybrid_set != nullptr && _hybrid_set->size() != 0) { return Status::InternalError("change to bloom filter need empty set, {}", debug_string()); } @@ -228,7 +87,7 @@ Status RuntimeFilterWrapper::change_to_bloom_filter() { return Status::OK(); } -void RuntimeFilterWrapper::batch_assign( +void RuntimeFilterWrapper::_batch_assign( const PInFilter& filter, void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, PColumnValue&)) { for (int i = 0; i < filter.values_size(); ++i) { @@ -237,16 +96,20 @@ void RuntimeFilterWrapper::batch_assign( } } -Status RuntimeFilterWrapper::init_bloom_filter(const size_t runtime_size) { - if (_filter_type != RuntimeFilterType::BLOOM_FILTER && - _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) { - throw Exception(ErrorCode::INTERNAL_ERROR, "init_bloom_filter meet invalid input type {}", - int(_filter_type)); +Status RuntimeFilterWrapper::init(const size_t real_size) { + if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && real_size > _max_in_num) { + RETURN_IF_ERROR(_change_to_bloom_filter()); + } + if (get_real_type() == RuntimeFilterType::IN_FILTER && real_size > _max_in_num) { + set_state(RuntimeFilterWrapper::State::DISABLED, "reach max in num"); } - return _bloom_filter_func->init_with_runtime_size(runtime_size); + if (_bloom_filter_func) { + RETURN_IF_ERROR(_bloom_filter_func->init_with_runtime_size(real_size)); + } + return Status::OK(); } -void RuntimeFilterWrapper::insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { +void RuntimeFilterWrapper::_insert(BloomFilterFuncBase* bloom_filter) const { if (_hybrid_set->size() > 0) { auto* it = _hybrid_set->begin(); while (it->has_next()) { @@ -259,7 +122,7 @@ void RuntimeFilterWrapper::insert_to_bloom_filter(BloomFilterFuncBase* bloom_fil } } -void RuntimeFilterWrapper::insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) { +void RuntimeFilterWrapper::insert(const vectorized::ColumnPtr& column, size_t start) { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { _hybrid_set->insert_fixed_len(column, start); @@ -283,36 +146,35 @@ void RuntimeFilterWrapper::insert_fixed_len(const vectorized::ColumnPtr& column, } break; } + case RuntimeFilterType::BITMAP_FILTER: { + std::vector<const BitmapValue*> bitmaps; + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = + assert_cast<const vectorized::ColumnBitmap&>(nullable->get_nested_column()); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + bitmaps.push_back(&(col.get_data()[i])); + } + } + } else { + const auto* col = assert_cast<const vectorized::ColumnBitmap*>(column.get()); + for (size_t i = start; i < column->size(); i++) { + bitmaps.push_back(&(col->get_data()[i])); + } + } + _bitmap_filter_func->insert_many(bitmaps); + break; + } default: DCHECK(false); break; } } -void RuntimeFilterWrapper::bitmap_filter_insert_batch(const vectorized::ColumnPtr column, - size_t start) { - std::vector<const BitmapValue*> bitmaps; - if (column->is_nullable()) { - const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); - const auto& col = - assert_cast<const vectorized::ColumnBitmap&>(nullable->get_nested_column()); - const auto& nullmap = - assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) - .get_data(); - for (size_t i = start; i < column->size(); i++) { - if (!nullmap[i]) { - bitmaps.push_back(&(col.get_data()[i])); - } - } - } else { - const auto* col = assert_cast<const vectorized::ColumnBitmap*>(column.get()); - for (size_t i = start; i < column->size(); i++) { - bitmaps.push_back(&(col->get_data()[i])); - } - } - _bitmap_filter_func->insert_many(bitmaps); -} - bool RuntimeFilterWrapper::build_bf_by_runtime_size() const { return _bloom_filter_func ? _bloom_filter_func->build_bf_by_runtime_size() : false; } @@ -367,17 +229,17 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { // case2: use default size to init bf RETURN_IF_ERROR(_bloom_filter_func->init_with_fixed_length()); - RETURN_IF_ERROR(change_to_bloom_filter()); + RETURN_IF_ERROR(_change_to_bloom_filter()); } } else { // case1&case2: use input bf directly and insert hybrid set data into bf _bloom_filter_func = other->_bloom_filter_func; - RETURN_IF_ERROR(change_to_bloom_filter()); + RETURN_IF_ERROR(_change_to_bloom_filter()); } } else { if (other_filter_type == RuntimeFilterType::IN_FILTER) { // case2: insert data to global filter - other->insert_to_bloom_filter(_bloom_filter_func.get()); + other->_insert(_bloom_filter_func.get()); } else { // case1&case2: all input bf must has same size RETURN_IF_ERROR(_bloom_filter_func->merge(other->_bloom_filter_func.get())); @@ -396,7 +258,7 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { return Status::OK(); } -Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_null) { +Status RuntimeFilterWrapper::_assign(const PInFilter& in_filter, bool contain_null) { if (contain_null) { _hybrid_set->set_null_aware(true); _hybrid_set->insert((const void*)nullptr); @@ -404,42 +266,42 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul switch (_column_return_type) { case TYPE_BOOLEAN: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { bool bool_val = column.boolval(); set->insert(&bool_val); }); break; } case TYPE_TINYINT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto int_val = static_cast<int8_t>(column.intval()); set->insert(&int_val); }); break; } case TYPE_SMALLINT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto int_val = static_cast<int16_t>(column.intval()); set->insert(&int_val); }); break; } case TYPE_INT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { int32_t int_val = column.intval(); set->insert(&int_val); }); break; } case TYPE_BIGINT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { int64_t long_val = column.longval(); set->insert(&long_val); }); break; } case TYPE_LARGEINT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto string_val = column.stringval(); StringParser::ParseResult result; auto int128_val = StringParser::string_to_int<int128_t>(string_val.c_str(), @@ -450,28 +312,28 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul break; } case TYPE_FLOAT: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto float_val = static_cast<float>(column.doubleval()); set->insert(&float_val); }); break; } case TYPE_DOUBLE: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { double double_val = column.doubleval(); set->insert(&double_val); }); break; } case TYPE_DATEV2: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto date_v2_val = column.intval(); set->insert(&date_v2_val); }); break; } case TYPE_DATETIMEV2: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto date_v2_val = column.longval(); set->insert(&date_v2_val); }); @@ -479,7 +341,7 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul } case TYPE_DATETIME: case TYPE_DATE: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { const auto& string_val_ref = column.stringval(); VecDateTimeValue datetime_val; datetime_val.from_date_str(string_val_ref.c_str(), string_val_ref.length()); @@ -488,7 +350,7 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul break; } case TYPE_DECIMALV2: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { const auto& string_val_ref = column.stringval(); DecimalV2Value decimal_val(string_val_ref); set->insert(&decimal_val); @@ -496,21 +358,21 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul break; } case TYPE_DECIMAL32: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { int32_t decimal_32_val = column.intval(); set->insert(&decimal_32_val); }); break; } case TYPE_DECIMAL64: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { int64_t decimal_64_val = column.longval(); set->insert(&decimal_64_val); }); break; } case TYPE_DECIMAL128I: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto string_val = column.stringval(); StringParser::ParseResult result; auto int128_val = StringParser::string_to_int<int128_t>(string_val.c_str(), @@ -521,7 +383,7 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul break; } case TYPE_DECIMAL256: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto string_val = column.stringval(); StringParser::ParseResult result; auto int_val = StringParser::string_to_int<wide::Int256>(string_val.c_str(), @@ -534,7 +396,7 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { const std::string& string_value = column.stringval(); // string_value is std::string, call insert(data, size) function in StringSet will not cast as StringRef // so could avoid some cast error at different class object. @@ -543,14 +405,14 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul break; } case TYPE_IPV4: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { int32_t tmp = column.intval(); set->insert(&tmp); }); break; } case TYPE_IPV6: { - batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { + _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) { auto string_val = column.stringval(); StringParser::ParseResult result; auto int128_val = StringParser::string_to_int<uint128_t>(string_val.c_str(), @@ -568,15 +430,13 @@ Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool contain_nul return Status::OK(); } -Status RuntimeFilterWrapper::assign(const PBloomFilter& bloom_filter, - butil::IOBufAsZeroCopyInputStream* data, bool contain_null) { +Status RuntimeFilterWrapper::_assign(const PBloomFilter& bloom_filter, + butil::IOBufAsZeroCopyInputStream* data, bool contain_null) { RETURN_IF_ERROR(_bloom_filter_func->assign(data, bloom_filter.filter_length(), contain_null)); return Status::OK(); } -// used by shuffle runtime filter -// assign this filter by protobuf -Status RuntimeFilterWrapper::assign(const PMinMaxFilter& minmax_filter, bool contain_null) { +Status RuntimeFilterWrapper::_assign(const PMinMaxFilter& minmax_filter, bool contain_null) { if (contain_null) { _minmax_func->set_null_aware(true); _minmax_func->set_contain_null(); @@ -721,10 +581,6 @@ Status RuntimeFilterWrapper::assign(const PMinMaxFilter& minmax_filter, bool con return Status::InternalError("not support!"); } -void RuntimeFilterWrapper::get_bloom_filter_desc(char** data, int* filter_length) { - _bloom_filter_func->get_data(data, filter_length); -} - bool RuntimeFilterWrapper::contain_null() const { if (get_real_type() == RuntimeFilterType::BLOOM_FILTER) { return _bloom_filter_func->contain_null(); @@ -779,4 +635,10 @@ void RuntimeFilterWrapper::_to_protobuf(PMinMaxFilter* filter) { _minmax_func->to_pb(filter); } +void RuntimeFilterWrapper::_to_protobuf(PBloomFilter* filter, char** data, int* filter_length) { + _bloom_filter_func->get_data(data, filter_length); + filter->set_filter_length(*filter_length); + filter->set_always_true(false); +} + } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h index b41e72c7cea..a88f1baf869 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.h +++ b/be/src/runtime_filter/runtime_filter_wrapper.h @@ -37,78 +37,18 @@ public: RuntimeFilterWrapper(const RuntimeFilterParams* params); RuntimeFilterWrapper(PrimitiveType column_type, RuntimeFilterType type, uint32_t filter_id, - State state) + State state, int max_in_num = 0) : _column_return_type(column_type), _filter_type(type), _filter_id(filter_id), - _state(state) {} - - Status change_to_bloom_filter(); - - bool is_valid() const { return _state != State::DISABLED && _state != State::IGNORED; } - int filter_id() const { return _filter_id; } - - int max_in_num() const { return _max_in_num; } - - bool build_bf_by_runtime_size() const; - - Status init_bloom_filter(const size_t runtime_size); - - void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const; - - void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start); - - void insert_batch(const vectorized::ColumnPtr& column, size_t start) { - if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) { - bitmap_filter_insert_batch(column, start); - } else { - insert_fixed_len(column, start); - } - } - - void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t start); - - RuntimeFilterType get_real_type() const { - if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - if (_hybrid_set) { - return RuntimeFilterType::IN_FILTER; - } - return RuntimeFilterType::BLOOM_FILTER; - } - return _filter_type; - } - - Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VRuntimeFilterPtr>& push_exprs, - const TExpr& probe_expr); + _state(state), + _max_in_num(max_in_num) {} + Status init(const size_t runtime_size); + void insert(const vectorized::ColumnPtr& column, size_t start); Status merge(const RuntimeFilterWrapper* wrapper); - - Status assign(const PInFilter& in_filter, bool contain_null); - - // used by shuffle runtime filter - // assign this filter by protobuf - Status assign(const PBloomFilter& bloom_filter, butil::IOBufAsZeroCopyInputStream* data, - bool contain_null); - - // used by shuffle runtime filter - // assign this filter by protobuf - Status assign(const PMinMaxFilter& minmax_filter, bool contain_null); - - void get_bloom_filter_desc(char** data, int* filter_length); - - PrimitiveType column_type() { return _column_return_type; } - - bool contain_null() const; - - void batch_assign(const PInFilter& filter, - void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, - PColumnValue&)); - - friend class RuntimeFilter; - template <class T> - Status assign_data(const T& request, butil::IOBufAsZeroCopyInputStream* data) { + Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) { PFilterType filter_type = request.filter_type(); if (request.has_disabled() && request.disabled()) { @@ -126,24 +66,47 @@ public: switch (filter_type) { case PFilterType::IN_FILTER: { DCHECK(request.has_in_filter()); - return assign(request.in_filter(), request.contain_null()); + return _assign(request.in_filter(), request.contain_null()); } case PFilterType::BLOOM_FILTER: { DCHECK(request.has_bloom_filter()); _hybrid_set.reset(); // change in_or_bloom filter to bloom filter - return assign(request.bloom_filter(), data, request.contain_null()); + return _assign(request.bloom_filter(), data, request.contain_null()); } case PFilterType::MIN_FILTER: case PFilterType::MAX_FILTER: case PFilterType::MINMAX_FILTER: { DCHECK(request.has_minmax_filter()); - return assign(request.minmax_filter(), request.contain_null()); + return _assign(request.minmax_filter(), request.contain_null()); } default: return Status::InternalError("unknown filter type {}", int(filter_type)); } } + bool is_valid() const { return _state != State::DISABLED && _state != State::IGNORED; } + int filter_id() const { return _filter_id; } + bool build_bf_by_runtime_size() const; + + RuntimeFilterType get_real_type() const { + if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { + if (_hybrid_set) { + return RuntimeFilterType::IN_FILTER; + } + return RuntimeFilterType::BLOOM_FILTER; + } + return _filter_type; + } + + std::shared_ptr<MinMaxFuncBase> minmax_func() const { return _minmax_func; } + std::shared_ptr<HybridSetBase> hybrid_set() const { return _hybrid_set; } + std::shared_ptr<BloomFilterFuncBase> bloom_filter_func() const { return _bloom_filter_func; } + std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func() const { return _bitmap_filter_func; } + + PrimitiveType column_type() const { return _column_return_type; } + + bool contain_null() const; + std::string debug_string() const; void set_state(State state, std::string reason = "") { @@ -154,11 +117,8 @@ public: } _state = state; } - void disable(std::string reason) { set_state(State::DISABLED, reason); } - State get_state() const { return _state; } - void check_state(std::vector<State> assumed_states) const { if (!check_state_impl<RuntimeFilterWrapper>(_state, assumed_states)) { throw Exception(ErrorCode::INTERNAL_ERROR, @@ -166,7 +126,6 @@ public: states_to_string<RuntimeFilterWrapper>(assumed_states)); } } - static std::string to_string(const State& state) { switch (state) { case State::IGNORED: @@ -182,22 +141,33 @@ public: } } +private: + friend class RuntimeFilter; + void _insert(BloomFilterFuncBase* bloom_filter) const; + // used by shuffle runtime filter + // assign this filter by protobuf + Status _assign(const PInFilter& in_filter, bool contain_null); + Status _assign(const PBloomFilter& bloom_filter, butil::IOBufAsZeroCopyInputStream* data, + bool contain_null); + Status _assign(const PMinMaxFilter& minmax_filter, bool contain_null); + void _batch_assign(const PInFilter& filter, + void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, + PColumnValue&)); void _to_protobuf(PInFilter* filter); - void _to_protobuf(PMinMaxFilter* filter); - -private: + void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length); + Status _change_to_bloom_filter(); // When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid. - PrimitiveType _column_return_type; // column type - RuntimeFilterType _filter_type; - int32_t _max_in_num; - uint32_t _filter_id; + const PrimitiveType _column_return_type; // column type + const RuntimeFilterType _filter_type; + const uint32_t _filter_id; + std::atomic<State> _state; + const int32_t _max_in_num; std::shared_ptr<MinMaxFuncBase> _minmax_func; std::shared_ptr<HybridSetBase> _hybrid_set; std::shared_ptr<BloomFilterFuncBase> _bloom_filter_func; std::shared_ptr<BitmapFilterFuncBase> _bitmap_filter_func; - std::atomic<State> _state; std::string _disabled_reason; }; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 31616cf0a57..5c850be40ad 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -222,7 +222,8 @@ Status VScanner::try_append_late_arrival_runtime_filter() { DCHECK(_applied_rf_num < _total_rf_num); int arrived_rf_num = 0; - RETURN_IF_ERROR(_local_state->_helper.try_append_late_arrival_runtime_filter(&arrived_rf_num)); + RETURN_IF_ERROR(_local_state->_helper.try_append_late_arrival_runtime_filter( + &arrived_rf_num, _local_state->_conjuncts)); if (arrived_rf_num == _applied_rf_num) { // No newly arrived runtime filters, just return; diff --git a/be/src/vec/exprs/vbitmap_predicate.cpp b/be/src/vec/exprs/vbitmap_predicate.cpp index 4e6f28950df..b09dfea4c3c 100644 --- a/be/src/vec/exprs/vbitmap_predicate.cpp +++ b/be/src/vec/exprs/vbitmap_predicate.cpp @@ -127,7 +127,7 @@ const std::string& vectorized::VBitmapPredicate::expr_name() const { return _expr_name; } -void vectorized::VBitmapPredicate::set_filter(std::shared_ptr<BitmapFilterFuncBase>& filter) { +void vectorized::VBitmapPredicate::set_filter(std::shared_ptr<BitmapFilterFuncBase> filter) { _filter = filter; } diff --git a/be/src/vec/exprs/vbitmap_predicate.h b/be/src/vec/exprs/vbitmap_predicate.h index bdb3ea2b00e..c37dfc69a3a 100644 --- a/be/src/vec/exprs/vbitmap_predicate.h +++ b/be/src/vec/exprs/vbitmap_predicate.h @@ -60,7 +60,7 @@ public: const std::string& expr_name() const override; - void set_filter(std::shared_ptr<BitmapFilterFuncBase>& filter); + void set_filter(std::shared_ptr<BitmapFilterFuncBase> filter); std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter_func() const override { return _filter; diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index 139547ec96f..45aeca684ed 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -105,7 +105,8 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result const std::string& VBloomPredicate::expr_name() const { return _expr_name; } -void VBloomPredicate::set_filter(std::shared_ptr<BloomFilterFuncBase>& filter) { + +void VBloomPredicate::set_filter(std::shared_ptr<BloomFilterFuncBase> filter) { _filter = filter; } diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h index 9ca5fdd0925..f5756e266da 100644 --- a/be/src/vec/exprs/vbloom_predicate.h +++ b/be/src/vec/exprs/vbloom_predicate.h @@ -49,7 +49,7 @@ public: FunctionContext::FunctionStateScope scope) override; void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override; const std::string& expr_name() const override; - void set_filter(std::shared_ptr<BloomFilterFuncBase>& filter); + void set_filter(std::shared_ptr<BloomFilterFuncBase> filter); std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() const override { return _filter; } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index d2e47844aa8..46b94860701 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -614,7 +614,7 @@ message PPublishFilterRequestV2 { optional int64 merge_time = 9; optional bool contain_null = 10; optional bool ignored = 11; - repeated int32 fragment_ids = 12; + repeated int32 fragment_ids = 12; // deprecated optional uint64 local_merge_time = 13; optional bool disabled = 14; }; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 43ada0bdae3..88acfed96db 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1267,7 +1267,7 @@ struct TRuntimeFilterDesc { // Indicates if there is at least one target scan node that is not in the same // fragment as the broadcast join that produced the runtime filter - 7: required bool has_remote_targets + 7: required bool has_remote_targets // deprecated // The type of runtime filter to build. 8: required TRuntimeFilterType type --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org