This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0813d701224 [test](beut) add pipeline DistinctStreamingAggOperator 
beut (#48805)
0813d701224 is described below

commit 0813d70122405d71d3ca320139eaaba571b468df
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Mon Mar 10 14:22:03 2025 +0800

    [test](beut) add pipeline DistinctStreamingAggOperator beut (#48805)
    
    ### What problem does this PR solve?
    add pipeline DistinctStreamingAggOperator beut
---
 .../distinct_streaming_aggregation_operator.cpp    |  88 +++------
 .../exec/distinct_streaming_aggregation_operator.h |  20 ++-
 be/src/pipeline/exec/operator.h                    |   3 +
 ...istinct_streaming_aggregation_operator_test.cpp | 198 +++++++++++++++++++++
 be/test/testutil/mock/mock_descriptors.h           |   1 +
 be/test/testutil/mock/mock_slot_ref.cpp            |  11 ++
 be/test/testutil/mock/mock_slot_ref.h              |   2 +
 7 files changed, 250 insertions(+), 73 deletions(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 24df662bb57..dc266dc9ae1 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -44,13 +44,13 @@ struct StreamingHtMinReductionEntry {
 // of the machine that we're running on.
 static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
         // Expand up to L2 cache always.
-        {0, 0.0},
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
         // Expand into L3 cache if we look like we're getting some reduction.
         // At present, The L2 cache is generally 1024k or more
-        {1024 * 1024, 0.0},
+        {.min_ht_mem = 1024 * 1024, .streaming_ht_min_reduction = 0.0},
         // Expand into main memory if we're getting a significant reduction.
         // The L3 cache is generally 16MB or more
-        {16 * 1024 * 1024, 2.0},
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
 };
 
 static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
@@ -85,15 +85,10 @@ Status DistinctStreamingAggLocalState::open(RuntimeState* 
state) {
     SCOPED_TIMER(Base::_open_timer);
     RETURN_IF_ERROR(Base::open(state));
     auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
-    for (auto& evaluator : p._aggregate_evaluators) {
-        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
-    }
     _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
     for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
     }
-
-    DCHECK_EQ(p._total_size_of_aggregate_states, 0);
     RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
     return Status::OK();
 }
@@ -192,15 +187,21 @@ Status 
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
         }
     }
 
-    size_t rows = in_block->rows();
+    const size_t rows = in_block->rows();
     _distinct_row.clear();
-    _distinct_row.reserve(rows);
+
     if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg && 
low_memory_mode()) {
         _stop_emplace_flag = true;
     }
 
     if (!_stop_emplace_flag) {
+        // _distinct_row is used to calculate non-duplicate data in key_columns
+        // _emplace_into_hash_table_to_distinct will determine whether to 
continue inserting data into the hashmap
+        // If it decides not to insert data, it will set _stop_emplace_flag = 
true and _distinct_row will be empty
+        _distinct_row.reserve(rows);
         _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
+        DCHECK_LE(_distinct_row.size(), rows)
+                << "_distinct_row size should be less than or equal to rows";
     }
 
     bool mem_reuse = 
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
@@ -216,6 +217,7 @@ Status 
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
         }
         DCHECK_EQ(out_block->columns(), key_size);
         if (_stop_emplace_flag && _distinct_row.empty()) {
+            // If _stop_emplace_flag is true and _distinct_row is also empty, 
it means it is in streaming mode, outputting what is input
             // swap the column directly, to solve Check failed: 
d.column->use_count() == 1 (2 vs. 1)
             for (int i = 0; i < key_size; ++i) {
                 auto output_column = out_block->get_by_position(i).column;
@@ -224,6 +226,7 @@ Status 
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
             }
         } else {
             DCHECK_EQ(_cache_block.rows(), 0);
+            // is output row > batch_size, split some to cache_block
             if (out_block->rows() + _distinct_row.size() > batch_size) {
                 size_t split_size = batch_size - out_block->rows();
                 for (int i = 0; i < key_size; ++i) {
@@ -243,6 +246,7 @@ Status 
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
             }
         }
     } else {
+        DCHECK(out_block->empty()) << "out_block must be empty , but rows is " 
<< out_block->rows();
         vectorized::ColumnsWithTypeAndName columns_with_schema;
         for (int i = 0; i < key_size; ++i) {
             if (_stop_emplace_flag) {
@@ -320,7 +324,6 @@ 
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
                                                              const 
DescriptorTbl& descs,
                                                              bool 
require_bucket_distribution)
         : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, 
operator_id, descs),
-          _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase),
@@ -328,8 +331,7 @@ 
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
                                    ? tnode.distribute_expr_lists[0]
                                    : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate),
