This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 25d1934289be3337e8af04284ae1a655f7962126 Author: Pxl <pxl...@qq.com> AuthorDate: Mon Mar 4 16:58:11 2024 +0800 [Feature](topn) support multiple topn filter on backend (#31665) support multiple topn filter on backend --- be/src/olap/iterators.h | 1 + be/src/olap/rowset/beta_rowset_reader.cpp | 1 + be/src/olap/rowset/rowset_reader_context.h | 1 + be/src/olap/rowset/segment_v2/segment.cpp | 32 ++++++++-------- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 43 ++++++++++++---------- be/src/olap/tablet_reader.cpp | 9 +++-- be/src/olap/tablet_reader.h | 1 + be/src/pipeline/exec/sort_sink_operator.cpp | 6 +-- be/src/pipeline/pipeline_fragment_context.cpp | 15 ++++++-- .../pipeline_x/pipeline_x_fragment_context.cpp | 13 +++++-- be/src/runtime/fragment_mgr.cpp | 5 +-- be/src/runtime/query_context.h | 16 +++++++- be/src/runtime/runtime_state.cpp | 3 ++ be/src/vec/exec/scan/new_olap_scanner.cpp | 6 +++ be/src/vec/exec/vsort_node.cpp | 6 +-- be/src/vec/olap/vcollect_iterator.cpp | 4 +- gensrc/thrift/PaloInternalService.thrift | 1 + gensrc/thrift/PlanNodes.thrift | 1 + 18 files changed, 109 insertions(+), 55 deletions(-) diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 5f7c32bf0d4..a7fb43cff58 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -100,6 +100,7 @@ public: bool record_rowids = false; // flag for enable topn opt bool use_topn_opt = false; + std::vector<int> topn_filter_source_node_ids; // used for special optimization for query : ORDER BY key DESC LIMIT n bool read_orderby_key_reverse = false; // columns for orderby keys diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 71002b69099..915b322bcaf 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -217,6 +217,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.tablet_schema = _read_context->tablet_schema; _read_options.record_rowids = _read_context->record_rowids; _read_options.use_topn_opt = _read_context->use_topn_opt; + _read_options.topn_filter_source_node_ids = _read_context->topn_filter_source_node_ids; _read_options.read_orderby_key_reverse = _read_context->read_orderby_key_reverse; _read_options.read_orderby_key_columns = _read_context->read_orderby_key_columns; _read_options.io_ctx.reader_type = _read_context->reader_type; diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 365f4a734f4..d5683924a9e 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -38,6 +38,7 @@ struct RowsetReaderContext { TabletSchemaSPtr tablet_schema = nullptr; // flag for enable topn opt bool use_topn_opt = false; + std::vector<int> topn_filter_source_node_ids; // whether rowset should return ordered rows. bool need_ordered_result = true; // used for special optimization for query : ORDER BY key DESC LIMIT n diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 5cee6017e1f..9afb1429ac7 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -151,21 +151,23 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o } if (read_options.use_topn_opt) { auto* query_ctx = read_options.runtime_state->get_query_ctx(); - auto runtime_predicate = query_ctx->get_runtime_predicate().get_predicate(); - - int32_t uid = - read_options.tablet_schema->column(runtime_predicate->column_id()).unique_id(); - AndBlockColumnPredicate and_predicate; - and_predicate.add_column_predicate( - SingleColumnBlockPredicate::create_unique(runtime_predicate.get())); - if (_column_readers.contains(uid) && - can_apply_predicate_safely(runtime_predicate->column_id(), runtime_predicate.get(), - *schema, read_options.io_ctx.reader_type) && - !_column_readers.at(uid)->match_condition(&and_predicate)) { - // any condition not satisfied, return. - *iter = std::make_unique<EmptySegmentIterator>(*schema); - read_options.stats->filtered_segment_number++; - return Status::OK(); + for (int id : read_options.topn_filter_source_node_ids) { + auto runtime_predicate = query_ctx->get_runtime_predicate(id).get_predicate(); + + int32_t uid = + read_options.tablet_schema->column(runtime_predicate->column_id()).unique_id(); + AndBlockColumnPredicate and_predicate; + and_predicate.add_column_predicate( + SingleColumnBlockPredicate::create_unique(runtime_predicate.get())); + if (_column_readers.contains(uid) && + can_apply_predicate_safely(runtime_predicate->column_id(), runtime_predicate.get(), + *schema, read_options.io_ctx.reader_type) && + !_column_readers.at(uid)->match_condition(&and_predicate)) { + // any condition not satisfied, return. + *iter = std::make_unique<EmptySegmentIterator>(*schema); + read_options.stats->filtered_segment_number++; + return Status::OK(); + } } } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index c9848f6f866..e8e1aa386a5 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -594,23 +594,25 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row if (_opts.use_topn_opt) { SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_zonemap_ns); auto* query_ctx = _opts.runtime_state->get_query_ctx(); - std::shared_ptr<doris::ColumnPredicate> runtime_predicate = - query_ctx->get_runtime_predicate().get_predicate(); - if (_segment->can_apply_predicate_safely(runtime_predicate->column_id(), - runtime_predicate.get(), *_schema, - _opts.io_ctx.reader_type)) { - AndBlockColumnPredicate and_predicate; - and_predicate.add_column_predicate( - SingleColumnBlockPredicate::create_unique(runtime_predicate.get())); - - RowRanges column_rp_row_ranges = RowRanges::create_single(num_rows()); - RETURN_IF_ERROR(_column_iterators[runtime_predicate->column_id()] - ->get_row_ranges_by_zone_map(&and_predicate, nullptr, - &column_rp_row_ranges)); - - // intersect different columns's row ranges to get final row ranges by zone map - RowRanges::ranges_intersection(zone_map_row_ranges, column_rp_row_ranges, - &zone_map_row_ranges); + for (int id : _opts.topn_filter_source_node_ids) { + std::shared_ptr<doris::ColumnPredicate> runtime_predicate = + query_ctx->get_runtime_predicate(id).get_predicate(); + if (_segment->can_apply_predicate_safely(runtime_predicate->column_id(), + runtime_predicate.get(), *_schema, + _opts.io_ctx.reader_type)) { + AndBlockColumnPredicate and_predicate; + and_predicate.add_column_predicate( + SingleColumnBlockPredicate::create_unique(runtime_predicate.get())); + + RowRanges column_rp_row_ranges = RowRanges::create_single(num_rows()); + RETURN_IF_ERROR(_column_iterators[runtime_predicate->column_id()] + ->get_row_ranges_by_zone_map(&and_predicate, nullptr, + &column_rp_row_ranges)); + + // intersect different columns's row ranges to get final row ranges by zone map + RowRanges::ranges_intersection(zone_map_row_ranges, column_rp_row_ranges, + &zone_map_row_ranges); + } } } @@ -1507,8 +1509,11 @@ Status SegmentIterator::_vec_init_lazy_materialization() { // all rows should be read, so runtime predicate will reduce rows for topn node if (_opts.use_topn_opt && (_opts.read_orderby_key_columns == nullptr || _opts.read_orderby_key_columns->empty())) { - auto& runtime_predicate = _opts.runtime_state->get_query_ctx()->get_runtime_predicate(); - _col_predicates.push_back(runtime_predicate.get_predicate().get()); + for (int id : _opts.topn_filter_source_node_ids) { + auto& runtime_predicate = + _opts.runtime_state->get_query_ctx()->get_runtime_predicate(id); + _col_predicates.push_back(runtime_predicate.get_predicate().get()); + } } // Step1: extract columns that can be lazy materialization diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 86823ac36c7..f0229431b7b 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -234,6 +234,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.tablet_schema = _tablet_schema; _reader_context.need_ordered_result = need_ordered_result; _reader_context.use_topn_opt = read_params.use_topn_opt; + _reader_context.topn_filter_source_node_ids = read_params.topn_filter_source_node_ids; _reader_context.read_orderby_key_reverse = read_params.read_orderby_key_reverse; _reader_context.read_orderby_key_limit = read_params.read_orderby_key_limit; _reader_context.filter_block_conjuncts = read_params.filter_block_conjuncts; @@ -574,9 +575,11 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode( } if (read_params.use_topn_opt) { - auto& runtime_predicate = - read_params.runtime_state->get_query_ctx()->get_runtime_predicate(); - runtime_predicate.set_tablet_schema(_tablet_schema); + for (int id : read_params.topn_filter_source_node_ids) { + auto& runtime_predicate = + read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id); + runtime_predicate.set_tablet_schema(_tablet_schema); + } } } diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index ed57b9e3d1e..6a560fac0f8 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -161,6 +161,7 @@ public: bool record_rowids = false; // flag for enable topn opt bool use_topn_opt = false; + std::vector<int> topn_filter_source_node_ids; // used for special optimization for query : ORDER BY key LIMIT n bool read_orderby_key = false; // used for special optimization for query : ORDER BY key DESC LIMIT n diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 854b6f165ca..e86f4915de3 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -104,7 +104,7 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { } for (auto* slot : tuple_desc->slots()) { if (slot->id() == first_sort_slot.slot_id) { - RETURN_IF_ERROR(query_ctx->get_runtime_predicate().init( + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).init( slot->type().type, _nulls_first[0], _is_asc_order[0], slot->col_name())); break; @@ -112,7 +112,7 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { } } } - if (!query_ctx->get_runtime_predicate().inited()) { + if (!query_ctx->get_runtime_predicate(_node_id).inited()) { return Status::InternalError("runtime predicate is not properly initialized"); } } @@ -160,7 +160,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in vectorized::Field new_top = local_state._shared_state->sorter->get_top_value(); if (!new_top.is_null() && new_top != local_state.old_top) { auto* query_ctx = state->get_query_ctx(); - RETURN_IF_ERROR(query_ctx->get_runtime_predicate().update(new_top)); + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top)); local_state.old_top = std::move(new_top); } } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 82b18e8bfa8..b4c5646402e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -244,9 +244,18 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state = RuntimeState::create_unique( local_params.fragment_instance_id, request.query_id, request.fragment_id, request.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); - if (idx == 0 && local_params.__isset.runtime_filter_params) { - _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( - local_params.runtime_filter_params); + if (idx == 0) { + if (local_params.__isset.runtime_filter_params) { + if (local_params.__isset.runtime_filter_params) { + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + local_params.runtime_filter_params); + } + } + if (local_params.__isset.topn_filter_source_node_ids) { + _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids); + } else { + _query_ctx->init_runtime_predicates({0}); + } } _runtime_state->set_task_execution_context(shared_from_this()); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 7362af1c0a0..733cdfe2b50 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -523,9 +523,16 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( // build local_runtime_filter_mgr for each instance runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(request.query_id, filterparams.get()); - if (i == 0 && local_params.__isset.runtime_filter_params) { - _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( - local_params.runtime_filter_params); + if (i == 0) { + if (local_params.__isset.runtime_filter_params) { + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + local_params.runtime_filter_params); + } + if (local_params.__isset.topn_filter_source_node_ids) { + _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids); + } else { + _query_ctx->init_runtime_predicates({0}); + } } filterparams->runtime_filter_mgr = runtime_filter_mgr.get(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 29f4065ab3d..e7d4771d615 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -797,7 +797,6 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, SCOPED_RAW_TIMER(&duration_ns); auto prepare_st = context->prepare(params); if (!prepare_st.ok()) { - LOG(WARNING) << "Prepare failed: " << prepare_st.to_string(); context->close_if_prepare_failed(); return prepare_st; } @@ -806,10 +805,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, for (size_t i = 0; i < params.local_params.size(); i++) { std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; - static_cast<void>(_runtimefilter_controller.add_entity( + RETURN_IF_ERROR(_runtimefilter_controller.add_entity( params.local_params[i], params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); - if (i == 0 and handler) { + if (!i && handler) { query_ctx->set_merge_controller_handler(handler); } const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index a7632a443bf..a7c855f4882 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -148,7 +148,19 @@ public: return _shared_scanner_controller; } - vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; } + vectorized::RuntimePredicate& get_runtime_predicate(int source_node_id) { + DCHECK(_runtime_predicates.contains(source_node_id) || _runtime_predicates.contains(0)); + if (_runtime_predicates.contains(source_node_id)) { + return _runtime_predicates[source_node_id]; + } + return _runtime_predicates[0]; + } + + void init_runtime_predicates(std::vector<int> source_node_ids) { + for (int id : source_node_ids) { + _runtime_predicates.try_emplace(id); + } + } Status set_task_group(taskgroup::TaskGroupPtr& tg); @@ -274,7 +286,7 @@ private: std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller; std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller; - vectorized::RuntimePredicate _runtime_predicate; + std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates; taskgroup::TaskGroupPtr _task_group = nullptr; std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 5a32b37c40f..b04a7eed819 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -104,6 +104,9 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( fragment_exec_params.runtime_filter_params); } + if (_query_ctx) { + _query_ctx->init_runtime_predicates({0}); + } } RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id, diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index bc15cf7207f..a2a9293dee8 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -393,6 +393,12 @@ Status NewOlapScanner::_init_tablet_reader_params( // runtime predicate push down optimization for topn _tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt; + if (olap_scan_node.__isset.topn_filter_source_node_ids) { + _tablet_reader_params.topn_filter_source_node_ids = + olap_scan_node.topn_filter_source_node_ids; + } else if (_tablet_reader_params.use_topn_opt) { + _tablet_reader_params.topn_filter_source_node_ids = {0}; + } } // If this is a Two-Phase read query, and we need to delay the release of Rowset diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 848256a0aee..f49fbe99c64 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -92,7 +92,7 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { } for (auto* slot : tuple_desc->slots()) { if (slot->id() == first_sort_slot.slot_id) { - RETURN_IF_ERROR(query_ctx->get_runtime_predicate().init( + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).init( slot->type().type, _nulls_first[0], _is_asc_order[0], slot->col_name())); break; @@ -100,7 +100,7 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { } } } - if (!query_ctx->get_runtime_predicate().inited()) { + if (!query_ctx->get_runtime_predicate(_id).inited()) { return Status::InternalError("runtime predicate is not properly initialized"); } } @@ -146,7 +146,7 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool Field new_top = _sorter->get_top_value(); if (!new_top.is_null() && new_top != old_top) { auto* query_ctx = state->get_query_ctx(); - RETURN_IF_ERROR(query_ctx->get_runtime_predicate().update(new_top)); + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top)); old_top = std::move(new_top); } } diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 4eca53146ed..d8728cfa812 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -421,7 +421,9 @@ Status VCollectIterator::_topn_next(Block* block) { // update orderby_extrems in query global context auto* query_ctx = _reader->_reader_context.runtime_state->get_query_ctx(); - RETURN_IF_ERROR(query_ctx->get_runtime_predicate().update(new_top)); + for (int id : _reader->_reader_context.topn_filter_source_node_ids) { + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(id).update(new_top)); + } } } // end of while (read_rows < _topn_limit && !eof) VLOG_DEBUG << "topn debug rowset " << i << " read_rows=" << read_rows << " eof=" << eof diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 8cc0a24a917..8cdf758f6dc 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -670,6 +670,7 @@ struct TPipelineInstanceParams { 5: optional TRuntimeFilterParams runtime_filter_params 6: optional i32 backend_num 7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans + 8: optional list<i32> topn_filter_source_node_ids } // ExecPlanFragment diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 4c23a1afd97..172115ff0b4 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -701,6 +701,7 @@ struct TOlapScanNode { 15: optional set<i32> output_column_unique_ids 16: optional list<i32> distribute_column_ids 17: optional i32 schema_version + 18: optional list<i32> topn_filter_source_node_ids } struct TEqJoinCondition { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org