This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch cp_0114 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0892c7ecdc15e7678e2c189e10a4c3c57fd56fb4 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Mon Dec 30 19:46:18 2024 +0800 [test](join) Fuzzy disable runtime filters in BE (#45654) Add the functionality to disable the runtime filter, in preparation for the spill feature. --- be/src/exprs/runtime_filter.cpp | 66 ++++++++++++++++------ be/src/exprs/runtime_filter.h | 3 + be/src/exprs/runtime_filter_slots.h | 27 ++++++--- be/src/pipeline/exec/hashjoin_build_sink.cpp | 51 +++++++++++++++-- be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++ be/src/runtime/runtime_filter_mgr.cpp | 4 +- be/src/runtime/runtime_state.h | 5 ++ be/src/vec/runtime/shared_hash_table_controller.h | 1 + .../java/org/apache/doris/qe/SessionVariable.java | 12 ++++ gensrc/proto/internal_service.proto | 6 ++ gensrc/thrift/PaloInternalService.thrift | 9 ++- 11 files changed, 154 insertions(+), 34 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 80d9494ac9e..348bea00a01 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -474,9 +474,15 @@ public: const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { - if (wrapper->is_ignored()) { + if (wrapper->is_disabled()) { + set_disabled(); return Status::OK(); } + + if (wrapper->is_ignored() || is_disabled()) { + return Status::OK(); + } + _context->ignored = false; bool can_not_merge_in_or_bloom = @@ -957,6 +963,10 @@ public: void set_ignored() { _context->ignored = true; } + bool is_disabled() const { return _context->disabled; } + + void set_disabled() { _context->disabled = true; } + void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, PColumnValue&, ObjectPool*)) { @@ -1217,9 +1227,10 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms()); merge_filter_callback->cntl_->ignore_eovercrowded(); - if (get_ignored()) { + if (get_ignored() || get_disabled()) { merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER); - merge_filter_request->set_ignored(true); + merge_filter_request->set_ignored(get_ignored()); + merge_filter_request->set_disabled(get_disabled()); } else { RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len)); } @@ -1241,7 +1252,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr bool is_late_arrival) { DCHECK(is_consumer()); auto origin_size = push_exprs.size(); - if (!_wrapper->is_ignored()) { + if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) { _set_push_down(!is_late_arrival); RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); } @@ -1367,12 +1378,16 @@ PrimitiveType IRuntimeFilter::column_type() const { void IRuntimeFilter::signal() { DCHECK(is_consumer()); - if (_enable_pipeline_exec) { - _rf_state_atomic.store(RuntimeFilterState::READY); - if (!_filter_timer.empty()) { - for (auto& timer : _filter_timer) { - timer->call_ready(); - } + if (!_wrapper->is_ignored() && !_wrapper->is_disabled() && _wrapper->is_bloomfilter() && + !_wrapper->get_bloomfilter()->inited()) { + throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored/disabled, rf: {}", + debug_string()); + } + + _rf_state_atomic.store(RuntimeFilterState::READY); + if (!_filter_timer.empty()) { + for (auto& timer : _filter_timer) { + timer->call_ready(); } } else { std::unique_lock lock(_inner_mutex); @@ -1420,16 +1435,21 @@ bool IRuntimeFilter::get_ignored() { return _wrapper->is_ignored(); } -std::string IRuntimeFilter::formatted_state() const { - return fmt::format( - "[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " - "HasLocalTarget = {}, Ignored = {}]", - _is_push_down, _get_explain_state_string(), _has_remote_target, _has_local_target, - _wrapper->_context->ignored); +void IRuntimeFilter::set_disabled() { + _wrapper->set_disabled(); +} + +bool IRuntimeFilter::get_disabled() const { + return _wrapper->is_disabled(); } -BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { - return _wrapper->get_bloomfilter(); +std::string IRuntimeFilter::formatted_state() const { + return fmt::format( + "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " + "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]", + _filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target, + _has_local_target, _wrapper->_context->ignored, _wrapper->_context->disabled, + _wrapper->get_real_type(), wait_time_ms()); } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, @@ -1536,6 +1556,11 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, *wrapper = param->pool->add(new RuntimePredicateWrapper( param->pool, column_type, get_type(filter_type), param->request->filter_id())); + if (param->request->has_disabled() && param->request->disabled()) { + (*wrapper)->set_disabled(); + return Status::OK(); + } + if (param->request->has_ignored() && param->request->ignored()) { (*wrapper)->set_ignored(); return Status::OK(); @@ -1582,6 +1607,11 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, *wrapper = std::make_unique<RuntimePredicateWrapper>(pool, column_type, get_type(filter_type), param->request->filter_id()); + if (param->request->has_disabled() && param->request->disabled()) { + (*wrapper)->set_disabled(); + return Status::OK(); + } + if (param->request->has_ignored() && param->request->ignored()) { (*wrapper)->set_ignored(); return Status::OK(); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index b71bbd0648c..6f7695796bf 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -304,6 +304,9 @@ public: bool get_ignored(); + void set_disabled(); + bool get_disabled() const; + RuntimeFilterType get_real_type(); bool need_sync_filter_size(); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 2b9773ce89f..c06ac946283 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -67,11 +67,17 @@ public: : hash_table_size; } - Status ignore_filters(RuntimeState* state) { + /** + Disable meaningless filters, such as filters: + RF1: col1 in (1, 3, 5) + RF2: col1 min: 1, max: 5 + We consider RF2 is meaningless, because RF1 has already filtered out all values that RF2 can filter. + */ + Status disable_meaningless_filters(RuntimeState* state) { // process ignore duplicate IN_FILTER std::unordered_set<int> has_in_filter; - for (auto* filter : _runtime_filters) { - if (filter->get_ignored()) { + for (auto filter : _runtime_filters) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) { @@ -82,7 +88,7 @@ public: continue; } if (has_in_filter.contains(filter->expr_order())) { - filter->set_ignored(); + filter->set_disabled(); continue; } has_in_filter.insert(filter->expr_order()); @@ -90,14 +96,14 @@ public: // process ignore filter when it has IN_FILTER on same expr for (auto filter : _runtime_filters) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } if (filter->get_real_type() == RuntimeFilterType::IN_FILTER || !has_in_filter.contains(filter->expr_order())) { continue; } - filter->set_ignored(); + filter->set_disabled(); } return Status::OK(); } @@ -109,6 +115,13 @@ public: return Status::OK(); } + Status disable_all_filters() { + for (auto filter : _runtime_filters) { + filter->set_disabled(); + } + return Status::OK(); + } + Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto* filter : _runtime_filters) { @@ -139,7 +152,7 @@ public: int result_column_id = _build_expr_context[i]->get_last_result_column_id(); const auto& column = block->get_by_position(result_column_id).column; for (auto* filter : iter->second) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } filter->insert_batch(column, 1); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 87c8d7caea3..a61d6162b79 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -17,6 +17,7 @@ #include "hashjoin_build_sink.h" +#include <cstdlib> #include <string> #include "exprs/bloom_filter_func.h" @@ -119,6 +120,15 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state)); + +#ifndef NDEBUG + if (state->fuzzy_disable_runtime_filter_in_be()) { + if ((_parent->operator_id() + random()) % 2 == 0) { + RETURN_IF_ERROR(disable_runtime_filters(state)); + } + } +#endif + return Status::OK(); } @@ -137,7 +147,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } }}; - if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) { + if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos || + _runtime_filters_disabled) { return Base::close(state, exec_status); } @@ -152,7 +163,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu { SCOPED_TIMER(_runtime_filter_init_timer); RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); - RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + RETURN_IF_ERROR(_runtime_filter_slots->disable_meaningless_filters(state)); } if (hash_table_size > 1) { SCOPED_TIMER(_runtime_filter_compute_timer); @@ -181,6 +192,33 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu return Base::close(state, exec_status); } +Status HashJoinBuildSinkLocalState::disable_runtime_filters(RuntimeState* state) { + if (_runtime_filters_disabled) { + return Status::OK(); + } + + if (_runtime_filters.empty()) { + return Status::OK(); + } + + if (!_should_build_hash_table) { + return Status::OK(); + } + + if (_runtime_filters.empty()) { + return Status::OK(); + } + + DCHECK(_runtime_filter_slots) << "_runtime_filter_slots should be initialized"; + + _runtime_filters_disabled = true; + RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); + RETURN_IF_ERROR(_runtime_filter_slots->disable_all_filters()); + + SCOPED_TIMER(_publish_runtime_filter_timer); + return _runtime_filter_slots->publish(state, !_should_build_hash_table); +} + bool HashJoinBuildSinkLocalState::build_unique() const { return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique; } @@ -605,9 +643,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->build_block = std::make_shared<vectorized::Block>( local_state._build_side_mutable_block.to_block()); - RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size( - state, local_state._shared_state->build_block->rows(), - local_state._finish_dependency)); + if (!local_state._runtime_filters_disabled) { + RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size( + state, local_state._shared_state->build_block->rows(), + local_state._finish_dependency)); + } + RETURN_IF_ERROR( local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 3b55dc5e44d..60251d33055 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -74,6 +74,8 @@ public: Status close(RuntimeState* state, Status exec_status) override; + Status disable_runtime_filters(RuntimeState* state); + protected: void _hash_table_init(RuntimeState* state); void _set_build_ignore_flag(vectorized::Block& block, const std::vector<int>& res_col_ids); @@ -97,6 +99,8 @@ protected: int64_t _build_side_mem_used = 0; int64_t _build_side_last_mem_used = 0; + bool _runtime_filters_disabled = false; + size_t _build_side_rows = 0; std::vector<vectorized::Block> _build_blocks; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 640cece8fb3..89975973597 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -456,7 +456,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ void* data = nullptr; int len = 0; bool has_attachment = false; - if (!cnt_val->filter->get_ignored()) { + if (!cnt_val->filter->get_ignored() && !cnt_val->filter->get_disabled()) { RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); } else { apply_request.set_ignored(true); @@ -535,7 +535,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ void* data = nullptr; int len = 0; bool has_attachment = false; - if (!cnt_val->filter->get_ignored()) { + if (!cnt_val->filter->get_ignored() && !cnt_val->filter->get_disabled()) { RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); } else { apply_request.set_ignored(true); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 96633f7215e..4deb8babd10 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -652,6 +652,11 @@ public: return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill; } + bool fuzzy_disable_runtime_filter_in_be() const { + return _query_options.__isset.fuzzy_disable_runtime_filter_in_be && + _query_options.fuzzy_disable_runtime_filter_in_be; + } + int64_t min_revocable_mem() const { if (_query_options.__isset.min_revocable_mem) { return _query_options.min_revocable_mem; diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index aba441f282a..ea26333a3aa 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -46,6 +46,7 @@ struct RuntimeFilterContext { std::shared_ptr<BloomFilterFuncBase> bloom_filter_func; std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func; bool ignored = false; + bool disabled = false; std::string err_msg; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index a65f750d6db..4a97b83f293 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -542,6 +542,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_FORCE_SPILL = "enable_force_spill"; public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks"; + public static final String FUZZY_DISABLE_RUNTIME_FILTER_IN_BE = "fuzzy_disable_runtime_filter_in_be"; + public static final String GENERATE_STATS_FACTOR = "generate_stats_factor"; public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS @@ -2128,6 +2130,13 @@ public class SessionVariable implements Serializable, Writable { needForward = true, fuzzy = true) public long dataQueueMaxBlocks = 1; + @VariableMgr.VarAttr( + name = FUZZY_DISABLE_RUNTIME_FILTER_IN_BE, + description = {"在 BE 上开启禁用 runtime filter 的随机开关,用于测试", + "Disable the runtime filter on the BE for testing purposes."}, + needForward = true, fuzzy = false) + public boolean fuzzyDisableRuntimeFilterInBE = false; + // If the memory consumption of sort node exceed this limit, will trigger spill to disk; // Set to 0 to disable; min: 128M public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152; @@ -2376,6 +2385,8 @@ public class SessionVariable implements Serializable, Writable { this.batchSize = 50; this.enableFoldConstantByBe = false; } + + this.fuzzyDisableRuntimeFilterInBE = true; } } @@ -3781,6 +3792,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableForceSpill(enableForceSpill); tResult.setMinRevocableMem(minRevocableMem); tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); + tResult.setFuzzyDisableRuntimeFilterInBe(fuzzyDisableRuntimeFilterInBE); tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull); tResult.setSerdeDialect(getSerdeDialect()); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 72b11e6e2ed..1ea3b798f4a 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -573,6 +573,8 @@ message PMergeFilterRequest { optional PColumnType column_type = 10; optional bool contain_null = 11; optional bool ignored = 12; + optional uint64 local_merge_time = 13; + optional bool disabled = 14; }; message PMergeFilterResponse { @@ -593,6 +595,7 @@ message PPublishFilterRequest { optional PColumnType column_type = 10; optional bool contain_null = 11; optional bool ignored = 12; + optional bool disabled = 13; }; message PPublishFilterRequestV2 { @@ -607,6 +610,9 @@ message PPublishFilterRequestV2 { optional int64 merge_time = 9; optional bool contain_null = 10; optional bool ignored = 11; + repeated int32 fragment_ids = 12; + optional uint64 local_merge_time = 13; + optional bool disabled = 14; }; message PPublishFilterResponse { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b0e3ad456fc..074619258f1 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -339,8 +339,13 @@ struct TQueryOptions { 139: optional i64 orc_once_max_read_bytes = 8388608; 140: optional i64 orc_max_merge_distance_bytes = 1048576; - // upgrade options. keep them same in every branch. - 200: optional bool new_is_ip_address_in_range = false; + 141: optional bool ignore_runtime_filter_error = false; + 142: optional bool enable_fixed_len_to_uint32_v2 = false; + 143: optional bool enable_shared_exchange_sink_buffer = true; + + 144: optional bool enable_inverted_index_searcher_cache = true; + 145: optional bool enable_inverted_index_query_cache = true; + 146: optional bool fuzzy_disable_runtime_filter_in_be = false; // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org