This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new baaa53d8551 [Feature](top-n) support topn filter on vscan node (#33496)
baaa53d8551 is described below

commit baaa53d855174f6a4bf7e969cfa8e9c91899ec27
Author: Pxl <pxl...@qq.com>
AuthorDate: Wed Apr 24 10:54:02 2024 +0800

    [Feature](top-n) support topn filter on vscan node (#33496)
---
 be/src/pipeline/exec/es_scan_operator.cpp          |   4 +-
 be/src/pipeline/exec/es_scan_operator.h            |   2 +-
 be/src/pipeline/exec/file_scan_operator.cpp        |   4 +-
 be/src/pipeline/exec/file_scan_operator.h          |   2 +-
 be/src/pipeline/exec/meta_scan_operator.cpp        |   2 +-
 be/src/pipeline/exec/meta_scan_operator.h          |   2 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        |   4 +-
 be/src/pipeline/exec/olap_scan_operator.h          |   4 +-
 be/src/pipeline/exec/scan_operator.cpp             |  40 +++++--
 be/src/pipeline/exec/scan_operator.h               |  15 ++-
 be/src/pipeline/exec/sort_sink_operator.cpp        |   2 +-
 be/src/runtime/runtime_predicate.cpp               |   6 +-
 be/src/runtime/runtime_predicate.h                 |  17 +++
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  24 ++++-
 be/src/vec/exprs/vliteral.cpp                      |   6 --
 be/src/vec/exprs/vliteral.h                        |   2 -
 be/src/vec/exprs/vruntimefilter_wrapper.cpp        |  88 ++--------------
 be/src/vec/exprs/vtopn_pred.h                      | 116 +++++++++++++++++++++
 be/src/vec/utils/util.hpp                          |  59 +++++++++++
 .../org/apache/doris/planner/OlapScanNode.java     |   6 +-
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 21 files changed, 290 insertions(+), 117 deletions(-)

diff --git a/be/src/pipeline/exec/es_scan_operator.cpp 
b/be/src/pipeline/exec/es_scan_operator.cpp
index c00ee6917ea..aab16ff3bff 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -53,8 +53,8 @@ Status EsScanLocalState::_init_profile() {
     return Status::OK();
 }
 
-Status EsScanLocalState::_process_conjuncts() {
-    RETURN_IF_ERROR(Base::_process_conjuncts());
+Status EsScanLocalState::_process_conjuncts(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::_process_conjuncts(state));
     if (Base::_eos) {
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/es_scan_operator.h 
b/be/src/pipeline/exec/es_scan_operator.h
index 62d1a043c47..cdbd6922454 100644
--- a/be/src/pipeline/exec/es_scan_operator.h
+++ b/be/src/pipeline/exec/es_scan_operator.h
@@ -51,7 +51,7 @@ private:
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override;
     Status _init_profile() override;
-    Status _process_conjuncts() override;
+    Status _process_conjuncts(RuntimeState* state) override;
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
 
     std::vector<std::unique_ptr<TEsScanRange>> _scan_ranges;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index f81781481df..392179f2dd8 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -119,8 +119,8 @@ Status FileScanLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     return Status::OK();
 }
 
-Status FileScanLocalState::_process_conjuncts() {
-    RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts());
+Status FileScanLocalState::_process_conjuncts(RuntimeState* state) {
+    
RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts(state));
     if (Base::_eos) {
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/file_scan_operator.h 
b/be/src/pipeline/exec/file_scan_operator.h
index 4d0c38b2850..e59dd8055b2 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -49,7 +49,7 @@ public:
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
 
-    Status _process_conjuncts() override;
+    Status _process_conjuncts(RuntimeState* state) override;
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override;
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp 
b/be/src/pipeline/exec/meta_scan_operator.cpp
index 749fbcf333a..e981c6a2dc1 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -44,7 +44,7 @@ void MetaScanLocalState::set_scan_ranges(RuntimeState* state,
     _scan_ranges = scan_ranges;
 }
 
-Status MetaScanLocalState::_process_conjuncts() {
+Status MetaScanLocalState::_process_conjuncts(RuntimeState* state) {
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/meta_scan_operator.h 
b/be/src/pipeline/exec/meta_scan_operator.h
index 1bfda6c9b83..e26af7dba5a 100644
--- a/be/src/pipeline/exec/meta_scan_operator.h
+++ b/be/src/pipeline/exec/meta_scan_operator.h
@@ -51,7 +51,7 @@ private:
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override;
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
-    Status _process_conjuncts() override;
+    Status _process_conjuncts(RuntimeState* state) override;
 
     std::vector<TScanRangeParams> _scan_ranges;
 };
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 1876e62ed9c..eb0a0be726b 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -147,9 +147,9 @@ Status OlapScanLocalState::_init_profile() {
     return Status::OK();
 }
 
-Status OlapScanLocalState::_process_conjuncts() {
+Status OlapScanLocalState::_process_conjuncts(RuntimeState* state) {
     SCOPED_TIMER(_process_conjunct_timer);
-    RETURN_IF_ERROR(ScanLocalState::_process_conjuncts());
+    RETURN_IF_ERROR(ScanLocalState::_process_conjuncts(state));
     if (ScanLocalState::_eos) {
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 8ec318e853b..8f546826c88 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -58,7 +58,7 @@ private:
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override;
     Status _init_profile() override;
-    Status _process_conjuncts() override;
+    Status _process_conjuncts(RuntimeState* state) override;
     bool _is_key_column(const std::string& col_name) override;
 
     Status _should_push_down_function_filter(vectorized::VectorizedFnCall* 
fn_call,
@@ -83,6 +83,8 @@ private:
 
     bool _storage_no_merge() override;
 
+    // Olap scans answer true: ScanLocalState::_normalize_conjuncts() only builds
+    // VTopNPred conjuncts via _get_topn_filters() when this hook returns false.
+    bool _push_down_topn() override { return true; }
+
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
 
     void add_filter_info(int id, const PredicateFilterInfo& info);
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 94d07f8c0d6..9d32f0e25ab 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -40,6 +40,7 @@
 #include "vec/exprs/vexpr_context.h"
 #include "vec/exprs/vin_predicate.h"
 #include "vec/exprs/vslot_ref.h"
+#include "vec/exprs/vtopn_pred.h"
 #include "vec/functions/in.h"
 
 namespace doris::pipeline {
@@ -48,11 +49,7 @@ OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
 
 bool ScanOperator::can_read() {
     if (!_node->_opened) {
-        if (_node->_should_create_scanner || _node->ready_to_open()) {
-            return true;
-        } else {
-            return false;
-        }
+        return _node->_should_create_scanner || _node->ready_to_open();
     } else {
         // If scanner meet any error, done == true
         if (_node->_eos || _node->_scanner_ctx->done()) {
@@ -151,7 +148,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     for (size_t i = 0; i < _stale_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._stale_expr_ctxs[i]->clone(state, 
_stale_expr_ctxs[i]));
     }
-    RETURN_IF_ERROR(_process_conjuncts());
+    RETURN_IF_ERROR(_process_conjuncts(state));
 
     auto status = _eos ? Status::OK() : _prepare_scanners();
     RETURN_IF_ERROR(status);
@@ -164,7 +161,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
 }
 
 template <typename Derived>
-Status ScanLocalState<Derived>::_normalize_conjuncts() {
+Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
     auto& p = _parent->cast<typename Derived::Parent>();
     // The conjuncts is always on output tuple, so use _output_tuple_desc;
     std::vector<SlotDescriptor*> slots = p._output_tuple_desc->slots();
@@ -226,6 +223,10 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
         init_value_range(_slot_id_to_slot_desc[_colname_to_slot_id[colname]], 
type);
     }
 
+    if (!_push_down_topn()) {
+        RETURN_IF_ERROR(_get_topn_filters(state));
+    }
+
     for (auto it = _conjuncts.begin(); it != _conjuncts.end();) {
         auto& conjunct = *it;
         if (conjunct->root()) {
@@ -1300,6 +1301,27 @@ Status ScanLocalState<Derived>::_init_profile() {
     return Status::OK();
 }
 
+// Appends one VTopNPred conjunct to _conjuncts for every top-n source node whose
+// runtime predicate has already been initialized; uninitialized predicates are skipped.
+//
+// Returns an error if the predicate's column cannot be resolved to a slot of this
+// scan, or if creating / preparing / opening the conjunct fails.
+template <typename Derived>
+Status ScanLocalState<Derived>::_get_topn_filters(RuntimeState* state) {
+    auto& p = _parent->cast<typename Derived::Parent>();
+    for (auto id : get_topn_filter_source_node_ids()) {
+        const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
+        if (!pred.inited()) {
+            continue;
+        }
+
+        // Use find() rather than operator[]: operator[] would insert a default slot id
+        // for an unknown column name and silently bind the filter to the wrong slot.
+        const std::string col_name = pred.get_col_name();
+        auto slot_id_it = _colname_to_slot_id.find(col_name);
+        if (slot_id_it == _colname_to_slot_id.end()) {
+            return Status::InternalError(
+                    "topn filter column {} (source node {}) not found in scan slots", col_name,
+                    id);
+        }
+        SlotDescriptor* slot_desc = _slot_id_to_slot_desc[slot_id_it->second];
+        if (slot_desc == nullptr) {
+            return Status::InternalError(
+                    "topn filter column {} (source node {}) has no slot descriptor", col_name,
+                    id);
+        }
+
+        vectorized::VExprSPtr topn_pred;
+        RETURN_IF_ERROR(vectorized::VTopNPred::create_vtopn_pred(slot_desc, id, topn_pred));
+
+        vectorized::VExprContextSPtr conjunct = vectorized::VExprContext::create_shared(topn_pred);
+        RETURN_IF_ERROR(conjunct->prepare(state, p.row_descriptor()));
+        RETURN_IF_ERROR(conjunct->open(state));
+        _conjuncts.emplace_back(conjunct);
+    }
+    return Status::OK();
+}
+
 template <typename Derived>
 void ScanLocalState<Derived>::_filter_and_collect_cast_type_for_variant(
         const vectorized::VExpr* expr,
@@ -1391,6 +1413,10 @@ Status ScanOperatorX<LocalStateType>::init(const 
TPlanNode& tnode, RuntimeState*
     } else {
         _push_down_agg_type = TPushAggOp::type::NONE;
     }
+
+    if (tnode.__isset.topn_filter_source_node_ids) {
+        topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
+    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 35a5d0c722a..3ebccb58a8c 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -180,6 +180,10 @@ class ScanLocalState : public ScanLocalStateBase {
 
     std::vector<Dependency*> dependencies() const override { return 
{_scan_dependency.get()}; }
 
+    // Ids of the top-n source nodes whose runtime predicates may filter this scan,
+    // as stored on the parent operator from TPlanNode.topn_filter_source_node_ids.
+    // Returned by const reference so callers do not copy the vector on every call;
+    // the referenced vector is owned by the parent operator.
+    const std::vector<int>& get_topn_filter_source_node_ids() {
+        return _parent->cast<typename Derived::Parent>().topn_filter_source_node_ids;
+    }
+
 protected:
     template <typename LocalStateType>
     friend class ScanOperatorX;
@@ -187,13 +191,14 @@ protected:
     friend class vectorized::VScanner;
 
     Status _init_profile() override;
-    virtual Status _process_conjuncts() {
-        RETURN_IF_ERROR(_normalize_conjuncts());
+    virtual Status _process_conjuncts(RuntimeState* state) {
+        RETURN_IF_ERROR(_normalize_conjuncts(state));
         return Status::OK();
     }
     virtual bool _should_push_down_common_expr() { return false; }
 
     virtual bool _storage_no_merge() { return false; }
+    // Hook consulted by _normalize_conjuncts(): when it returns false (the default),
+    // top-n runtime predicates are added as VTopNPred conjuncts via _get_topn_filters().
+    virtual bool _push_down_topn() { return false; }
     virtual bool _is_key_column(const std::string& col_name) { return false; }
     virtual vectorized::VScanNode::PushDownType 
_should_push_down_bloom_filter() {
         return vectorized::VScanNode::PushDownType::UNACCEPTABLE;
@@ -231,7 +236,7 @@ protected:
         return Status::OK();
     }
 
-    Status _normalize_conjuncts();
+    Status _normalize_conjuncts(RuntimeState* state);
     Status _normalize_predicate(const vectorized::VExprSPtr& 
conjunct_expr_root,
                                 vectorized::VExprContext* context,
                                 vectorized::VExprSPtr& output_expr);
@@ -324,6 +329,8 @@ protected:
             const vectorized::VExpr* expr,
             phmap::flat_hash_map<std::string, std::vector<PrimitiveType>>& 
colname_to_cast_types);
 
+    Status _get_topn_filters(RuntimeState* state);
+
     // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in 
this vector
     // so that it will be destroyed uniformly at the end of the query.
     vectorized::VExprContextSPtrs _stale_expr_ctxs;
@@ -454,6 +461,8 @@ protected:
     // Record the value of the aggregate function 'count' from doris's be
     int64_t _push_down_count = -1;
     const int _parallel_tasks = 0;
+
+    std::vector<int> topn_filter_source_node_ids;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index d89e54614d1..91ae687510c 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -162,7 +162,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* in
 
         if (_use_topn_opt) {
             auto& predicate = 
state->get_query_ctx()->get_runtime_predicate(_node_id);
-            if (predicate.need_update()) {
+            if (predicate.inited()) {
                 vectorized::Field new_top = 
local_state._shared_state->sorter->get_top_value();
                 if (!new_top.is_null() && new_top != local_state.old_top) {
                     auto* query_ctx = state->get_query_ctx();
diff --git a/be/src/runtime/runtime_predicate.cpp 
b/be/src/runtime/runtime_predicate.cpp
index 032a5d505c9..2655ff86680 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -133,7 +133,7 @@ Status RuntimePredicate::init(PrimitiveType type, bool 
nulls_first, bool is_asc,
 Status RuntimePredicate::update(const Field& value) {
     std::unique_lock<std::shared_mutex> wlock(_rwlock);
     // skip null value
-    if (value.is_null() || !_inited || !_tablet_schema) {
+    if (value.is_null() || !_inited) {
         return Status::OK();
     }
 
@@ -149,7 +149,9 @@ Status RuntimePredicate::update(const Field& value) {
         }
     }
 
-    if (!updated) {
+    _has_value = true;
+
+    if (!updated || !_tablet_schema) {
         return Status::OK();
     }
 
diff --git a/be/src/runtime/runtime_predicate.h 
b/be/src/runtime/runtime_predicate.h
index 255c909c286..00fbd62dd88 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -76,6 +76,22 @@ public:
 
     Status update(const Field& value);
 
+    // Returns _has_value, read under a shared lock on _rwlock. The flag starts false
+    // and is set by update() once it gets past its null-value / not-inited early return.
+    bool has_value() const {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
+        return _has_value;
+    }
+
+    // Returns a copy of _orderby_extrem, taken under a shared lock on _rwlock.
+    // The field is constructed as a Null Field; callers check has_value() first.
+    Field get_value() const {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
+        return _orderby_extrem;
+    }
+
+    // Returns a copy of _col_name. No lock is taken here.
+    // NOTE(review): presumably _col_name is only written during init() - confirm.
+    std::string get_col_name() const { return _col_name; }
+
+    // Returns _is_asc; VTopNPred uses it to choose the "le" (true) or "ge" (false) function.
+    bool is_asc() const { return _is_asc; }
+
+    // Returns _nulls_first; when true, VTopNPred keeps NULL rows instead of filtering them.
+    bool nulls_first() const { return _nulls_first; }
+
 private:
     mutable std::shared_mutex _rwlock;
     Field _orderby_extrem {Field::Types::Null};
@@ -90,6 +106,7 @@ private:
             _pred_constructor;
     bool _inited = false;
     std::string _col_name;
+    bool _has_value = false;
 
     template <PrimitiveType type>
     static std::string get_normal_value(const Field& field) {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index ac8481e5f4f..c0bef6b3d8a 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -395,12 +395,26 @@ Status NewOlapScanner::_init_tablet_reader_params(
         }
 
         // runtime predicate push down optimization for topn
-        _tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt;
-        if (olap_scan_node.__isset.topn_filter_source_node_ids) {
+        if (!_parent && !((pipeline::OlapScanLocalState*)_local_state)
+                                 ->get_topn_filter_source_node_ids()
+                                 .empty()) {
+            // the new topn, which supports external tables
             _tablet_reader_params.topn_filter_source_node_ids =
-                    olap_scan_node.topn_filter_source_node_ids;
-        } else if (_tablet_reader_params.use_topn_opt) {
-            _tablet_reader_params.topn_filter_source_node_ids = {0};
+                    ((pipeline::OlapScanLocalState*)_local_state)
+                            ->get_topn_filter_source_node_ids();
+        } else {
+            _tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt;
+            if (_tablet_reader_params.use_topn_opt) {
+                if (olap_scan_node.__isset.topn_filter_source_node_ids) {
+                    // the 2.1 new multiple topn
+                    _tablet_reader_params.topn_filter_source_node_ids =
+                            olap_scan_node.topn_filter_source_node_ids;
+
+                } else {
+                    // the 2.0 old topn
+                    _tablet_reader_params.topn_filter_source_node_ids = {0};
+                }
+            }
         }
     }
 
diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp
index c7fbb081675..2dd544a0428 100644
--- a/be/src/vec/exprs/vliteral.cpp
+++ b/be/src/vec/exprs/vliteral.cpp
@@ -61,12 +61,6 @@ Status VLiteral::prepare(RuntimeState* state, const 
RowDescriptor& desc, VExprCo
     return Status::OK();
 }
 
-Status VLiteral::open(RuntimeState* state, VExprContext* context,
-                      FunctionContext::FunctionStateScope scope) {
-    RETURN_IF_ERROR(VExpr::open(state, context, scope));
-    return Status::OK();
-}
-
 Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* 
result_column_id) {
     // Literal expr should return least one row.
     // sometimes we just use a VLiteral without open or prepare. so can't 
check it at this moment
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index d443478ada5..582fc8ccf32 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -44,8 +44,6 @@ public:
     }
 
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
-    Status open(RuntimeState* state, VExprContext* context,
-                FunctionContext::FunctionStateScope scope) override;
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
override;
 
     const std::string& expr_name() const override { return _expr_name; }
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp 
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index e0753103b00..0bd0afbe185 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -35,6 +35,7 @@
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
+#include "vec/utils/util.hpp"
 
 namespace doris {
 class RowDescriptor;
@@ -81,16 +82,9 @@ void VRuntimeFilterWrapper::close(VExprContext* context,
 Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, 
int* result_column_id) {
     DCHECK(_open_finished || _getting_const_col);
     if (_always_true) {
-        auto res_data_column = ColumnVector<UInt8>::create(block->rows(), 1);
-        size_t num_columns_without_result = block->columns();
-        if (_data_type->is_nullable()) {
-            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
-            block->insert({ColumnNullable::create(std::move(res_data_column), 
std::move(null_map)),
-                           _data_type, expr_name()});
-        } else {
-            block->insert({std::move(res_data_column), _data_type, 
expr_name()});
-        }
-        *result_column_id = num_columns_without_result;
+        block->insert({create_always_true_column(block->rows(), 
_data_type->is_nullable()),
+                       _data_type, expr_name()});
+        *result_column_id = block->columns() - 1;
         return Status::OK();
     } else {
         int64_t input_rows = 0, filter_rows = 0;
@@ -118,76 +112,12 @@ Status VRuntimeFilterWrapper::execute(VExprContext* 
context, Block* block, int*
 
         const auto rows = block->rows();
         ColumnWithTypeAndName& result_column = 
block->get_by_position(*result_column_id);
-        if (is_column_const(*result_column.column)) {
-            auto* constant_val = 
const_cast<char*>(result_column.column->get_data_at(0).data);
-            auto filter =
-                    (constant_val == nullptr) || (!reinterpret_cast<const 
uint8_t*>(constant_val));
-            // if _null_aware is true, we should check the first args column 
is nullable. if value in
-            // column is null. we should set it to true
-            if (_null_aware) {
-                DCHECK(!args.empty());
-                // if args is only null, result may be const null column
-                
DCHECK(is_column_const(*block->get_by_position(args[0]).column) ||
-                       constant_val == nullptr);
-                if (filter &&
-                    
block->get_by_position(args[0]).column->get_data_at(0).data == nullptr) {
-                    auto res_col = ColumnVector<uint8_t>::create(1, 1);
-                    if (result_column.type->is_nullable()) {
-                        result_column.column = 
make_nullable(std::move(res_col), false);
-                    } else {
-                        result_column.column = std::move(res_col);
-                    }
-                    filter = false;
-                }
-            }
-            if (filter) {
-                filter_rows += rows;
-            }
-        } else if (auto* nullable = 
check_and_get_column<ColumnNullable>(*result_column.column)) {
-            auto* __restrict data = 
((ColumnVector<UInt8>*)nullable->get_nested_column_ptr().get())
-                                            ->get_data()
-                                            .data();
-            auto* __restrict null_map = 
const_cast<uint8_t*>(nullable->get_null_map_data().data());
-
-            if (_null_aware && 
block->get_by_position(args[0]).column->is_nullable()) {
-                auto* __restrict null_map_args =
-                        
((ColumnNullable*)block->get_by_position(args[0]).column.get())
-                                ->get_null_map_data()
-                                .data();
-                // TODO: try to simd the code
-                for (int i = 0; i < rows; ++i) {
-                    if (null_map_args[i]) {
-                        null_map[i] = 0;
-                        data[i] = 1;
-                    }
-                    filter_rows += (!null_map[i]) && (data[i] == 1);
-                }
-            } else {
-                filter_rows += doris::simd::count_zero_num(
-                        reinterpret_cast<const int8_t* __restrict>(data), 
null_map, rows);
-            }
-        } else if (const auto* res_col =
-                           
check_and_get_column<ColumnVector<UInt8>>(*result_column.column)) {
-            auto* __restrict data = 
const_cast<uint8_t*>(res_col->get_data().data());
-            if (_null_aware && 
block->get_by_position(args[0]).column->is_nullable()) {
-                auto* __restrict null_map_args =
-                        
((ColumnNullable*)block->get_by_position(args[0]).column.get())
-                                ->get_null_map_data()
-                                .data();
-                for (int i = 0; i < rows; ++i) {
-                    data[i] |= null_map_args[i];
-                    filter_rows += data[i];
-                }
-            } else {
-                filter_rows +=
-                        doris::simd::count_zero_num(reinterpret_cast<const 
int8_t*>(data), rows);
-            }
-        } else {
-            return Status::InternalError(
-                    "Invalid type for runtime filters!, and _expr_name is: {}. 
_data_type is: {}. "
-                    "result_column_id is: {}. block structure: {}.",
-                    _expr_name, _data_type->get_name(), *result_column_id, 
block->dump_structure());
+
+        if (_null_aware) {
+            change_null_to_true(result_column.column, 
block->get_by_position(args[0]).column);
         }
+
+        filter_rows = rows - calculate_false_number(result_column.column);
         _filtered_rows += filter_rows;
         _scan_rows += input_rows;
         calculate_filter(VRuntimeFilterWrapper::EXPECTED_FILTER_RATE, 
_filtered_rows, _scan_rows,
diff --git a/be/src/vec/exprs/vtopn_pred.h b/be/src/vec/exprs/vtopn_pred.h
new file mode 100644
index 00000000000..326ceaf0f2c
--- /dev/null
+++ b/be/src/vec/exprs/vtopn_pred.h
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/types.pb.h>
+
+#include "runtime/query_context.h"
+#include "runtime/runtime_predicate.h"
+#include "runtime/runtime_state.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/util.hpp"
+
+namespace doris::vectorized {
+
+// Boolean expression that filters rows against the current value of a top-n runtime
+// predicate. Only used for the dynamic topn filter.
+//
+// The single child is a slot reference to the order-by column. On execution the child
+// column is compared with the predicate's current value using "le" (ascending order)
+// or "ge" (descending order). Until the predicate holds a value, every row passes.
+class VTopNPred : public VExpr {
+    ENABLE_FACTORY_CREATOR(VTopNPred);
+
+public:
+    // `source_node_id` identifies the runtime predicate looked up in prepare().
+    VTopNPred(const TExprNode& node, int source_node_id)
+            : VExpr(node),
+              _source_node_id(source_node_id),
+              _expr_name(fmt::format("VTopNPred(source_node_id={})", _source_node_id)) {}
+
+    // Builds a VTopNPred over `slot_desc` and stores it in `expr`.
+    // Returns an error when `slot_desc` is null (e.g. the caller could not resolve the
+    // predicate's column) instead of dereferencing it.
+    // TODO: support general expr
+    static Status create_vtopn_pred(SlotDescriptor* slot_desc, int source_node_id,
+                                    vectorized::VExprSPtr& expr) {
+        if (slot_desc == nullptr) {
+            return Status::InternalError(
+                    "create VTopNPred failed: null slot descriptor, source_node_id={}",
+                    source_node_id);
+        }
+
+        TExprNode node;
+        node.__set_node_type(TExprNodeType::FUNCTION_CALL);
+        node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+        // The result is nullable exactly when the compared slot is nullable.
+        node.__set_is_nullable(slot_desc->is_nullable());
+        expr = vectorized::VTopNPred::create_shared(node, source_node_id);
+
+        expr->add_child(VSlotRef::create_shared(slot_desc));
+
+        return Status::OK();
+    }
+
+    // Binds the runtime predicate for _source_node_id and resolves the comparison
+    // function from the predicate's sort direction.
+    Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override {
+        _predicate = &state->get_query_ctx()->get_runtime_predicate(_source_node_id);
+        RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+
+        // Both arguments share the child's data type: the slot and the top-n value.
+        ColumnsWithTypeAndName argument_template;
+        argument_template.emplace_back(nullptr, _children[0]->data_type(),
+                                       _children[0]->expr_name());
+        argument_template.emplace_back(nullptr, _children[0]->data_type(), "topn value");
+
+        _function = SimpleFunctionFactory::instance().get_function(
+                _predicate->is_asc() ? "le" : "ge", argument_template, _data_type,
+                state->be_exec_version());
+        if (!_function) {
+            return Status::InternalError("get function failed");
+        }
+        return Status::OK();
+    }
+
+    // Appends the filter result column to `block` and reports its position through
+    // `result_column_id`.
+    Status execute(VExprContext* context, Block* block, int* result_column_id) override {
+        if (!_predicate->has_value()) {
+            // No top-n value has been published yet: let every row through.
+            block->insert({create_always_true_column(block->rows(), _data_type->is_nullable()),
+                           _data_type, _expr_name});
+            *result_column_id = block->columns() - 1;
+            return Status::OK();
+        }
+
+        // Materialize the current top-n value as a constant column argument.
+        Field field = _predicate->get_value();
+        auto column_ptr = _children[0]->data_type()->create_column_const(1, field);
+        size_t row_size = std::max(block->rows(), column_ptr->size());
+        int topn_value_id = VExpr::insert_param(
+                block, {column_ptr, _children[0]->data_type(), _expr_name}, row_size);
+
+        int slot_id = -1;
+        RETURN_IF_ERROR(_children[0]->execute(context, block, &slot_id));
+
+        std::vector<size_t> arguments = {(size_t)slot_id, (size_t)topn_value_id};
+
+        size_t num_columns_without_result = block->columns();
+        block->insert({nullptr, _data_type, _expr_name});
+        RETURN_IF_ERROR(_function->execute(nullptr, *block, arguments, num_columns_without_result,
+                                           block->rows(), false));
+        *result_column_id = num_columns_without_result;
+
+        if (is_nullable() && _predicate->nulls_first()) {
+            // null values are never filtered when nulls sort first
+            change_null_to_true(block->get_by_position(num_columns_without_result).column);
+        }
+        return Status::OK();
+    }
+
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    int _source_node_id;
+    std::string _expr_name;
+    // Set in prepare(); points at the runtime predicate held by the query context.
+    RuntimePredicate* _predicate = nullptr;
+    // Comparison function ("le" or "ge") resolved in prepare().
+    FunctionBasePtr _function;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp
index 30609799e7f..133c8dcc5b6 100644
--- a/be/src/vec/utils/util.hpp
+++ b/be/src/vec/utils/util.hpp
@@ -22,6 +22,7 @@
 #include <boost/shared_ptr.hpp>
 
 #include "runtime/descriptors.h"
+#include "util/simd/bits.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block.h"
@@ -169,6 +170,64 @@ inline std::string remove_suffix(const std::string& name, 
const std::string& suf
     return name.substr(0, name.length() - suffix.length());
 };
 
+// Builds a UInt8 column of `size` rows with every value set to 1.
+// When `is_nullable` is true the values are wrapped in a ColumnNullable whose
+// null map is all zero, i.e. no row is marked null.
+inline ColumnPtr create_always_true_column(size_t size, bool is_nullable) {
+    auto true_values = ColumnUInt8::create(size, 1);
+    if (!is_nullable) {
+        return true_values;
+    }
+
+    auto no_nulls = ColumnVector<UInt8>::create(size, 0);
+    return ColumnNullable::create(std::move(true_values), std::move(no_nulls));
+}
+
+// Rewrites a boolean result column in place so that null rows evaluate to true.
+//
+// - Const column: recurses into the wrapped data column (`argument` is not forwarded).
+// - Nullable column: ORs the null map into the nested UInt8 data, then clears the
+//   null map.
+// - Plain UInt8 column: when `argument` is a nullable column, ORs the argument's
+//   null map into the data; otherwise the column is left untouched.
+//
+// The column is modified through const_cast even though it is passed as a ColumnPtr.
+inline void change_null_to_true(ColumnPtr column, ColumnPtr argument = nullptr) {
+    size_t rows = column->size();
+    if (is_column_const(*column)) {
+        change_null_to_true(assert_cast<const ColumnConst*>(column.get())->get_data_column_ptr());
+    } else if (column->is_nullable()) {
+        auto* nullable =
+                const_cast<ColumnNullable*>(assert_cast<const ColumnNullable*>(column.get()));
+        auto* __restrict data = assert_cast<ColumnUInt8*>(nullable->get_nested_column_ptr().get())
+                                        ->get_data()
+                                        .data();
+        auto* __restrict null_map = const_cast<uint8_t*>(nullable->get_null_map_data().data());
+        for (size_t i = 0; i < rows; ++i) {
+            data[i] |= null_map[i];
+        }
+        memset(null_map, 0, rows);
+    } else if (argument != nullptr && argument->is_nullable()) {
+        // Only a nullable argument carries a null map; casting a non-nullable
+        // argument to ColumnNullable would be invalid.
+        const auto* __restrict null_map =
+                assert_cast<const ColumnNullable*>(argument.get())->get_null_map_data().data();
+        auto* __restrict data =
+                const_cast<ColumnUInt8*>(assert_cast<const ColumnUInt8*>(column.get()))
+                        ->get_data()
+                        .data();
+        for (size_t i = 0; i < rows; ++i) {
+            data[i] |= null_map[i];
+        }
+    }
+}
+
+// Returns how many rows of a boolean result column are counted as false.
+// A const column is evaluated on its wrapped data column and scaled by the row count.
+// Nullable columns pass their null map to simd::count_zero_num together with the data
+// (presumably so null rows count as false as well - confirm against util/simd/bits.h).
+inline size_t calculate_false_number(ColumnPtr column) {
+    const size_t row_count = column->size();
+
+    if (is_column_const(*column)) {
+        const auto* const_column = assert_cast<const ColumnConst*>(column.get());
+        return calculate_false_number(const_column->get_data_column_ptr()) * row_count;
+    }
+
+    if (!column->is_nullable()) {
+        const auto* values = assert_cast<const ColumnUInt8*>(column.get())->get_data().data();
+        return simd::count_zero_num(reinterpret_cast<const int8_t* __restrict>(values),
+                                    row_count);
+    }
+
+    const auto* nullable_column = assert_cast<const ColumnNullable*>(column.get());
+    const auto* values =
+            assert_cast<const ColumnUInt8*>(nullable_column->get_nested_column_ptr().get())
+                    ->get_data()
+                    .data();
+    const auto* __restrict null_flags = nullable_column->get_null_map_data().data();
+    return simd::count_zero_num(reinterpret_cast<const int8_t* __restrict>(values), null_flags,
+                                row_count);
+}
+
 } // namespace doris::vectorized
 
 namespace apache::thrift {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index bccd8a4962d..fbafe3d00d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1530,7 +1530,11 @@ public class OlapScanNode extends ScanNode {
                 .map(sortNode -> sortNode.getId().asInt())
                 .collect(Collectors.toList());
         if (!topnFilterSourceNodeIds.isEmpty()) {
-            
msg.olap_scan_node.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
+            if (SessionVariable.enablePipelineEngineX()) {
+                msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
+            } else {
+                
msg.olap_scan_node.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
+            }
         }
         msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift());
         String tableName = olapTable.getName();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index b0241b30c75..bb9a23b18f1 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1303,6 +1303,8 @@ struct TPlanNode {
   // Intermediate projections will not materialize into the output block.
   104: optional list<list<Exprs.TExpr>> intermediate_projections_list
   105: optional list<Types.TTupleId> intermediate_output_tuple_id_list
+
+  106: optional list<i32> topn_filter_source_node_ids
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to