This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 989f1894869 [test](join) Fuzzy disable runtime filters in BE #45654 (#46931) 989f1894869 is described below commit 989f1894869b5208aa8f2075b096fb2faf5fce03 Author: Pxl <x...@selectdb.com> AuthorDate: Tue Jan 14 12:46:56 2025 +0800 [test](join) Fuzzy disable runtime filters in BE #45654 (#46931) pick from #45654 --------- Co-authored-by: Jerry Hu <hushengg...@selectdb.com> --- be/src/exprs/runtime_filter.cpp | 59 +++++++++++++++------- be/src/exprs/runtime_filter.h | 3 ++ be/src/exprs/runtime_filter_slots.h | 27 +++++++--- be/src/pipeline/exec/hashjoin_build_sink.cpp | 51 +++++++++++++++++-- be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++ be/src/runtime/runtime_filter_mgr.cpp | 4 +- be/src/runtime/runtime_state.h | 5 ++ be/src/vec/exec/join/vhash_join_node.h | 2 +- be/src/vec/runtime/shared_hash_table_controller.h | 1 + .../java/org/apache/doris/qe/SessionVariable.java | 12 +++++ gensrc/proto/internal_service.proto | 6 +++ gensrc/thrift/PaloInternalService.thrift | 2 + 12 files changed, 142 insertions(+), 34 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 80d9494ac9e..5958fdf2724 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -474,9 +474,15 @@ public: const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { - if (wrapper->is_ignored()) { + if (wrapper->is_disabled()) { + set_disabled(); return Status::OK(); } + + if (wrapper->is_ignored() || is_disabled()) { + return Status::OK(); + } + _context->ignored = false; bool can_not_merge_in_or_bloom = @@ -495,15 +501,9 @@ public: switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - if (!_context->hybrid_set) { - _context->ignored = true; - return Status::OK(); - } _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { - _context->ignored = true; - // release in filter - _context->hybrid_set.reset(); + set_disabled(); } break; } @@ -957,6 +957,10 @@ public: void set_ignored() { _context->ignored = true; } + bool is_disabled() const { return _context->disabled; } + + void set_disabled() { _context->disabled = true; } + void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, PColumnValue&, ObjectPool*)) { @@ -1217,9 +1221,10 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms()); merge_filter_callback->cntl_->ignore_eovercrowded(); - if (get_ignored()) { + if (get_ignored() || get_disabled()) { merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER); - merge_filter_request->set_ignored(true); + merge_filter_request->set_ignored(get_ignored()); + merge_filter_request->set_disabled(get_disabled()); } else { RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len)); } @@ -1241,7 +1246,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr bool is_late_arrival) { DCHECK(is_consumer()); auto origin_size = push_exprs.size(); - if (!_wrapper->is_ignored()) { + if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) { _set_push_down(!is_late_arrival); RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); } @@ -1367,6 +1372,7 @@ PrimitiveType IRuntimeFilter::column_type() const { void IRuntimeFilter::signal() { DCHECK(is_consumer()); + if (_enable_pipeline_exec) { _rf_state_atomic.store(RuntimeFilterState::READY); if (!_filter_timer.empty()) { @@ -1420,16 +1426,21 @@ bool IRuntimeFilter::get_ignored() { return _wrapper->is_ignored(); } -std::string IRuntimeFilter::formatted_state() const { - return fmt::format( - "[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " - "HasLocalTarget = {}, Ignored = {}]", - _is_push_down, _get_explain_state_string(), _has_remote_target, _has_local_target, - _wrapper->_context->ignored); +void IRuntimeFilter::set_disabled() { + _wrapper->set_disabled(); } -BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { - return _wrapper->get_bloomfilter(); +bool IRuntimeFilter::get_disabled() const { + return _wrapper->is_disabled(); +} + +std::string IRuntimeFilter::formatted_state() const { + return fmt::format( + "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " + "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]", + _filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target, + _has_local_target, _wrapper->_context->ignored, _wrapper->_context->disabled, + _wrapper->get_real_type(), wait_time_ms()); } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, @@ -1536,6 +1547,11 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, *wrapper = param->pool->add(new RuntimePredicateWrapper( param->pool, column_type, get_type(filter_type), param->request->filter_id())); + if (param->request->has_disabled() && param->request->disabled()) { + (*wrapper)->set_disabled(); + return Status::OK(); + } + if (param->request->has_ignored() && param->request->ignored()) { (*wrapper)->set_ignored(); return Status::OK(); @@ -1582,6 +1598,11 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, *wrapper = std::make_unique<RuntimePredicateWrapper>(pool, column_type, get_type(filter_type), param->request->filter_id()); + if (param->request->has_disabled() && param->request->disabled()) { + (*wrapper)->set_disabled(); + return Status::OK(); + } + if (param->request->has_ignored() && param->request->ignored()) { (*wrapper)->set_ignored(); return Status::OK(); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index b71bbd0648c..6f7695796bf 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -304,6 +304,9 @@ public: bool get_ignored(); + void set_disabled(); + bool get_disabled() const; + RuntimeFilterType get_real_type(); bool need_sync_filter_size(); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 2b9773ce89f..c06ac946283 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -67,11 +67,17 @@ public: : hash_table_size; } - Status ignore_filters(RuntimeState* state) { + /** + Disable meaningless filters, such as filters: + RF1: col1 in (1, 3, 5) + RF2: col1 min: 1, max: 5 + We consider RF2 is meaningless, because RF1 has already filtered out all values that RF2 can filter. + */ + Status disable_meaningless_filters(RuntimeState* state) { // process ignore duplicate IN_FILTER std::unordered_set<int> has_in_filter; - for (auto* filter : _runtime_filters) { - if (filter->get_ignored()) { + for (auto filter : _runtime_filters) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) { @@ -82,7 +88,7 @@ public: continue; } if (has_in_filter.contains(filter->expr_order())) { - filter->set_ignored(); + filter->set_disabled(); continue; } has_in_filter.insert(filter->expr_order()); @@ -90,14 +96,14 @@ public: // process ignore filter when it has IN_FILTER on same expr for (auto filter : _runtime_filters) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } if (filter->get_real_type() == RuntimeFilterType::IN_FILTER || !has_in_filter.contains(filter->expr_order())) { continue; } - filter->set_ignored(); + filter->set_disabled(); } return Status::OK(); } @@ -109,6 +115,13 @@ public: return Status::OK(); } + Status disable_all_filters() { + for (auto filter : _runtime_filters) { + filter->set_disabled(); + } + return Status::OK(); + } + Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto* filter : _runtime_filters) { @@ -139,7 +152,7 @@ public: int result_column_id = _build_expr_context[i]->get_last_result_column_id(); const auto& column = block->get_by_position(result_column_id).column; for (auto* filter : iter->second) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } filter->insert_batch(column, 1); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 87c8d7caea3..c764b8d1a73 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -17,6 +17,7 @@ #include "hashjoin_build_sink.h" +#include <cstdlib> #include <string> #include "exprs/bloom_filter_func.h" @@ -119,6 +120,15 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state)); + +#ifndef NDEBUG + if (state->fuzzy_disable_runtime_filter_in_be()) { + if ((_parent->operator_id() + random()) % 2 == 0) { + RETURN_IF_ERROR(disable_runtime_filters(state)); + } + } +#endif + return Status::OK(); } @@ -137,7 +147,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } }}; - if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) { + if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos || + _runtime_filters_disabled) { return Base::close(state, exec_status); } @@ -152,7 +163,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu { SCOPED_TIMER(_runtime_filter_init_timer); RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); - RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + RETURN_IF_ERROR(_runtime_filter_slots->disable_meaningless_filters(state)); } if (hash_table_size > 1) { SCOPED_TIMER(_runtime_filter_compute_timer); @@ -181,6 +192,33 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu return Base::close(state, exec_status); } +Status HashJoinBuildSinkLocalState::disable_runtime_filters(RuntimeState* state) { + if (_runtime_filters_disabled) { + return Status::OK(); + } + + if (_runtime_filters.empty()) { + return Status::OK(); + } + + if (!_should_build_hash_table) { + return Status::OK(); + } + + if (_runtime_filters.empty()) { + return Status::OK(); + } + + DCHECK(_runtime_filter_slots) << "_runtime_filter_slots should be initialized"; + + _runtime_filters_disabled = true; + RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); + RETURN_IF_ERROR(_runtime_filter_slots->disable_all_filters()); + + SCOPED_TIMER(_publish_runtime_filter_timer); + return _runtime_filter_slots->publish(!_should_build_hash_table); +} + bool HashJoinBuildSinkLocalState::build_unique() const { return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique; } @@ -605,9 +643,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->build_block = std::make_shared<vectorized::Block>( local_state._build_side_mutable_block.to_block()); - RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size( - state, local_state._shared_state->build_block->rows(), - local_state._finish_dependency)); + if (!local_state._runtime_filters_disabled) { + RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size( + state, local_state._shared_state->build_block->rows(), + local_state._finish_dependency)); + } + RETURN_IF_ERROR( local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 3b55dc5e44d..60251d33055 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -74,6 +74,8 @@ public: Status close(RuntimeState* state, Status exec_status) override; + Status disable_runtime_filters(RuntimeState* state); + protected: void _hash_table_init(RuntimeState* state); void _set_build_ignore_flag(vectorized::Block& block, const std::vector<int>& res_col_ids); @@ -97,6 +99,8 @@ protected: int64_t _build_side_mem_used = 0; int64_t _build_side_last_mem_used = 0; + bool _runtime_filters_disabled = false; + size_t _build_side_rows = 0; std::vector<vectorized::Block> _build_blocks; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 640cece8fb3..89975973597 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -456,7 +456,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ void* data = nullptr; int len = 0; bool has_attachment = false; - if (!cnt_val->filter->get_ignored()) { + if (!cnt_val->filter->get_ignored() && !cnt_val->filter->get_disabled()) { RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); } else { apply_request.set_ignored(true); @@ -535,7 +535,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ void* data = nullptr; int len = 0; bool has_attachment = false; - if (!cnt_val->filter->get_ignored()) { + if (!cnt_val->filter->get_ignored() && !cnt_val->filter->get_disabled()) { RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); } else { apply_request.set_ignored(true); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 96633f7215e..4deb8babd10 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -652,6 +652,11 @@ public: return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill; } + bool fuzzy_disable_runtime_filter_in_be() const { + return _query_options.__isset.fuzzy_disable_runtime_filter_in_be && + _query_options.fuzzy_disable_runtime_filter_in_be; + } + int64_t min_revocable_mem() const { if (_query_options.__isset.min_revocable_mem) { return _query_options.min_revocable_mem; diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index f23a992049f..5cfac9f9aba 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -82,7 +82,7 @@ Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* p { SCOPED_TIMER(parent->_runtime_filter_init_timer); RETURN_IF_ERROR(parent->_runtime_filter_slots->init_filters(state, rows)); - RETURN_IF_ERROR(parent->_runtime_filter_slots->ignore_filters(state)); + RETURN_IF_ERROR(parent->_runtime_filter_slots->disable_meaningless_filters(state)); } if (!parent->_runtime_filter_slots->empty() && rows > 1) { diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index aba441f282a..ea26333a3aa 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -46,6 +46,7 @@ struct RuntimeFilterContext { std::shared_ptr<BloomFilterFuncBase> bloom_filter_func; std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func; bool ignored = false; + bool disabled = false; std::string err_msg; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 894de847158..f68fd1423c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -542,6 +542,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_FORCE_SPILL = "enable_force_spill"; public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks"; + public static final String FUZZY_DISABLE_RUNTIME_FILTER_IN_BE = "fuzzy_disable_runtime_filter_in_be"; + public static final String GENERATE_STATS_FACTOR = "generate_stats_factor"; public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS @@ -2128,6 +2130,13 @@ public class SessionVariable implements Serializable, Writable { needForward = true, fuzzy = true) public long dataQueueMaxBlocks = 1; + @VariableMgr.VarAttr( + name = FUZZY_DISABLE_RUNTIME_FILTER_IN_BE, + description = {"在 BE 上开启禁用 runtime filter 的随机开关,用于测试", + "Disable the runtime filter on the BE for testing purposes."}, + needForward = true, fuzzy = false) + public boolean fuzzyDisableRuntimeFilterInBE = false; + // If the memory consumption of sort node exceed this limit, will trigger spill to disk; // Set to 0 to disable; min: 128M public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152; @@ -2376,6 +2385,8 @@ public class SessionVariable implements Serializable, Writable { this.batchSize = 50; this.enableFoldConstantByBe = false; } + + this.fuzzyDisableRuntimeFilterInBE = true; } } @@ -3781,6 +3792,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableForceSpill(enableForceSpill); tResult.setMinRevocableMem(minRevocableMem); tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); + tResult.setFuzzyDisableRuntimeFilterInBe(fuzzyDisableRuntimeFilterInBE); tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull); tResult.setSerdeDialect(getSerdeDialect()); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 72b11e6e2ed..1ea3b798f4a 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -573,6 +573,8 @@ message PMergeFilterRequest { optional PColumnType column_type = 10; optional bool contain_null = 11; optional bool ignored = 12; + optional uint64 local_merge_time = 13; + optional bool disabled = 14; }; message PMergeFilterResponse { @@ -593,6 +595,7 @@ message PPublishFilterRequest { optional PColumnType column_type = 10; optional bool contain_null = 11; optional bool ignored = 12; + optional bool disabled = 13; }; message PPublishFilterRequestV2 { @@ -607,6 +610,9 @@ message PPublishFilterRequestV2 { optional int64 merge_time = 9; optional bool contain_null = 10; optional bool ignored = 11; + repeated int32 fragment_ids = 12; + optional uint64 local_merge_time = 13; + optional bool disabled = 14; }; message PPublishFilterResponse { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b0e3ad456fc..c612826836e 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -339,6 +339,8 @@ struct TQueryOptions { 139: optional i64 orc_once_max_read_bytes = 8388608; 140: optional i64 orc_max_merge_distance_bytes = 1048576; + 146: optional bool fuzzy_disable_runtime_filter_in_be = false; + // upgrade options. keep them same in every branch. 200: optional bool new_is_ip_address_in_range = false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org