This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 25d1934289be3337e8af04284ae1a655f7962126
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Mar 4 16:58:11 2024 +0800

    [Feature](topn) support multiple topn filter on backend (#31665)
    
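    Support multiple topn filters on the backend: QueryContext now keeps one
    runtime predicate per topn source node id instead of a single shared one,
    and the scan path applies every predicate listed in the new
    topn_filter_source_node_ids field (falling back to id 0 when the field is
    not set).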
---
 be/src/olap/iterators.h                            |  1 +
 be/src/olap/rowset/beta_rowset_reader.cpp          |  1 +
 be/src/olap/rowset/rowset_reader_context.h         |  1 +
 be/src/olap/rowset/segment_v2/segment.cpp          | 32 ++++++++--------
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 43 ++++++++++++----------
 be/src/olap/tablet_reader.cpp                      |  9 +++--
 be/src/olap/tablet_reader.h                        |  1 +
 be/src/pipeline/exec/sort_sink_operator.cpp        |  6 +--
 be/src/pipeline/pipeline_fragment_context.cpp      | 15 ++++++--
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 13 +++++--
 be/src/runtime/fragment_mgr.cpp                    |  5 +--
 be/src/runtime/query_context.h                     | 16 +++++++-
 be/src/runtime/runtime_state.cpp                   |  3 ++
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  6 +++
 be/src/vec/exec/vsort_node.cpp                     |  6 +--
 be/src/vec/olap/vcollect_iterator.cpp              |  4 +-
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 18 files changed, 109 insertions(+), 55 deletions(-)

diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 5f7c32bf0d4..a7fb43cff58 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -100,6 +100,7 @@ public:
     bool record_rowids = false;
     // flag for enable topn opt
     bool use_topn_opt = false;
+    std::vector<int> topn_filter_source_node_ids;
     // used for special optimization for query : ORDER BY key DESC LIMIT n
     bool read_orderby_key_reverse = false;
     // columns for orderby keys
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index 71002b69099..915b322bcaf 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -217,6 +217,7 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     _read_options.tablet_schema = _read_context->tablet_schema;
     _read_options.record_rowids = _read_context->record_rowids;
     _read_options.use_topn_opt = _read_context->use_topn_opt;
+    _read_options.topn_filter_source_node_ids = 
_read_context->topn_filter_source_node_ids;
     _read_options.read_orderby_key_reverse = 
_read_context->read_orderby_key_reverse;
     _read_options.read_orderby_key_columns = 
_read_context->read_orderby_key_columns;
     _read_options.io_ctx.reader_type = _read_context->reader_type;
diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h
index 365f4a734f4..d5683924a9e 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -38,6 +38,7 @@ struct RowsetReaderContext {
     TabletSchemaSPtr tablet_schema = nullptr;
     // flag for enable topn opt
     bool use_topn_opt = false;
+    std::vector<int> topn_filter_source_node_ids;
     // whether rowset should return ordered rows.
     bool need_ordered_result = true;
     // used for special optimization for query : ORDER BY key DESC LIMIT n
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 5cee6017e1f..9afb1429ac7 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -151,21 +151,23 @@ Status Segment::new_iterator(SchemaSPtr schema, const 
StorageReadOptions& read_o
     }
     if (read_options.use_topn_opt) {
         auto* query_ctx = read_options.runtime_state->get_query_ctx();
-        auto runtime_predicate = 
query_ctx->get_runtime_predicate().get_predicate();
-
-        int32_t uid =
-                
read_options.tablet_schema->column(runtime_predicate->column_id()).unique_id();
-        AndBlockColumnPredicate and_predicate;
-        and_predicate.add_column_predicate(
-                
SingleColumnBlockPredicate::create_unique(runtime_predicate.get()));
-        if (_column_readers.contains(uid) &&
-            can_apply_predicate_safely(runtime_predicate->column_id(), 
runtime_predicate.get(),
-                                       *schema, 
read_options.io_ctx.reader_type) &&
-            !_column_readers.at(uid)->match_condition(&and_predicate)) {
-            // any condition not satisfied, return.
-            *iter = std::make_unique<EmptySegmentIterator>(*schema);
-            read_options.stats->filtered_segment_number++;
-            return Status::OK();
+        for (int id : read_options.topn_filter_source_node_ids) {
+            auto runtime_predicate = 
query_ctx->get_runtime_predicate(id).get_predicate();
+
+            int32_t uid =
+                    
read_options.tablet_schema->column(runtime_predicate->column_id()).unique_id();
+            AndBlockColumnPredicate and_predicate;
+            and_predicate.add_column_predicate(
+                    
SingleColumnBlockPredicate::create_unique(runtime_predicate.get()));
+            if (_column_readers.contains(uid) &&
+                can_apply_predicate_safely(runtime_predicate->column_id(), 
runtime_predicate.get(),
+                                           *schema, 
read_options.io_ctx.reader_type) &&
+                !_column_readers.at(uid)->match_condition(&and_predicate)) {
+                // any condition not satisfied, return.
+                *iter = std::make_unique<EmptySegmentIterator>(*schema);
+                read_options.stats->filtered_segment_number++;
+                return Status::OK();
+            }
         }
     }
 
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index c9848f6f866..e8e1aa386a5 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -594,23 +594,25 @@ Status 
SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
         if (_opts.use_topn_opt) {
             
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_zonemap_ns);
             auto* query_ctx = _opts.runtime_state->get_query_ctx();
-            std::shared_ptr<doris::ColumnPredicate> runtime_predicate =
-                    query_ctx->get_runtime_predicate().get_predicate();
-            if 
(_segment->can_apply_predicate_safely(runtime_predicate->column_id(),
-                                                     runtime_predicate.get(), 
*_schema,
-                                                     
_opts.io_ctx.reader_type)) {
-                AndBlockColumnPredicate and_predicate;
-                and_predicate.add_column_predicate(
-                        
SingleColumnBlockPredicate::create_unique(runtime_predicate.get()));
-
-                RowRanges column_rp_row_ranges = 
RowRanges::create_single(num_rows());
-                
RETURN_IF_ERROR(_column_iterators[runtime_predicate->column_id()]
-                                        
->get_row_ranges_by_zone_map(&and_predicate, nullptr,
-                                                                     
&column_rp_row_ranges));
-
-                // intersect different columns's row ranges to get final row 
ranges by zone map
-                RowRanges::ranges_intersection(zone_map_row_ranges, 
column_rp_row_ranges,
-                                               &zone_map_row_ranges);
+            for (int id : _opts.topn_filter_source_node_ids) {
+                std::shared_ptr<doris::ColumnPredicate> runtime_predicate =
+                        query_ctx->get_runtime_predicate(id).get_predicate();
+                if 
(_segment->can_apply_predicate_safely(runtime_predicate->column_id(),
+                                                         
runtime_predicate.get(), *_schema,
+                                                         
_opts.io_ctx.reader_type)) {
+                    AndBlockColumnPredicate and_predicate;
+                    and_predicate.add_column_predicate(
+                            
SingleColumnBlockPredicate::create_unique(runtime_predicate.get()));
+
+                    RowRanges column_rp_row_ranges = 
RowRanges::create_single(num_rows());
+                    
RETURN_IF_ERROR(_column_iterators[runtime_predicate->column_id()]
+                                            
->get_row_ranges_by_zone_map(&and_predicate, nullptr,
+                                                                         
&column_rp_row_ranges));
+
+                    // intersect different columns's row ranges to get final 
row ranges by zone map
+                    RowRanges::ranges_intersection(zone_map_row_ranges, 
column_rp_row_ranges,
+                                                   &zone_map_row_ranges);
+                }
             }
         }
 
@@ -1507,8 +1509,11 @@ Status SegmentIterator::_vec_init_lazy_materialization() 
{
     //  all rows should be read, so runtime predicate will reduce rows for 
topn node
     if (_opts.use_topn_opt &&
         (_opts.read_orderby_key_columns == nullptr || 
_opts.read_orderby_key_columns->empty())) {
-        auto& runtime_predicate = 
_opts.runtime_state->get_query_ctx()->get_runtime_predicate();
-        _col_predicates.push_back(runtime_predicate.get_predicate().get());
+        for (int id : _opts.topn_filter_source_node_ids) {
+            auto& runtime_predicate =
+                    
_opts.runtime_state->get_query_ctx()->get_runtime_predicate(id);
+            _col_predicates.push_back(runtime_predicate.get_predicate().get());
+        }
     }
 
     // Step1: extract columns that can be lazy materialization
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index 86823ac36c7..f0229431b7b 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -234,6 +234,7 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params) {
     _reader_context.tablet_schema = _tablet_schema;
     _reader_context.need_ordered_result = need_ordered_result;
     _reader_context.use_topn_opt = read_params.use_topn_opt;
+    _reader_context.topn_filter_source_node_ids = 
read_params.topn_filter_source_node_ids;
     _reader_context.read_orderby_key_reverse = 
read_params.read_orderby_key_reverse;
     _reader_context.read_orderby_key_limit = 
read_params.read_orderby_key_limit;
     _reader_context.filter_block_conjuncts = 
read_params.filter_block_conjuncts;
@@ -574,9 +575,11 @@ void 
TabletReader::_init_conditions_param_except_leafnode_of_andnode(
     }
 
     if (read_params.use_topn_opt) {
-        auto& runtime_predicate =
-                
read_params.runtime_state->get_query_ctx()->get_runtime_predicate();
-        runtime_predicate.set_tablet_schema(_tablet_schema);
+        for (int id : read_params.topn_filter_source_node_ids) {
+            auto& runtime_predicate =
+                    
read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id);
+            runtime_predicate.set_tablet_schema(_tablet_schema);
+        }
     }
 }
 
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index ed57b9e3d1e..6a560fac0f8 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -161,6 +161,7 @@ public:
         bool record_rowids = false;
         // flag for enable topn opt
         bool use_topn_opt = false;
+        std::vector<int> topn_filter_source_node_ids;
         // used for special optimization for query : ORDER BY key LIMIT n
         bool read_orderby_key = false;
         // used for special optimization for query : ORDER BY key DESC LIMIT n
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index 854b6f165ca..e86f4915de3 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -104,7 +104,7 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
                 }
                 for (auto* slot : tuple_desc->slots()) {
                     if (slot->id() == first_sort_slot.slot_id) {
-                        
RETURN_IF_ERROR(query_ctx->get_runtime_predicate().init(
+                        
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).init(
                                 slot->type().type, _nulls_first[0], 
_is_asc_order[0],
                                 slot->col_name()));
                         break;
@@ -112,7 +112,7 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
                 }
             }
         }
-        if (!query_ctx->get_runtime_predicate().inited()) {
+        if (!query_ctx->get_runtime_predicate(_node_id).inited()) {
             return Status::InternalError("runtime predicate is not properly 
initialized");
         }
     }
@@ -160,7 +160,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* in
             vectorized::Field new_top = 
local_state._shared_state->sorter->get_top_value();
             if (!new_top.is_null() && new_top != local_state.old_top) {
                 auto* query_ctx = state->get_query_ctx();
-                
RETURN_IF_ERROR(query_ctx->get_runtime_predicate().update(new_top));
+                
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top));
                 local_state.old_top = std::move(new_top);
             }
         }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 82b18e8bfa8..b4c5646402e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -244,9 +244,18 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     _runtime_state = RuntimeState::create_unique(
             local_params.fragment_instance_id, request.query_id, 
request.fragment_id,
             request.query_options, _query_ctx->query_globals, _exec_env, 
_query_ctx.get());
-    if (idx == 0 && local_params.__isset.runtime_filter_params) {
-        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
-                local_params.runtime_filter_params);
+    if (idx == 0) {
+        if (local_params.__isset.runtime_filter_params) {
+            if (local_params.__isset.runtime_filter_params) {
+                _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                        local_params.runtime_filter_params);
+            }
+        }
+        if (local_params.__isset.topn_filter_source_node_ids) {
+            
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+        } else {
+            _query_ctx->init_runtime_predicates({0});
+        }
     }
 
     _runtime_state->set_task_execution_context(shared_from_this());
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7362af1c0a0..733cdfe2b50 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -523,9 +523,16 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         // build local_runtime_filter_mgr for each instance
         runtime_filter_mgr =
                 std::make_unique<RuntimeFilterMgr>(request.query_id, 
filterparams.get());
-        if (i == 0 && local_params.__isset.runtime_filter_params) {
-            _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
-                    local_params.runtime_filter_params);
+        if (i == 0) {
+            if (local_params.__isset.runtime_filter_params) {
+                _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                        local_params.runtime_filter_params);
+            }
+            if (local_params.__isset.topn_filter_source_node_ids) {
+                
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
+            } else {
+                _query_ctx->init_runtime_predicates({0});
+            }
         }
         filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 29f4065ab3d..e7d4771d615 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -797,7 +797,6 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             SCOPED_RAW_TIMER(&duration_ns);
             auto prepare_st = context->prepare(params);
             if (!prepare_st.ok()) {
-                LOG(WARNING) << "Prepare failed: " << prepare_st.to_string();
                 context->close_if_prepare_failed();
                 return prepare_st;
             }
@@ -806,10 +805,10 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
 
         for (size_t i = 0; i < params.local_params.size(); i++) {
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-            static_cast<void>(_runtimefilter_controller.add_entity(
+            RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
                     params.local_params[i], params.query_id, 
params.query_options, &handler,
                     
RuntimeFilterParamsContext::create(context->get_runtime_state())));
-            if (i == 0 and handler) {
+            if (!i && handler) {
                 query_ctx->set_merge_controller_handler(handler);
             }
             const TUniqueId& fragment_instance_id = 
params.local_params[i].fragment_instance_id;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index a7632a443bf..a7c855f4882 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -148,7 +148,19 @@ public:
         return _shared_scanner_controller;
     }
 
-    vectorized::RuntimePredicate& get_runtime_predicate() { return 
_runtime_predicate; }
+    vectorized::RuntimePredicate& get_runtime_predicate(int source_node_id) {
+        DCHECK(_runtime_predicates.contains(source_node_id) || 
_runtime_predicates.contains(0));
+        if (_runtime_predicates.contains(source_node_id)) {
+            return _runtime_predicates[source_node_id];
+        }
+        return _runtime_predicates[0];
+    }
+
+    void init_runtime_predicates(std::vector<int> source_node_ids) {
+        for (int id : source_node_ids) {
+            _runtime_predicates.try_emplace(id);
+        }
+    }
 
     Status set_task_group(taskgroup::TaskGroupPtr& tg);
 
@@ -274,7 +286,7 @@ private:
 
     std::shared_ptr<vectorized::SharedHashTableController> 
_shared_hash_table_controller;
     std::shared_ptr<vectorized::SharedScannerController> 
_shared_scanner_controller;
-    vectorized::RuntimePredicate _runtime_predicate;
+    std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
 
     taskgroup::TaskGroupPtr _task_group = nullptr;
     std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 5a32b37c40f..b04a7eed819 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -104,6 +104,9 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& 
fragment_exec_params,
         _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
                 fragment_exec_params.runtime_filter_params);
     }
+    if (_query_ctx) {
+        _query_ctx->init_runtime_predicates({0});
+    }
 }
 
 RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& 
query_id,
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index bc15cf7207f..a2a9293dee8 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -393,6 +393,12 @@ Status NewOlapScanner::_init_tablet_reader_params(
 
         // runtime predicate push down optimization for topn
         _tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt;
+        if (olap_scan_node.__isset.topn_filter_source_node_ids) {
+            _tablet_reader_params.topn_filter_source_node_ids =
+                    olap_scan_node.topn_filter_source_node_ids;
+        } else if (_tablet_reader_params.use_topn_opt) {
+            _tablet_reader_params.topn_filter_source_node_ids = {0};
+        }
     }
 
     // If this is a Two-Phase read query, and we need to delay the release of 
Rowset
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 848256a0aee..f49fbe99c64 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -92,7 +92,7 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
                 }
                 for (auto* slot : tuple_desc->slots()) {
                     if (slot->id() == first_sort_slot.slot_id) {
-                        
RETURN_IF_ERROR(query_ctx->get_runtime_predicate().init(
+                        
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).init(
                                 slot->type().type, _nulls_first[0], 
_is_asc_order[0],
                                 slot->col_name()));
                         break;
@@ -100,7 +100,7 @@ Status VSortNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
                 }
             }
         }
-        if (!query_ctx->get_runtime_predicate().inited()) {
+        if (!query_ctx->get_runtime_predicate(_id).inited()) {
             return Status::InternalError("runtime predicate is not properly 
initialized");
         }
     }
@@ -146,7 +146,7 @@ Status VSortNode::sink(RuntimeState* state, 
vectorized::Block* input_block, bool
             Field new_top = _sorter->get_top_value();
             if (!new_top.is_null() && new_top != old_top) {
                 auto* query_ctx = state->get_query_ctx();
-                
RETURN_IF_ERROR(query_ctx->get_runtime_predicate().update(new_top));
+                
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top));
                 old_top = std::move(new_top);
             }
         }
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 4eca53146ed..d8728cfa812 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -421,7 +421,9 @@ Status VCollectIterator::_topn_next(Block* block) {
 
                 // update orderby_extrems in query global context
                 auto* query_ctx = 
_reader->_reader_context.runtime_state->get_query_ctx();
-                
RETURN_IF_ERROR(query_ctx->get_runtime_predicate().update(new_top));
+                for (int id : 
_reader->_reader_context.topn_filter_source_node_ids) {
+                    
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(id).update(new_top));
+                }
             }
         } // end of while (read_rows < _topn_limit && !eof)
         VLOG_DEBUG << "topn debug rowset " << i << " read_rows=" << read_rows 
<< " eof=" << eof
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 8cc0a24a917..8cdf758f6dc 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -670,6 +670,7 @@ struct TPipelineInstanceParams {
   5: optional TRuntimeFilterParams runtime_filter_params
   6: optional i32 backend_num
   7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+  8: optional list<i32> topn_filter_source_node_ids
 }
 
 // ExecPlanFragment
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 4c23a1afd97..172115ff0b4 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -701,6 +701,7 @@ struct TOlapScanNode {
   15: optional set<i32> output_column_unique_ids
   16: optional list<i32> distribute_column_ids
   17: optional i32 schema_version
+  18: optional list<i32> topn_filter_source_node_ids
 }
 
 struct TEqJoinCondition {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to