This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit f7d23a0ca1c066530103db9f0442908b17681a3c Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Fri Feb 21 09:23:57 2025 +0800 [refactor](runtime filter) Simplify producer / slots (#48145) --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 14 +-- .../exec/nested_loop_join_build_operator.cpp | 2 +- be/src/runtime_filter/role/producer.cpp | 39 +++----- be/src/runtime_filter/role/producer.h | 83 +++++++--------- be/src/runtime_filter/role/runtime_filter.h | 1 + be/src/runtime_filter/runtime_filter_slots.cpp | 108 +++++++++++---------- be/src/runtime_filter/runtime_filter_slots.h | 53 +++------- be/src/runtime_filter/runtime_filter_slots_cross.h | 4 +- be/src/runtime_filter/runtime_filter_wrapper.h | 12 +-- 9 files changed, 136 insertions(+), 180 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 85f65e153a8..ba935feb26a 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -88,8 +88,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo // Hash Table Init RETURN_IF_ERROR(_hash_table_init(state)); - _runtime_filter_slots = std::make_shared<RuntimeFilterSlots>(_build_expr_ctxs, profile(), - _should_build_hash_table); + _runtime_filter_slots = std::make_shared<RuntimeFilterSlots>( + _build_expr_ctxs, profile(), _should_build_hash_table, p._is_broadcast_join); RETURN_IF_ERROR(_runtime_filter_slots->init(state, p._runtime_filter_descs)); return Status::OK(); } @@ -102,8 +102,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { #ifndef NDEBUG if (state->fuzzy_disable_runtime_filter_in_be()) { if ((_parent->operator_id() + random()) % 2 == 0) { - RETURN_IF_ERROR( - _runtime_filter_slots->skip_runtime_filters_process(state, _finish_dependency)); + _runtime_filter_slots->skip_runtime_filters(); } } #endif @@ -144,7 +143,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_statu try { RETURN_IF_ERROR(_runtime_filter_slots->process(state, _shared_state->build_block.get(), - _finish_dependency)); + _finish_dependency, + p._shared_hash_table_context)); } catch (Exception& e) { bool blocked_by_shared_hash_table_signal = !_should_build_hash_table && p._shared_hashtable_controller && @@ -508,7 +508,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* _shared_hash_table_context->block = local_state._shared_state->build_block; _shared_hash_table_context->build_indexes_null = local_state._shared_state->build_indexes_null; - local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); } } else if (!local_state._should_build_hash_table) { DCHECK(_shared_hashtable_controller != nullptr); @@ -524,9 +523,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* return _shared_hash_table_context->status; } - RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context( - _shared_hash_table_context)); - local_state.profile()->add_info_string( "SharedHashTableFrom", print_id( diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 157947028e7..0f9fa44fab8 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -42,7 +42,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta } _runtime_filter_slots = - std::make_shared<RuntimeFilterSlotsCross>(_filter_src_expr_ctxs, profile(), true); + std::make_shared<RuntimeFilterSlotsCross>(_filter_src_expr_ctxs, profile()); RETURN_IF_ERROR(_runtime_filter_slots->init(state, p._runtime_filter_descs)); return Status::OK(); } diff --git a/be/src/runtime_filter/role/producer.cpp b/be/src/runtime_filter/role/producer.cpp index b35320a50d6..d1c2286b41c 100644 --- a/be/src/runtime_filter/role/producer.cpp +++ 
b/be/src/runtime_filter/role/producer.cpp @@ -44,9 +44,10 @@ Status RuntimeFilterProducer::_send_to_local_targets(RuntimeFilter* merger_filte return Status::OK(); }; -Status RuntimeFilterProducer::publish(RuntimeState* state, bool publish_local) { +Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table) { _check_state({State::READY_TO_PUBLISH}); + // TODO: do we still need this if wrapper is disabled / ignored? auto do_merge = [&]() { // two case we need do local merge: // 1. has remote target @@ -77,19 +78,17 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool publish_local) { // So for all runtime filters' producers, `publish` should notify all consumers in global RF mgr which manages local-merge RF and local RF mgr which manages others. RETURN_IF_ERROR(do_merge()); RETURN_IF_ERROR(_send_to_local_targets(this, false)); - } else if (!publish_local) { + } else if (build_hash_table) { if (_is_broadcast_join) { RETURN_IF_ERROR(_send_to_remote_targets(state, this)); } else { RETURN_IF_ERROR(do_merge()); } } else { - // remote broadcast join only push onetime in build shared hash table - // publish_local only set true on copy shared hash table DCHECK(_is_broadcast_join); } - _set_state(State::PUBLISHED); + set_state(State::PUBLISHED); return Status::OK(); } @@ -134,7 +133,7 @@ public: : Base(req, callback, context), _dependency(std::move(dependency)), _wrapper(wrapper) {} }; -Status RuntimeFilterProducer::send_filter_size( +Status RuntimeFilterProducer::send_size( RuntimeState* state, uint64_t local_filter_size, const std::shared_ptr<pipeline::CountedFinishDependency>& dependency) { if (_rf_state != State::WAITING_FOR_SEND_SIZE) { @@ -143,7 +142,7 @@ Status RuntimeFilterProducer::send_filter_size( } _dependency = dependency; _dependency->add(); - _set_state(State::WAITING_FOR_SYNCED_SIZE); + set_state(State::WAITING_FOR_SYNCED_SIZE); // two case we need do local merge: // 1. 
has remote target @@ -221,10 +220,10 @@ void RuntimeFilterProducer::set_synced_size(uint64_t global_size) { if (_dependency) { _dependency->sub(); } - _set_state(State::WAITING_FOR_DATA); + set_state(State::WAITING_FOR_DATA); } -Status RuntimeFilterProducer::init_with_size(size_t local_size) { +Status RuntimeFilterProducer::init(size_t local_size) { size_t real_size = _synced_size != -1 ? _synced_size : local_size; if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && real_size > _wrapper->max_in_num()) { @@ -236,23 +235,15 @@ Status RuntimeFilterProducer::init_with_size(size_t local_size) { } if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER && real_size > _wrapper->max_in_num()) { - disable_and_ready_to_publish("reach max in num"); + set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED, + "reach max in num"); } - return Status::OK(); -} - -void RuntimeFilterProducer::disable_meaningless_filters(std::unordered_set<int>& has_in_filter, - bool collect_in_filters) { - if (_rf_state == State::READY_TO_PUBLISH || - collect_in_filters != (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER)) { - return; - } - - if (has_in_filter.contains(_expr_order)) { - disable_and_ready_to_publish("exist in_filter"); - } else if (collect_in_filters) { - has_in_filter.insert(_expr_order); + if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER && !_callback.empty()) { + for (auto& call : _callback) { + call(); + } } + return Status::OK(); } } // namespace doris diff --git a/be/src/runtime_filter/role/producer.h b/be/src/runtime_filter/role/producer.h index a782f19277f..ae1b3708457 100644 --- a/be/src/runtime_filter/role/producer.h +++ b/be/src/runtime_filter/role/producer.h @@ -24,8 +24,19 @@ namespace doris { +/** + * init -> send_size -> insert -> publish + */ class RuntimeFilterProducer : public RuntimeFilter { public: + using Callback = std::function<void()>; + enum class State { + WAITING_FOR_SEND_SIZE = 0, + 
WAITING_FOR_SYNCED_SIZE = 1, + WAITING_FOR_DATA = 2, + READY_TO_PUBLISH = 3, + PUBLISHED = 4 + }; static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, std::shared_ptr<RuntimeFilterProducer>* res, RuntimeProfile* parent_profile) { @@ -40,55 +51,35 @@ public: return Status::OK(); } + Status init(size_t local_size); + Status send_size(RuntimeState* state, uint64_t local_filter_size, + const std::shared_ptr<pipeline::CountedFinishDependency>& dependency); // insert data to build filter - void insert_batch(vectorized::ColumnPtr column, size_t start) { + void insert(vectorized::ColumnPtr column, size_t start) { if (_rf_state == State::READY_TO_PUBLISH || _rf_state == State::PUBLISHED) { + DCHECK(!_wrapper->is_valid()); return; } _check_state({State::WAITING_FOR_DATA}); _wrapper->insert_batch(column, start); } - - int expr_order() const { return _expr_order; } - - Status init_with_size(size_t local_size); - - Status send_filter_size(RuntimeState* state, uint64_t local_filter_size, - const std::shared_ptr<pipeline::CountedFinishDependency>& dependency); - - Status publish(RuntimeState* state, bool publish_local); - - void set_synced_size(uint64_t global_size); - + Status publish(RuntimeState* state, bool build_hash_table); std::string debug_string() const override { return fmt::format("Producer: ({}, state: {}, dependency: {}, synced_size: {})", _debug_string(), to_string(_rf_state), _dependency ? 
_dependency->debug_string() : "none", _synced_size); } - enum class State { - WAITING_FOR_SEND_SIZE = 0, - WAITING_FOR_SYNCED_SIZE = 1, - WAITING_FOR_DATA = 2, - READY_TO_PUBLISH = 3, - PUBLISHED = 4 - }; - - void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State state) { - if (_set_state(State::READY_TO_PUBLISH)) { - _wrapper->set_state(state); - } - } - - void disable_and_ready_to_publish(std::string reason) { - if (_set_state(State::READY_TO_PUBLISH)) { - _wrapper->disable(reason); + void with_callback(Callback& callback) { _callback.emplace_back(callback); } + int expr_order() const { return _expr_order; } + void set_synced_size(uint64_t global_size); + void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State state, + std::string reason = "") { + if (set_state(State::READY_TO_PUBLISH)) { + _wrapper->set_state(state, reason); } } - void disable_meaningless_filters(std::unordered_set<int>& has_in_filter, - bool collect_in_filters); - static std::string to_string(const State& state) { switch (state) { case State::WAITING_FOR_SEND_SIZE: @@ -109,11 +100,20 @@ public: void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { context->runtime_filters[_wrapper->filter_id()] = _wrapper; } - void copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) { _wrapper = context->runtime_filters[_wrapper->filter_id()]; } + bool set_state(State state) { + if (_rf_state == State::PUBLISHED || + (state != State::PUBLISHED && _rf_state == State::READY_TO_PUBLISH)) { + return false; + } + _rf_state = state; + _profile->add_info_string("Info", debug_string()); + return true; + } + private: RuntimeFilterProducer(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, RuntimeProfile* parent_profile) @@ -137,24 +137,15 @@ private: } } - bool _set_state(State state) { - if (_rf_state == State::PUBLISHED || - (state != State::PUBLISHED && _rf_state == State::READY_TO_PUBLISH)) { - return false; - } - _rf_state 
= state; - _profile->add_info_string("Info", debug_string()); - return true; - } - - bool _is_broadcast_join; - int _expr_order; + const bool _is_broadcast_join; + const int _expr_order; int64_t _synced_size = -1; std::shared_ptr<pipeline::CountedFinishDependency> _dependency; std::atomic<State> _rf_state; std::unique_ptr<RuntimeProfile> _profile; + std::vector<Callback> _callback; }; } // namespace doris diff --git a/be/src/runtime_filter/role/runtime_filter.h b/be/src/runtime_filter/role/runtime_filter.h index 0c9c80c0513..41623fbc74d 100644 --- a/be/src/runtime_filter/role/runtime_filter.h +++ b/be/src/runtime_filter/role/runtime_filter.h @@ -38,6 +38,7 @@ class RuntimeFilter { public: virtual ~RuntimeFilter() = default; + RuntimeFilterWrapper* impl() const { return _wrapper.get(); } RuntimeFilterType type() const { return _runtime_filter_type; } bool has_local_target() const { return _has_local_target; } diff --git a/be/src/runtime_filter/runtime_filter_slots.cpp b/be/src/runtime_filter/runtime_filter_slots.cpp index add42cd1f88..5031c59a04b 100644 --- a/be/src/runtime_filter/runtime_filter_slots.cpp +++ b/be/src/runtime_filter/runtime_filter_slots.cpp @@ -24,37 +24,41 @@ namespace doris { -Status RuntimeFilterSlots::send_filter_size( - RuntimeState* state, uint64_t hash_table_size, - std::shared_ptr<pipeline::CountedFinishDependency> dependency) { - if (_skip_runtime_filters_process) { - return Status::OK(); +Status RuntimeFilterSlots::init(RuntimeState* state, + const std::vector<TRuntimeFilterDesc>& runtime_filter_descs) { + _runtime_filters.resize(runtime_filter_descs.size()); + std::unordered_map<int, RuntimeFilterProducer*> id_to_in_filter; + for (size_t i = 0; i < runtime_filter_descs.size(); i++) { + RETURN_IF_ERROR(state->register_producer_runtime_filter( + runtime_filter_descs[i], &_runtime_filters[i], _profile.get())); + if (runtime_filter_descs[i].type == TRuntimeFilterType::IN) { + id_to_in_filter.insert({runtime_filter_descs[i].expr_order, 
_runtime_filters[i].get()}); + } else if (runtime_filter_descs[i].type == TRuntimeFilterType::IN_OR_BLOOM && + !id_to_in_filter.contains(runtime_filter_descs[i].expr_order)) { + id_to_in_filter.insert({runtime_filter_descs[i].expr_order, _runtime_filters[i].get()}); + } } - - dependency->add(); // add count at start to avoid dependency ready multiple times - Defer defer {[&]() { dependency->sub(); }}; // remove the initial external add - for (auto runtime_filter : _runtime_filters) { - RETURN_IF_ERROR(runtime_filter->send_filter_size(state, hash_table_size, dependency)); + for (size_t i = 0; i < runtime_filter_descs.size(); i++) { + if (id_to_in_filter.contains(_runtime_filters[i]->expr_order()) && + _runtime_filters[i].get() != id_to_in_filter[_runtime_filters[i]->expr_order()]) { + RuntimeFilterProducer::Callback callback = + [&, filter = _runtime_filters[i].get()]() -> void { + filter->set_wrapper_state_and_ready_to_publish( + RuntimeFilterWrapper::State::DISABLED, "exist in_filter"); + }; + id_to_in_filter[_runtime_filters[i]->expr_order()]->with_callback(callback); + } } return Status::OK(); } -/** - Disable meaningless filters, such as filters: - RF1: col1 in (1, 3, 5) - RF2: col1 min: 1, max: 5 - We consider RF2 is meaningless, because RF1 has already filtered out all values that RF2 can filter. 
-*/ -Status RuntimeFilterSlots::_disable_meaningless_filters(RuntimeState* state) { - // process ignore duplicate IN_FILTER - std::unordered_set<int> has_in_filter; - for (auto filter : _runtime_filters) { - filter->disable_meaningless_filters(has_in_filter, true); - } - - // process ignore filter when it has IN_FILTER on same expr - for (auto filter : _runtime_filters) { - filter->disable_meaningless_filters(has_in_filter, false); +Status RuntimeFilterSlots::send_filter_size( + RuntimeState* state, uint64_t hash_table_size, + std::shared_ptr<pipeline::CountedFinishDependency> dependency) { + // TODO: dependency is not needed if `_skip_runtime_filters_process` is true + for (auto runtime_filter : _runtime_filters) { + RETURN_IF_ERROR(runtime_filter->send_size( + state, _skip_runtime_filters_process ? 0 : hash_table_size, dependency)); } return Status::OK(); } @@ -62,7 +66,7 @@ Status RuntimeFilterSlots::_disable_meaningless_filters(RuntimeState* state) { Status RuntimeFilterSlots::_init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto filter : _runtime_filters) { - RETURN_IF_ERROR(filter->init_with_size(local_hash_table_size)); + RETURN_IF_ERROR(filter->init(local_hash_table_size)); } return Status::OK(); } @@ -70,51 +74,51 @@ Status RuntimeFilterSlots::_init_filters(RuntimeState* state, uint64_t local_has void RuntimeFilterSlots::_insert(const vectorized::Block* block, size_t start) { SCOPED_TIMER(_runtime_filter_compute_timer); for (auto& filter : _runtime_filters) { + if (!filter->impl()->is_valid()) { + // Skip building if ignored or disabled. 
+ continue; + } int result_column_id = _build_expr_context[filter->expr_order()]->get_last_result_column_id(); const auto& column = block->get_by_position(result_column_id).column; - filter->insert_batch(column, start); + filter->insert(column, start); } } Status RuntimeFilterSlots::process( RuntimeState* state, const vectorized::Block* block, - std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency) { - if (_skip_runtime_filters_process) { - return Status::OK(); - } - - auto wrapper_state = RuntimeFilterWrapper::State::READY; - if (state->get_task()->wake_up_early()) { - // partitial ignore rf to make global rf work + std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency, + vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) { + auto wrapper_state = _skip_runtime_filters_process ? RuntimeFilterWrapper::State::DISABLED + : RuntimeFilterWrapper::State::READY; + if (state->get_task()->wake_up_early() && !_skip_runtime_filters_process) { + // Runtime filter is ignored partially which has no effect on correctness. wrapper_state = RuntimeFilterWrapper::State::IGNORED; - } else if (_should_build_hash_table) { + } else if (_should_build_hash_table && !_skip_runtime_filters_process) { + // Hash table is completed and runtime filter has a global size now. uint64_t hash_table_size = block ? 
block->rows() : 0; - { - RETURN_IF_ERROR(_init_filters(state, hash_table_size)); - RETURN_IF_ERROR(_disable_meaningless_filters(state)); - } + RETURN_IF_ERROR(_init_filters(state, hash_table_size)); if (hash_table_size > 1) { _insert(block, 1); } } for (auto filter : _runtime_filters) { - filter->set_wrapper_state_and_ready_to_publish(wrapper_state); + if (shared_hash_table_ctx && _should_build_hash_table) { + filter->copy_to_shared_context(shared_hash_table_ctx); + } else if (shared_hash_table_ctx) { + filter->copy_from_shared_context(shared_hash_table_ctx); + } + if (_should_build_hash_table) { + filter->set_wrapper_state_and_ready_to_publish( + wrapper_state, _skip_runtime_filters_process ? "skip all rf process" : ""); + } else { + filter->set_state(RuntimeFilterProducer::State::READY_TO_PUBLISH); + } } RETURN_IF_ERROR(_publish(state)); return Status::OK(); } -Status RuntimeFilterSlots::skip_runtime_filters_process( - RuntimeState* state, std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency) { - RETURN_IF_ERROR(send_filter_size(state, 0, finish_dependency)); - for (auto filter : _runtime_filters) { - filter->disable_and_ready_to_publish("skip all rf process"); - } - RETURN_IF_ERROR(_publish(state)); - _skip_runtime_filters_process = true; - return Status::OK(); -} } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_slots.h b/be/src/runtime_filter/runtime_filter_slots.h index 7f76e33d250..166a012f7e2 100644 --- a/be/src/runtime_filter/runtime_filter_slots.h +++ b/be/src/runtime_filter/runtime_filter_slots.h @@ -28,74 +28,49 @@ namespace doris { // this class used in hash join node +/** + * init -> (skip_runtime_filters ->) send_filter_size -> process + */ class RuntimeFilterSlots { public: RuntimeFilterSlots(const vectorized::VExprContextSPtrs& build_expr_ctxs, - RuntimeProfile* profile, bool should_build_hash_table) + RuntimeProfile* profile, bool should_build_hash_table, + bool is_broadcast_join) : 
_build_expr_context(build_expr_ctxs), _should_build_hash_table(should_build_hash_table), - _profile(new RuntimeProfile("RuntimeFilterSlots")) { + _profile(new RuntimeProfile("RuntimeFilterSlots")), + _is_broadcast_join(is_broadcast_join) { profile->add_child(_profile.get(), true, nullptr); _publish_runtime_filter_timer = ADD_TIMER_WITH_LEVEL(_profile, "PublishTime", 1); _runtime_filter_compute_timer = ADD_TIMER_WITH_LEVEL(_profile, "BuildTime", 1); } - - Status init(RuntimeState* state, const std::vector<TRuntimeFilterDesc>& runtime_filter_descs) { - _runtime_filters.resize(runtime_filter_descs.size()); - for (size_t i = 0; i < runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->register_producer_runtime_filter( - runtime_filter_descs[i], &_runtime_filters[i], _profile.get())); - } - return Status::OK(); - } - + Status init(RuntimeState* state, const std::vector<TRuntimeFilterDesc>& runtime_filter_descs); Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, std::shared_ptr<pipeline::CountedFinishDependency> dependency); - - Status skip_runtime_filters_process( - RuntimeState* state, - std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency); - + void skip_runtime_filters() { _skip_runtime_filters_process = true; } Status process(RuntimeState* state, const vectorized::Block* block, - std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency); - - void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { - for (auto& filter : _runtime_filters) { - filter->copy_to_shared_context(context); - } - } - - Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) { - for (auto& filter : _runtime_filters) { - filter->copy_from_shared_context(context); - } - return Status::OK(); - } + std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency, + vectorized::SharedHashTableContextPtr& shared_hash_table_ctx); protected: - Status _disable_meaningless_filters(RuntimeState* 
state); Status _init_filters(RuntimeState* state, uint64_t local_hash_table_size); void _insert(const vectorized::Block* block, size_t start); Status _publish(RuntimeState* state) { - if (_skip_runtime_filters_process) { - return Status::OK(); - } SCOPED_TIMER(_publish_runtime_filter_timer); for (auto& filter : _runtime_filters) { - RETURN_IF_ERROR(filter->publish(state, !_should_build_hash_table)); + RETURN_IF_ERROR(filter->publish(state, _should_build_hash_table)); } return Status::OK(); } const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context; std::vector<std::shared_ptr<RuntimeFilterProducer>> _runtime_filters; - bool _should_build_hash_table; - + const bool _should_build_hash_table; RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; std::unique_ptr<RuntimeProfile> _profile; - bool _skip_runtime_filters_process = false; + const bool _is_broadcast_join; }; } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_slots_cross.h b/be/src/runtime_filter/runtime_filter_slots_cross.h index 74a5fc6c2d8..f66f5537224 100644 --- a/be/src/runtime_filter/runtime_filter_slots_cross.h +++ b/be/src/runtime_filter/runtime_filter_slots_cross.h @@ -33,8 +33,8 @@ namespace doris { class RuntimeFilterSlotsCross : public RuntimeFilterSlots { public: RuntimeFilterSlotsCross(const vectorized::VExprContextSPtrs& build_expr_ctxs, - RuntimeProfile* profile, bool should_build_hash_table) - : RuntimeFilterSlots(build_expr_ctxs, profile, should_build_hash_table) {} + RuntimeProfile* profile) + : RuntimeFilterSlots(build_expr_ctxs, profile, true, false) {} Status process(RuntimeState* state, vectorized::Blocks& blocks) { for (auto& block : blocks) { diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h index c51ad4484aa..b41e72c7cea 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.h +++ 
b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -45,6 +45,7 @@ public:
 
     Status change_to_bloom_filter();
 
+    bool is_valid() const { return _state != State::DISABLED && _state != State::IGNORED; }
     int filter_id() const { return _filter_id; }
 
     int max_in_num() const { return _max_in_num; }
@@ -145,19 +146,16 @@ public:
 
     std::string debug_string() const;
 
-    void set_state(State state) {
-        DCHECK(state != State::DISABLED);
+    void set_state(State state, std::string reason = "") {
         if (_state == State::DISABLED) {
             return;
+        } else if (state == State::DISABLED) {
+            _disabled_reason = reason;
         }
-
         _state = state;
     }
 
-    void disable(std::string reason) {
-        _state = State::DISABLED;
-        _disabled_reason = reason;
-    }
+    void disable(std::string reason) { set_state(State::DISABLED, reason); }
 
     State get_state() const { return _state; }
 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org