-          _require_bucket_distribution(require_bucket_distribution),
-          _without_key(tnode.agg_node.grouping_exprs.empty()) {
+          _require_bucket_distribution(require_bucket_distribution) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
         if (_is_streaming_preagg) {
@@ -346,32 +348,22 @@ Status DistinctStreamingAggOperatorX::init(const 
TPlanNode& tnode, RuntimeState*
     RETURN_IF_ERROR(
             
vectorized::VExpr::create_expr_trees(tnode.agg_node.grouping_exprs, 
_probe_expr_ctxs));
 
-    // init aggregate functions
-    _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
-
-    TSortInfo dummy;
-    for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
-        vectorized::AggFnEvaluator* evaluator = nullptr;
-        RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(
-                _pool, tnode.agg_node.aggregate_functions[i],
-                tnode.agg_node.__isset.agg_sort_infos ? 
tnode.agg_node.agg_sort_infos[i] : dummy,
-                tnode.agg_node.grouping_exprs.empty(), &evaluator));
-        _aggregate_evaluators.push_back(evaluator);
-    }
-
     _op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR";
     return Status::OK();
 }
 
 Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
     
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
-    _intermediate_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
-    _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
-    DCHECK_EQ(_intermediate_tuple_desc->slots().size(), 
_output_tuple_desc->slots().size());
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, 
_child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
+    init_make_nullable(state);
+    return Status::OK();
+}
 
-    size_t j = _probe_expr_ctxs.size();
-    for (size_t i = 0; i < j; ++i) {
+void DistinctStreamingAggOperatorX::init_make_nullable(RuntimeState* state) {
+    _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+
+    for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
         auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable();
         auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
         if (nullable_output != nullable_input) {
@@ -379,40 +371,6 @@ Status 
DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
             _make_nullable_keys.emplace_back(i);
         }
     }
-    for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
-        SlotDescriptor* intermediate_slot_desc = 
_intermediate_tuple_desc->slots()[j];
-        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
-        RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
-                state, _child->row_desc(), intermediate_slot_desc, 
output_slot_desc));
-        _aggregate_evaluators[i]->set_version(state->be_exec_version());
-    }
-
-    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
-        const auto& agg_function = _aggregate_evaluators[i]->function();
-        _total_size_of_aggregate_states += agg_function->size_of_data();
-
-        // If not the last aggregate_state, we need pad it so that next 
aggregate_state will be aligned.
-        if (i + 1 < _aggregate_evaluators.size()) {
-            size_t alignment_of_next_state =
-                    _aggregate_evaluators[i + 1]->function()->align_of_data();
-            if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 
0) {
-                return Status::RuntimeError("Logical error: align_of_data is 
not 2^N");
-            }
-
-            /// Extend total_size to next alignment requirement
-            /// Add padding by rounding up 'total_size_of_aggregate_states' to 
be a multiplier of alignment_of_next_state.
-            _total_size_of_aggregate_states =
-                    (_total_size_of_aggregate_states + alignment_of_next_state 
- 1) /
-                    alignment_of_next_state * alignment_of_next_state;
-        }
-    }
-    RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
-
-    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state));
-    }
-
-    return Status::OK();
 }
 
 Status DistinctStreamingAggOperatorX::push(RuntimeState* state, 
vectorized::Block* in_block,
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 1066ea37236..bf2be9d850b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -74,7 +74,6 @@ private:
     const int batch_size;
     std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
     std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
-    std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
     std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
@@ -97,6 +96,15 @@ class DistinctStreamingAggOperatorX final
 public:
     DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
                                   const DescriptorTbl& descs, bool 
require_bucket_distribution);
+#ifdef BE_TEST
+    DistinctStreamingAggOperatorX()
+            : _needs_finalize(false),
+              _is_first_phase(true),
+              _partition_exprs({}),
+              _is_colocate(false),
+              _require_bucket_distribution {false} {}
+#endif
+
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
     Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) 
