This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new baaa53d8551 [Feature](top-n) support topn filter on vscan node (#33496) baaa53d8551 is described below commit baaa53d855174f6a4bf7e969cfa8e9c91899ec27 Author: Pxl <pxl...@qq.com> AuthorDate: Wed Apr 24 10:54:02 2024 +0800 [Feature](top-n) support topn filter on vscan node (#33496) --- be/src/pipeline/exec/es_scan_operator.cpp | 4 +- be/src/pipeline/exec/es_scan_operator.h | 2 +- be/src/pipeline/exec/file_scan_operator.cpp | 4 +- be/src/pipeline/exec/file_scan_operator.h | 2 +- be/src/pipeline/exec/meta_scan_operator.cpp | 2 +- be/src/pipeline/exec/meta_scan_operator.h | 2 +- be/src/pipeline/exec/olap_scan_operator.cpp | 4 +- be/src/pipeline/exec/olap_scan_operator.h | 4 +- be/src/pipeline/exec/scan_operator.cpp | 40 +++++-- be/src/pipeline/exec/scan_operator.h | 15 ++- be/src/pipeline/exec/sort_sink_operator.cpp | 2 +- be/src/runtime/runtime_predicate.cpp | 6 +- be/src/runtime/runtime_predicate.h | 17 +++ be/src/vec/exec/scan/new_olap_scanner.cpp | 24 ++++- be/src/vec/exprs/vliteral.cpp | 6 -- be/src/vec/exprs/vliteral.h | 2 - be/src/vec/exprs/vruntimefilter_wrapper.cpp | 88 ++-------------- be/src/vec/exprs/vtopn_pred.h | 116 +++++++++++++++++++++ be/src/vec/utils/util.hpp | 59 +++++++++++ .../org/apache/doris/planner/OlapScanNode.java | 6 +- gensrc/thrift/PlanNodes.thrift | 2 + 21 files changed, 290 insertions(+), 117 deletions(-) diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index c00ee6917ea..aab16ff3bff 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -53,8 +53,8 @@ Status EsScanLocalState::_init_profile() { return Status::OK(); } -Status EsScanLocalState::_process_conjuncts() { - RETURN_IF_ERROR(Base::_process_conjuncts()); +Status EsScanLocalState::_process_conjuncts(RuntimeState* state) { + RETURN_IF_ERROR(Base::_process_conjuncts(state)); if (Base::_eos) { return Status::OK(); } diff --git a/be/src/pipeline/exec/es_scan_operator.h b/be/src/pipeline/exec/es_scan_operator.h index 62d1a043c47..cdbd6922454 100644 --- a/be/src/pipeline/exec/es_scan_operator.h +++ b/be/src/pipeline/exec/es_scan_operator.h @@ -51,7 +51,7 @@ private: void set_scan_ranges(RuntimeState* state, const std::vector<TScanRangeParams>& scan_ranges) override; Status _init_profile() override; - Status _process_conjuncts() override; + Status _process_conjuncts(RuntimeState* state) override; Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; std::vector<std::unique_ptr<TEsScanRange>> _scan_ranges; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index f81781481df..392179f2dd8 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -119,8 +119,8 @@ Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { return Status::OK(); } -Status FileScanLocalState::_process_conjuncts() { - RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts()); +Status FileScanLocalState::_process_conjuncts(RuntimeState* state) { + RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts(state)); if (Base::_eos) { return Status::OK(); } diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 4d0c38b2850..e59dd8055b2 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -49,7 +49,7 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; - Status _process_conjuncts() override; + Status _process_conjuncts(RuntimeState* state) override; Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; void set_scan_ranges(RuntimeState* state, const std::vector<TScanRangeParams>& scan_ranges) override; diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp index 749fbcf333a..e981c6a2dc1 100644 --- a/be/src/pipeline/exec/meta_scan_operator.cpp +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -44,7 +44,7 @@ void MetaScanLocalState::set_scan_ranges(RuntimeState* state, _scan_ranges = scan_ranges; } -Status MetaScanLocalState::_process_conjuncts() { +Status MetaScanLocalState::_process_conjuncts(RuntimeState* state) { return Status::OK(); } diff --git a/be/src/pipeline/exec/meta_scan_operator.h b/be/src/pipeline/exec/meta_scan_operator.h index 1bfda6c9b83..e26af7dba5a 100644 --- a/be/src/pipeline/exec/meta_scan_operator.h +++ b/be/src/pipeline/exec/meta_scan_operator.h @@ -51,7 +51,7 @@ private: void set_scan_ranges(RuntimeState* state, const std::vector<TScanRangeParams>& scan_ranges) override; Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; - Status _process_conjuncts() override; + Status _process_conjuncts(RuntimeState* state) override; std::vector<TScanRangeParams> _scan_ranges; }; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 1876e62ed9c..eb0a0be726b 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -147,9 +147,9 @@ Status OlapScanLocalState::_init_profile() { return Status::OK(); } -Status OlapScanLocalState::_process_conjuncts() { +Status OlapScanLocalState::_process_conjuncts(RuntimeState* state) { SCOPED_TIMER(_process_conjunct_timer); - RETURN_IF_ERROR(ScanLocalState::_process_conjuncts()); + RETURN_IF_ERROR(ScanLocalState::_process_conjuncts(state)); if (ScanLocalState::_eos) { return Status::OK(); } diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 8ec318e853b..8f546826c88 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -58,7 +58,7 @@ private: void set_scan_ranges(RuntimeState* state, const std::vector<TScanRangeParams>& scan_ranges) override; Status _init_profile() override; - Status _process_conjuncts() override; + Status _process_conjuncts(RuntimeState* state) override; bool _is_key_column(const std::string& col_name) override; Status _should_push_down_function_filter(vectorized::VectorizedFnCall* fn_call, @@ -83,6 +83,8 @@ private: bool _storage_no_merge() override; + bool _push_down_topn() override { return true; } + Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; void add_filter_info(int id, const PredicateFilterInfo& info); diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 94d07f8c0d6..9d32f0e25ab 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -40,6 +40,7 @@ #include "vec/exprs/vexpr_context.h" #include "vec/exprs/vin_predicate.h" #include "vec/exprs/vslot_ref.h" +#include "vec/exprs/vtopn_pred.h" #include "vec/functions/in.h" namespace doris::pipeline { @@ -48,11 +49,7 @@ OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator) bool ScanOperator::can_read() { if (!_node->_opened) { - if (_node->_should_create_scanner || _node->ready_to_open()) { - return true; - } else { - return false; - } + return _node->_should_create_scanner || _node->ready_to_open(); } else { // If scanner meet any error, done == true if (_node->_eos || _node->_scanner_ctx->done()) { @@ -151,7 +148,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) { for (size_t i = 0; i < _stale_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._stale_expr_ctxs[i]->clone(state, _stale_expr_ctxs[i])); } - RETURN_IF_ERROR(_process_conjuncts()); + RETURN_IF_ERROR(_process_conjuncts(state)); auto status = _eos ? Status::OK() : _prepare_scanners(); RETURN_IF_ERROR(status); @@ -164,7 +161,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) { } template <typename Derived> -Status ScanLocalState<Derived>::_normalize_conjuncts() { +Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) { auto& p = _parent->cast<typename Derived::Parent>(); // The conjuncts is always on output tuple, so use _output_tuple_desc; std::vector<SlotDescriptor*> slots = p._output_tuple_desc->slots(); @@ -226,6 +223,10 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() { init_value_range(_slot_id_to_slot_desc[_colname_to_slot_id[colname]], type); } + if (!_push_down_topn()) { + RETURN_IF_ERROR(_get_topn_filters(state)); + } + for (auto it = _conjuncts.begin(); it != _conjuncts.end();) { auto& conjunct = *it; if (conjunct->root()) { @@ -1300,6 +1301,27 @@ Status ScanLocalState<Derived>::_init_profile() { return Status::OK(); } +template <typename Derived> +Status ScanLocalState<Derived>::_get_topn_filters(RuntimeState* state) { + for (auto id : get_topn_filter_source_node_ids()) { + const auto& pred = state->get_query_ctx()->get_runtime_predicate(id); + if (!pred.inited()) { + continue; + } + SlotDescriptor* slot_desc = _slot_id_to_slot_desc[_colname_to_slot_id[pred.get_col_name()]]; + + vectorized::VExprSPtr topn_pred; + RETURN_IF_ERROR(vectorized::VTopNPred::create_vtopn_pred(slot_desc, id, topn_pred)); + + vectorized::VExprContextSPtr conjunct = vectorized::VExprContext::create_shared(topn_pred); + RETURN_IF_ERROR(conjunct->prepare( + state, _parent->cast<typename Derived::Parent>().row_descriptor())); + RETURN_IF_ERROR(conjunct->open(state)); + _conjuncts.emplace_back(conjunct); + } + return Status::OK(); +} + template <typename Derived> void ScanLocalState<Derived>::_filter_and_collect_cast_type_for_variant( const vectorized::VExpr* expr, @@ -1391,6 +1413,10 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState* } else { _push_down_agg_type = TPushAggOp::type::NONE; } + + if (tnode.__isset.topn_filter_source_node_ids) { + topn_filter_source_node_ids = tnode.topn_filter_source_node_ids; + } return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 35a5d0c722a..3ebccb58a8c 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -180,6 +180,10 @@ class ScanLocalState : public ScanLocalStateBase { std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; } + std::vector<int> get_topn_filter_source_node_ids() { + return _parent->cast<typename Derived::Parent>().topn_filter_source_node_ids; + } + protected: template <typename LocalStateType> friend class ScanOperatorX; @@ -187,13 +191,14 @@ protected: friend class vectorized::VScanner; Status _init_profile() override; - virtual Status _process_conjuncts() { - RETURN_IF_ERROR(_normalize_conjuncts()); + virtual Status _process_conjuncts(RuntimeState* state) { + RETURN_IF_ERROR(_normalize_conjuncts(state)); return Status::OK(); } virtual bool _should_push_down_common_expr() { return false; } virtual bool _storage_no_merge() { return false; } + virtual bool _push_down_topn() { return false; } virtual bool _is_key_column(const std::string& col_name) { return false; } virtual vectorized::VScanNode::PushDownType _should_push_down_bloom_filter() { return vectorized::VScanNode::PushDownType::UNACCEPTABLE; @@ -231,7 +236,7 @@ protected: return Status::OK(); } - Status _normalize_conjuncts(); + Status _normalize_conjuncts(RuntimeState* state); Status _normalize_predicate(const vectorized::VExprSPtr& conjunct_expr_root, vectorized::VExprContext* context, vectorized::VExprSPtr& output_expr); @@ -324,6 +329,8 @@ protected: const vectorized::VExpr* expr, phmap::flat_hash_map<std::string, std::vector<PrimitiveType>>& colname_to_cast_types); + Status _get_topn_filters(RuntimeState* state); + // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector // so that it will be destroyed uniformly at the end of the query. vectorized::VExprContextSPtrs _stale_expr_ctxs; @@ -454,6 +461,8 @@ protected: // Record the value of the aggregate function 'count' from doris's be int64_t _push_down_count = -1; const int _parallel_tasks = 0; + + std::vector<int> topn_filter_source_node_ids; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index d89e54614d1..91ae687510c 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -162,7 +162,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in if (_use_topn_opt) { auto& predicate = state->get_query_ctx()->get_runtime_predicate(_node_id); - if (predicate.need_update()) { + if (predicate.inited()) { vectorized::Field new_top = local_state._shared_state->sorter->get_top_value(); if (!new_top.is_null() && new_top != local_state.old_top) { auto* query_ctx = state->get_query_ctx(); diff --git a/be/src/runtime/runtime_predicate.cpp b/be/src/runtime/runtime_predicate.cpp index 032a5d505c9..2655ff86680 100644 --- a/be/src/runtime/runtime_predicate.cpp +++ b/be/src/runtime/runtime_predicate.cpp @@ -133,7 +133,7 @@ Status RuntimePredicate::init(PrimitiveType type, bool nulls_first, bool is_asc, Status RuntimePredicate::update(const Field& value) { std::unique_lock<std::shared_mutex> wlock(_rwlock); // skip null value - if (value.is_null() || !_inited || !_tablet_schema) { + if (value.is_null() || !_inited) { return Status::OK(); } @@ -149,7 +149,9 @@ Status RuntimePredicate::update(const Field& value) { } } - if (!updated) { + _has_value = true; + + if (!updated || !_tablet_schema) { return Status::OK(); } diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index 255c909c286..00fbd62dd88 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -76,6 +76,22 @@ public: Status update(const Field& value); + bool has_value() const { + std::shared_lock<std::shared_mutex> rlock(_rwlock); + return _has_value; + } + + Field get_value() const { + std::shared_lock<std::shared_mutex> rlock(_rwlock); + return _orderby_extrem; + } + + std::string get_col_name() const { return _col_name; } + + bool is_asc() const { return _is_asc; } + + bool nulls_first() const { return _nulls_first; } + private: mutable std::shared_mutex _rwlock; Field _orderby_extrem {Field::Types::Null}; @@ -90,6 +106,7 @@ private: _pred_constructor; bool _inited = false; std::string _col_name; + bool _has_value = false; template <PrimitiveType type> static std::string get_normal_value(const Field& field) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index ac8481e5f4f..c0bef6b3d8a 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -395,12 +395,26 @@ Status NewOlapScanner::_init_tablet_reader_params( } // runtime predicate push down optimization for topn - _tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt; - if (olap_scan_node.__isset.topn_filter_source_node_ids) { + if (!_parent && !((pipeline::OlapScanLocalState*)_local_state) + ->get_topn_filter_source_node_ids() + .empty()) { + // the new topn whitch support external table _tablet_reader_params.topn_filter_source_node_ids = - olap_scan_node.topn_filter_source_node_ids; - } else if (_tablet_reader_params.use_topn_opt) { - _tablet_reader_params.topn_filter_source_node_ids = {0}; + ((pipeline::OlapScanLocalState*)_local_state) + ->get_topn_filter_source_node_ids(); + } else { + _tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt; + if (_tablet_reader_params.use_topn_opt) { + if (olap_scan_node.__isset.topn_filter_source_node_ids) { + // the 2.1 new multiple topn + _tablet_reader_params.topn_filter_source_node_ids = + olap_scan_node.topn_filter_source_node_ids; + + } else { + // the 2.0 old topn + _tablet_reader_params.topn_filter_source_node_ids = {0}; + } + } } } diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp index c7fbb081675..2dd544a0428 100644 --- a/be/src/vec/exprs/vliteral.cpp +++ b/be/src/vec/exprs/vliteral.cpp @@ -61,12 +61,6 @@ Status VLiteral::prepare(RuntimeState* state, const RowDescriptor& desc, VExprCo return Status::OK(); } -Status VLiteral::open(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) { - RETURN_IF_ERROR(VExpr::open(state, context, scope)); - return Status::OK(); -} - Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* result_column_id) { // Literal expr should return least one row. // sometimes we just use a VLiteral without open or prepare. so can't check it at this moment diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h index d443478ada5..582fc8ccf32 100644 --- a/be/src/vec/exprs/vliteral.h +++ b/be/src/vec/exprs/vliteral.h @@ -44,8 +44,6 @@ public: } Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; - Status open(RuntimeState* state, VExprContext* context, - FunctionContext::FunctionStateScope scope) override; Status execute(VExprContext* context, Block* block, int* result_column_id) override; const std::string& expr_name() const override { return _expr_name; } diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index e0753103b00..0bd0afbe185 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -35,6 +35,7 @@ #include "vec/core/column_with_type_and_name.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" +#include "vec/utils/util.hpp" namespace doris { class RowDescriptor; @@ -81,16 +82,9 @@ void VRuntimeFilterWrapper::close(VExprContext* context, Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) { DCHECK(_open_finished || _getting_const_col); if (_always_true) { - auto res_data_column = ColumnVector<UInt8>::create(block->rows(), 1); - size_t num_columns_without_result = block->columns(); - if (_data_type->is_nullable()) { - auto null_map = ColumnVector<UInt8>::create(block->rows(), 0); - block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)), - _data_type, expr_name()}); - } else { - block->insert({std::move(res_data_column), _data_type, expr_name()}); - } - *result_column_id = num_columns_without_result; + block->insert({create_always_true_column(block->rows(), _data_type->is_nullable()), + _data_type, expr_name()}); + *result_column_id = block->columns() - 1; return Status::OK(); } else { int64_t input_rows = 0, filter_rows = 0; @@ -118,76 +112,12 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* const auto rows = block->rows(); ColumnWithTypeAndName& result_column = block->get_by_position(*result_column_id); - if (is_column_const(*result_column.column)) { - auto* constant_val = const_cast<char*>(result_column.column->get_data_at(0).data); - auto filter = - (constant_val == nullptr) || (!reinterpret_cast<const uint8_t*>(constant_val)); - // if _null_aware is true, we should check the first args column is nullable. if value in - // column is null. we should set it to true - if (_null_aware) { - DCHECK(!args.empty()); - // if args is only null, result may be const null column - DCHECK(is_column_const(*block->get_by_position(args[0]).column) || - constant_val == nullptr); - if (filter && - block->get_by_position(args[0]).column->get_data_at(0).data == nullptr) { - auto res_col = ColumnVector<uint8_t>::create(1, 1); - if (result_column.type->is_nullable()) { - result_column.column = make_nullable(std::move(res_col), false); - } else { - result_column.column = std::move(res_col); - } - filter = false; - } - } - if (filter) { - filter_rows += rows; - } - } else if (auto* nullable = check_and_get_column<ColumnNullable>(*result_column.column)) { - auto* __restrict data = ((ColumnVector<UInt8>*)nullable->get_nested_column_ptr().get()) - ->get_data() - .data(); - auto* __restrict null_map = const_cast<uint8_t*>(nullable->get_null_map_data().data()); - - if (_null_aware && block->get_by_position(args[0]).column->is_nullable()) { - auto* __restrict null_map_args = - ((ColumnNullable*)block->get_by_position(args[0]).column.get()) - ->get_null_map_data() - .data(); - // TODO: try to simd the code - for (int i = 0; i < rows; ++i) { - if (null_map_args[i]) { - null_map[i] = 0; - data[i] = 1; - } - filter_rows += (!null_map[i]) && (data[i] == 1); - } - } else { - filter_rows += doris::simd::count_zero_num( - reinterpret_cast<const int8_t* __restrict>(data), null_map, rows); - } - } else if (const auto* res_col = - check_and_get_column<ColumnVector<UInt8>>(*result_column.column)) { - auto* __restrict data = const_cast<uint8_t*>(res_col->get_data().data()); - if (_null_aware && block->get_by_position(args[0]).column->is_nullable()) { - auto* __restrict null_map_args = - ((ColumnNullable*)block->get_by_position(args[0]).column.get()) - ->get_null_map_data() - .data(); - for (int i = 0; i < rows; ++i) { - data[i] |= null_map_args[i]; - filter_rows += data[i]; - } - } else { - filter_rows += - doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), rows); - } - } else { - return Status::InternalError( - "Invalid type for runtime filters!, and _expr_name is: {}. _data_type is: {}. " - "result_column_id is: {}. block structure: {}.", - _expr_name, _data_type->get_name(), *result_column_id, block->dump_structure()); + + if (_null_aware) { + change_null_to_true(result_column.column, block->get_by_position(args[0]).column); } + + filter_rows = rows - calculate_false_number(result_column.column); _filtered_rows += filter_rows; _scan_rows += input_rows; calculate_filter(VRuntimeFilterWrapper::EXPECTED_FILTER_RATE, _filtered_rows, _scan_rows, diff --git a/be/src/vec/exprs/vtopn_pred.h b/be/src/vec/exprs/vtopn_pred.h new file mode 100644 index 00000000000..326ceaf0f2c --- /dev/null +++ b/be/src/vec/exprs/vtopn_pred.h @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/types.pb.h> + +#include "runtime/query_context.h" +#include "runtime/runtime_predicate.h" +#include "runtime/runtime_state.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vslot_ref.h" +#include "vec/functions/simple_function_factory.h" +#include "vec/utils/util.hpp" + +namespace doris::vectorized { + +// only used for dynamic topn filter +class VTopNPred : public VExpr { + ENABLE_FACTORY_CREATOR(VTopNPred); + +public: + VTopNPred(const TExprNode& node, int source_node_id) + : VExpr(node), + _source_node_id(source_node_id), + _expr_name(fmt::format("VTopNPred(source_node_id={})", _source_node_id)) {} + + // TODO: support general expr + static Status create_vtopn_pred(SlotDescriptor* slot_desc, int source_node_id, + vectorized::VExprSPtr& expr) { + TExprNode node; + node.__set_node_type(TExprNodeType::FUNCTION_CALL); + node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + node.__set_is_nullable(slot_desc->is_nullable()); + expr = vectorized::VTopNPred::create_shared(node, source_node_id); + + expr->add_child(VSlotRef::create_shared(slot_desc)); + + return Status::OK(); + } + + Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override { + _predicate = &state->get_query_ctx()->get_runtime_predicate(_source_node_id); + RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context)); + + ColumnsWithTypeAndName argument_template; + argument_template.emplace_back(nullptr, _children[0]->data_type(), + _children[0]->expr_name()); + argument_template.emplace_back(nullptr, _children[0]->data_type(), "topn value"); + + _function = SimpleFunctionFactory::instance().get_function( + _predicate->is_asc() ? "le" : "ge", argument_template, _data_type, + state->be_exec_version()); + if (!_function) { + return Status::InternalError("get function failed"); + } + return Status::OK(); + } + + Status execute(VExprContext* context, Block* block, int* result_column_id) override { + if (!_predicate->has_value()) { + block->insert({create_always_true_column(block->rows(), _data_type->is_nullable()), + _data_type, _expr_name}); + *result_column_id = block->columns() - 1; + return Status::OK(); + } + + Field field = _predicate->get_value(); + auto column_ptr = _children[0]->data_type()->create_column_const(1, field); + size_t row_size = std::max(block->rows(), column_ptr->size()); + int topn_value_id = VExpr::insert_param( + block, {column_ptr, _children[0]->data_type(), _expr_name}, row_size); + + int slot_id = -1; + RETURN_IF_ERROR(_children[0]->execute(context, block, &slot_id)); + + std::vector<size_t> arguments = {(size_t)slot_id, (size_t)topn_value_id}; + + size_t num_columns_without_result = block->columns(); + block->insert({nullptr, _data_type, _expr_name}); + RETURN_IF_ERROR(_function->execute(nullptr, *block, arguments, num_columns_without_result, + block->rows(), false)); + *result_column_id = num_columns_without_result; + + if (is_nullable() && _predicate->nulls_first()) { + // null values are always not filtered + change_null_to_true(block->get_by_position(num_columns_without_result).column); + } + return Status::OK(); + } + + const std::string& expr_name() const override { return _expr_name; } + +private: + int _source_node_id; + std::string _expr_name; + RuntimePredicate* _predicate = nullptr; + FunctionBasePtr _function; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp index 30609799e7f..133c8dcc5b6 100644 --- a/be/src/vec/utils/util.hpp +++ b/be/src/vec/utils/util.hpp @@ -22,6 +22,7 @@ #include <boost/shared_ptr.hpp> #include "runtime/descriptors.h" +#include "util/simd/bits.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/core/block.h" @@ -169,6 +170,64 @@ inline std::string remove_suffix(const std::string& name, const std::string& suf return name.substr(0, name.length() - suffix.length()); }; +inline ColumnPtr create_always_true_column(size_t size, bool is_nullable) { + auto res_data_column = ColumnUInt8::create(size, 1); + if (is_nullable) { + auto null_map = ColumnVector<UInt8>::create(size, 0); + return ColumnNullable::create(std::move(res_data_column), std::move(null_map)); + } + return res_data_column; +} + +// change null element to true element +inline void change_null_to_true(ColumnPtr column, ColumnPtr argument = nullptr) { + size_t rows = column->size(); + if (is_column_const(*column)) { + change_null_to_true(assert_cast<const ColumnConst*>(column.get())->get_data_column_ptr()); + } else if (column->is_nullable()) { + auto* nullable = + const_cast<ColumnNullable*>(assert_cast<const ColumnNullable*>(column.get())); + auto* __restrict data = assert_cast<ColumnUInt8*>(nullable->get_nested_column_ptr().get()) + ->get_data() + .data(); + auto* __restrict null_map = const_cast<uint8_t*>(nullable->get_null_map_data().data()); + for (size_t i = 0; i < rows; ++i) { + data[i] |= null_map[i]; + } + memset(null_map, 0, rows); + } else if (argument != nullptr) { + const auto* __restrict null_map = + assert_cast<const ColumnNullable*>(argument.get())->get_null_map_data().data(); + auto* __restrict data = + const_cast<ColumnUInt8*>(assert_cast<const ColumnUInt8*>(column.get())) + ->get_data() + .data(); + for (size_t i = 0; i < rows; ++i) { + data[i] |= null_map[i]; + } + } +} + +inline size_t calculate_false_number(ColumnPtr column) { + size_t rows = column->size(); + if (is_column_const(*column)) { + return calculate_false_number( + assert_cast<const ColumnConst*>(column.get())->get_data_column_ptr()) * + rows; + } else if (column->is_nullable()) { + const auto* nullable = assert_cast<const ColumnNullable*>(column.get()); + const auto* data = assert_cast<const ColumnUInt8*>(nullable->get_nested_column_ptr().get()) + ->get_data() + .data(); + const auto* __restrict null_map = nullable->get_null_map_data().data(); + return simd::count_zero_num(reinterpret_cast<const int8_t* __restrict>(data), null_map, + rows); + } else { + const auto* data = assert_cast<const ColumnUInt8*>(column.get())->get_data().data(); + return simd::count_zero_num(reinterpret_cast<const int8_t* __restrict>(data), rows); + } +} + } // namespace doris::vectorized namespace apache::thrift { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index bccd8a4962d..fbafe3d00d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1530,7 +1530,11 @@ public class OlapScanNode extends ScanNode { .map(sortNode -> sortNode.getId().asInt()) .collect(Collectors.toList()); if (!topnFilterSourceNodeIds.isEmpty()) { - msg.olap_scan_node.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds); + if (SessionVariable.enablePipelineEngineX()) { + msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds); + } else { + msg.olap_scan_node.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds); + } } msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift()); String tableName = olapTable.getName(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index b0241b30c75..bb9a23b18f1 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1303,6 +1303,8 @@ struct TPlanNode { // Intermediate projections will not materialize into the output block. 104: optional list<list<Exprs.TExpr>> intermediate_projections_list 105: optional list<Types.TTupleId> intermediate_output_tuple_id_list + + 106: optional list<i32> topn_filter_source_node_ids } // A flattened representation of a tree of PlanNodes, obtained by depth-first --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org