const override;
@@ -119,9 +127,7 @@ public:
 
 private:
     friend class DistinctStreamingAggLocalState;
-    TupleId _intermediate_tuple_id;
-    TupleDescriptor* _intermediate_tuple_desc = nullptr;
-
+    void init_make_nullable(RuntimeState* state);
     TupleId _output_tuple_id;
     TupleDescriptor* _output_tuple_desc = nullptr;
     const bool _needs_finalize;
@@ -131,12 +137,10 @@ private:
     const bool _require_bucket_distribution;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
-    std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
     std::vector<size_t> _make_nullable_keys;
-    /// The total size of the row from the aggregate functions.
-    size_t _total_size_of_aggregate_states = 0;
+
+    // If _is_streaming_preagg = true, deduplication will be abandoned in 
cases where the deduplication rate is low.
     bool _is_streaming_preagg = false;
-    const bool _without_key;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 8900d6fe844..8542f879980 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -1024,6 +1024,9 @@ public:
     StatefulOperatorX(ObjectPool* pool, const TPlanNode& tnode, const int 
operator_id,
                       const DescriptorTbl& descs)
             : OperatorX<LocalStateType>(pool, tnode, operator_id, descs) {}
+#ifdef BE_TEST
+    StatefulOperatorX() = default;
+#endif
     virtual ~StatefulOperatorX() = default;
 
     using OperatorX<LocalStateType>::get_local_state;
diff --git 
a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp 
b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
new file mode 100644
index 00000000000..c1e92272739
--- /dev/null
+++ b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/distinct_streaming_aggregation_operator.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/operator/operator_helper.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_slot_ref.h"
+#include "vec/core/block.h"
+namespace doris::pipeline {
+
+using namespace vectorized;
+struct DistinctStreamingAggOperatorTest : public ::testing::Test {
+    void SetUp() override {
+        op = std::make_unique<DistinctStreamingAggOperatorX>();
+        mock_op = std::make_shared<MockOperatorX>();
+        state = std::make_shared<MockRuntimeState>();
+        state->batsh_size = 10;
+        op->_child = mock_op;
+    }
+
+    void create_op(DataTypes input_types, DataTypes output_types) {
+        op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(input_types);
+
+        op->_output_tuple_id = 0;
+        output_desc_tbl = std::make_unique<MockDescriptorTbl>(output_types, 
&pool);
+        state->set_desc_tbl(output_desc_tbl.get());
+
+        op->init_make_nullable(state.get());
+
+        create_local_state();
+    }
+
+    void create_local_state() {
+        local_state_uptr = 
std::make_unique<DistinctStreamingAggLocalState>(state.get(), op.get());
+        local_state = local_state_uptr.get();
+        LocalStateInfo info {.parent_profile = &profile,
+                             .scan_ranges = {},
+                             .shared_state = nullptr,
+                             .le_state_map = {},
+                             .task_idx = 0};
+        EXPECT_TRUE(local_state->init(state.get(), info));
+        state->resize_op_id_to_local_state(-100);
+        state->emplace_local_state(op->operator_id(), 
std::move(local_state_uptr));
+        EXPECT_TRUE(local_state->open(state.get()));
+    }
+
+    RuntimeProfile profile {"test"};
+    std::unique_ptr<DistinctStreamingAggOperatorX> op;
+    std::unique_ptr<MockDescriptorTbl> output_desc_tbl;
+    std::shared_ptr<MockOperatorX> mock_op;
+
+    std::unique_ptr<DistinctStreamingAggLocalState> local_state_uptr;
+
+    DistinctStreamingAggLocalState* local_state;
+
+    std::shared_ptr<MockRuntimeState> state;
+    ObjectPool pool;
+};
+
+TEST_F(DistinctStreamingAggOperatorTest, test1) {
+    op->_is_streaming_preagg = false;
+
+    create_op({std::make_shared<DataTypeInt64>()}, 
{std::make_shared<DataTypeInt64>()});
+
+    mock_op->_outout_blocks.push_back(
+            ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 3, 4, 
1, 2, 3, 4}));
+
+    {
+        bool eos = false;
+        Block block;
+
+        auto st = op->get_block(state.get(), &block, &eos);
+        EXPECT_TRUE(st) << st.msg();
+        EXPECT_TRUE(eos);
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 
4})));
+    }
+}
+
+TEST_F(DistinctStreamingAggOperatorTest, test2) {
+    op->_is_streaming_preagg = false;
+    op->_limit = 3;
+    create_op({std::make_shared<DataTypeInt64>()}, 
{std::make_shared<DataTypeInt64>()});
+
+    mock_op->_outout_blocks.push_back(
+            ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 3, 4, 
1, 2, 3, 4}));
+
+    {
+        bool eos = false;
+        Block block;
+
+        auto st = op->get_block(state.get(), &block, &eos);
+        EXPECT_TRUE(st) << st.msg();
+        EXPECT_TRUE(eos);
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+    }
+}
+
+TEST_F(DistinctStreamingAggOperatorTest, test3) {
+    // batch size  = 10
+    op->_is_streaming_preagg = true;
+
+    create_op({std::make_shared<DataTypeInt64>()}, 
{std::make_shared<DataTypeInt64>()});
+
+    {
+        auto block =
+                ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 
3, 4, 1, 2, 3, 4});
+        EXPECT_TRUE(op->push(state.get(), &block, false));
+        EXPECT_EQ(local_state->_cache_block.rows(), 0);
+        EXPECT_EQ(local_state->_aggregated_block->rows(), 4);
+        EXPECT_TRUE(op->need_more_input_data(state.get()));
+    }
+
+    {
+        auto block = ColumnHelper::create_block<DataTypeInt64>({5, 6, 7, 8});
+        EXPECT_TRUE(op->push(state.get(), &block, false));
+        EXPECT_EQ(local_state->_cache_block.rows(), 0);
+        EXPECT_EQ(local_state->_aggregated_block->rows(), 8);
+        EXPECT_TRUE(op->need_more_input_data(state.get()));
+    }
+
+    {
+        auto block = ColumnHelper::create_block<DataTypeInt64>({9, 10, 11, 
12});
+        EXPECT_TRUE(op->push(state.get(), &block, false));
+        EXPECT_EQ(local_state->_cache_block.rows(), 2);
+        EXPECT_EQ(local_state->_aggregated_block->rows(), 10);
+        EXPECT_FALSE(op->need_more_input_data(state.get()));
+    }
+
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(op->pull(state.get(), &block, &eos));
+        EXPECT_FALSE(eos);
+        EXPECT_EQ(local_state->_cache_block.rows(), 0);
+        EXPECT_EQ(local_state->_aggregated_block->rows(), 2);
+    }
+    {
+        local_state->_stop_emplace_flag = true;
+        auto block = ColumnHelper::create_block<DataTypeInt64>({13, 14, 15});
+        EXPECT_TRUE(op->push(state.get(), &block, false));
+        EXPECT_EQ(local_state->_cache_block.rows(), 0);
+        EXPECT_EQ(local_state->_aggregated_block->rows(), 5);
+        EXPECT_FALSE(op->need_more_input_data(state.get()));
+    }
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(op->pull(state.get(), &block, &eos));
+        EXPECT_FALSE(eos);
+        EXPECT_EQ(block.rows(), 5);
+        EXPECT_EQ(local_state->_cache_block.rows(), 0);
+        EXPECT_EQ(local_state->_aggregated_block->rows(), 0);
+    }
+    {
+        EXPECT_TRUE(op->need_more_input_data(state.get()));
+        local_state->_stop_emplace_flag = true;
+        auto block = ColumnHelper::create_block<DataTypeInt64>({13, 14, 15});
+        EXPECT_TRUE(op->push(state.get(), &block, false));
+        EXPECT_EQ(local_state->_cache_block.rows(), 0);
+        EXPECT_EQ(local_state->_aggregated_block->rows(), 3);
+        EXPECT_FALSE(op->need_more_input_data(state.get()));
+    }
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(op->pull(state.get(), &block, &eos));
+        EXPECT_FALSE(eos);
+        EXPECT_EQ(block.rows(), 3);
+        EXPECT_EQ(local_state->_cache_block.rows(), 0);
+        EXPECT_EQ(local_state->_aggregated_block->rows(), 0);
+    }
+    { EXPECT_TRUE(op->close(state.get())); }
+}
+
+} // namespace doris::pipeline
diff --git a/be/test/testutil/mock/mock_descriptors.h 
b/be/test/testutil/mock/mock_descriptors.h
index ffb37edf424..198c821dbe9 100644
--- a/be/test/testutil/mock/mock_descriptors.h
+++ b/be/test/testutil/mock/mock_descriptors.h
@@ -73,6 +73,7 @@ public:
         auto* tuple_desc = pool->add(new MockTupleDescriptor());
         tuple_desc->Slots = slots;
         tuple_descriptors.push_back(tuple_desc);
+        _tuple_desc_map[0] = tuple_desc;
     }
 
     MOCK_METHOD(std::vector<TupleDescriptor*>, get_tuple_descs, (), (const));
diff --git a/be/test/testutil/mock/mock_slot_ref.cpp 
b/be/test/testutil/mock/mock_slot_ref.cpp
index 3b556b8ee5c..e758fb5e893 100644
--- a/be/test/testutil/mock/mock_slot_ref.cpp
+++ b/be/test/testutil/mock/mock_slot_ref.cpp
@@ -45,6 +45,17 @@ VExprContextSPtr MockSlotRef::create_mock_context(int 
column_id, DataTypePtr dat
     return ctx;
 }
 
+VExprContextSPtrs MockSlotRef::create_mock_contexts(DataTypes data_types) {
+    VExprContextSPtrs ctxs;
+    for (int i = 0; i < data_types.size(); i++) {
+        auto ctx = 
VExprContext::create_shared(std::make_shared<MockSlotRef>(i, data_types[i]));
+        ctx->_prepared = true;
+        ctx->_opened = true;
+        ctxs.push_back(ctx);
+    }
+    return ctxs;
+}
+
 TEST(MockSlotRefTest, test) {
     auto old_ctx = 
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
 
diff --git a/be/test/testutil/mock/mock_slot_ref.h 
b/be/test/testutil/mock/mock_slot_ref.h
index 8e47fca38bf..79df5b67d58 100644
--- a/be/test/testutil/mock/mock_slot_ref.h
+++ b/be/test/testutil/mock/mock_slot_ref.h
@@ -68,6 +68,8 @@ public:
 
     static VExprContextSPtr create_mock_context(int column_id, DataTypePtr 
data_type);
 
+    static VExprContextSPtrs create_mock_contexts(DataTypes data_types);
+
 private:
     const std::string _name = "MockSlotRef";
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to