This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0813d701224 [test](beut) add pipeline DistinctStreamingAggOperator beut (#48805) 0813d701224 is described below commit 0813d70122405d71d3ca320139eaaba571b468df Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Mon Mar 10 14:22:03 2025 +0800 [test](beut) add pipeline DistinctStreamingAggOperator beut (#48805) ### What problem does this PR solve? add pipeline DistinctStreamingAggOperator beut --- .../distinct_streaming_aggregation_operator.cpp | 88 +++------ .../exec/distinct_streaming_aggregation_operator.h | 20 ++- be/src/pipeline/exec/operator.h | 3 + ...istinct_streaming_aggregation_operator_test.cpp | 198 +++++++++++++++++++++ be/test/testutil/mock/mock_descriptors.h | 1 + be/test/testutil/mock/mock_slot_ref.cpp | 11 ++ be/test/testutil/mock/mock_slot_ref.h | 2 + 7 files changed, 250 insertions(+), 73 deletions(-) diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 24df662bb57..dc266dc9ae1 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -44,13 +44,13 @@ struct StreamingHtMinReductionEntry { // of the machine that we're running on. static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { // Expand up to L2 cache always. - {0, 0.0}, + {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0}, // Expand into L3 cache if we look like we're getting some reduction. // At present, The L2 cache is generally 1024k or more - {1024 * 1024, 0.0}, + {.min_ht_mem = 1024 * 1024, .streaming_ht_min_reduction = 0.0}, // Expand into main memory if we're getting a significant reduction. // The L3 cache is generally 16MB or more - {16 * 1024 * 1024, 2.0}, + {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0}, }; static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = @@ -85,15 +85,10 @@ Status DistinctStreamingAggLocalState::open(RuntimeState* state) { SCOPED_TIMER(Base::_open_timer); RETURN_IF_ERROR(Base::open(state)); auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>(); - for (auto& evaluator : p._aggregate_evaluators) { - _aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); - } _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); } - - DCHECK_EQ(p._total_size_of_aggregate_states, 0); RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs)); return Status::OK(); } @@ -192,15 +187,21 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( } } - size_t rows = in_block->rows(); + const size_t rows = in_block->rows(); _distinct_row.clear(); - _distinct_row.reserve(rows); + if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg && low_memory_mode()) { _stop_emplace_flag = true; } if (!_stop_emplace_flag) { + // _distinct_row is used to calculate non-duplicate data in key_columns + // _emplace_into_hash_table_to_distinct will determine whether to continue inserting data into the hashmap + // If it decides not to insert data, it will set _stop_emplace_flag = true and _distinct_row will be empty + _distinct_row.reserve(rows); _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows); + DCHECK_LE(_distinct_row.size(), rows) + << "_distinct_row size should be less than or equal to rows"; } bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() && @@ -216,6 +217,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( } DCHECK_EQ(out_block->columns(), key_size); if (_stop_emplace_flag && _distinct_row.empty()) { + // If _stop_emplace_flag is true and _distinct_row is also empty, it means it is in streaming mode, outputting what is input // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1) for (int i = 0; i < key_size; ++i) { auto output_column = out_block->get_by_position(i).column; @@ -224,6 +226,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( } } else { DCHECK_EQ(_cache_block.rows(), 0); + // is output row > batch_size, split some to cache_block if (out_block->rows() + _distinct_row.size() > batch_size) { size_t split_size = batch_size - out_block->rows(); for (int i = 0; i < key_size; ++i) { @@ -243,6 +246,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( } } } else { + DCHECK(out_block->empty()) << "out_block must be empty , but rows is " << out_block->rows(); vectorized::ColumnsWithTypeAndName columns_with_schema; for (int i = 0; i < key_size; ++i) { if (_stop_emplace_flag) { @@ -320,7 +324,6 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i const DescriptorTbl& descs, bool require_bucket_distribution) : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs), - _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _output_tuple_id(tnode.agg_node.output_tuple_id), _needs_finalize(tnode.agg_node.need_finalize), _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), @@ -328,8 +331,7 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i ? tnode.distribute_expr_lists[0] : tnode.agg_node.grouping_exprs), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), - _require_bucket_distribution(require_bucket_distribution), - _without_key(tnode.agg_node.grouping_exprs.empty()) { + _require_bucket_distribution(require_bucket_distribution) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { @@ -346,32 +348,22 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* RETURN_IF_ERROR( vectorized::VExpr::create_expr_trees(tnode.agg_node.grouping_exprs, _probe_expr_ctxs)); - // init aggregate functions - _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size()); - - TSortInfo dummy; - for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) { - vectorized::AggFnEvaluator* evaluator = nullptr; - RETURN_IF_ERROR(vectorized::AggFnEvaluator::create( - _pool, tnode.agg_node.aggregate_functions[i], - tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy, - tnode.agg_node.grouping_exprs.empty(), &evaluator)); - _aggregate_evaluators.push_back(evaluator); - } - _op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR"; return Status::OK(); } Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state)); - _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); - _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); - DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state)); + init_make_nullable(state); + return Status::OK(); +} - size_t j = _probe_expr_ctxs.size(); - for (size_t i = 0; i < j; ++i) { +void DistinctStreamingAggOperatorX::init_make_nullable(RuntimeState* state) { + _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + + for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable(); auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable(); if (nullable_output != nullable_input) { @@ -379,40 +371,6 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) { _make_nullable_keys.emplace_back(i); } } - for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) { - SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; - SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; - RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare( - state, _child->row_desc(), intermediate_slot_desc, output_slot_desc)); - _aggregate_evaluators[i]->set_version(state->be_exec_version()); - } - - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - const auto& agg_function = _aggregate_evaluators[i]->function(); - _total_size_of_aggregate_states += agg_function->size_of_data(); - - // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned. - if (i + 1 < _aggregate_evaluators.size()) { - size_t alignment_of_next_state = - _aggregate_evaluators[i + 1]->function()->align_of_data(); - if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0) { - return Status::RuntimeError("Logical error: align_of_data is not 2^N"); - } - - /// Extend total_size to next alignment requirement - /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state. - _total_size_of_aggregate_states = - (_total_size_of_aggregate_states + alignment_of_next_state - 1) / - alignment_of_next_state * alignment_of_next_state; - } - } - RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state)); - - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state)); - } - - return Status::OK(); } Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block, diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 1066ea37236..bf2be9d850b 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -74,7 +74,6 @@ private: const int batch_size; std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr; std::unique_ptr<DistinctDataVariants> _agg_data = nullptr; - std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr; @@ -97,6 +96,15 @@ class DistinctStreamingAggOperatorX final public: DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs, bool require_bucket_distribution); +#ifdef BE_TEST + DistinctStreamingAggOperatorX() + : _needs_finalize(false), + _is_first_phase(true), + _partition_exprs({}), + _is_colocate(false), + _require_bucket_distribution {false} {} +#endif + Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override; @@ -119,9 +127,7 @@ public: private: friend class DistinctStreamingAggLocalState; - TupleId _intermediate_tuple_id; - TupleDescriptor* _intermediate_tuple_desc = nullptr; - + void init_make_nullable(RuntimeState* state); TupleId _output_tuple_id; TupleDescriptor* _output_tuple_desc = nullptr; const bool _needs_finalize; @@ -131,12 +137,10 @@ private: const bool _require_bucket_distribution; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; - std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators; std::vector<size_t> _make_nullable_keys; - /// The total size of the row from the aggregate functions. - size_t _total_size_of_aggregate_states = 0; + + // If _is_streaming_preagg = true, deduplication will be abandoned in cases where the deduplication rate is low. bool _is_streaming_preagg = false; - const bool _without_key; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 8900d6fe844..8542f879980 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -1024,6 +1024,9 @@ public: StatefulOperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id, const DescriptorTbl& descs) : OperatorX<LocalStateType>(pool, tnode, operator_id, descs) {} +#ifdef BE_TEST + StatefulOperatorX() = default; +#endif virtual ~StatefulOperatorX() = default; using OperatorX<LocalStateType>::get_local_state; diff --git a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp new file mode 100644 index 00000000000..c1e92272739 --- /dev/null +++ b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/distinct_streaming_aggregation_operator.h" + +#include <gtest/gtest.h> + +#include <memory> + +#include "pipeline/exec/mock_operator.h" +#include "pipeline/operator/operator_helper.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_descriptors.h" +#include "testutil/mock/mock_slot_ref.h" +#include "vec/core/block.h" +namespace doris::pipeline { + +using namespace vectorized; +struct DistinctStreamingAggOperatorTest : public ::testing::Test { + void SetUp() override { + op = std::make_unique<DistinctStreamingAggOperatorX>(); + mock_op = std::make_shared<MockOperatorX>(); + state = std::make_shared<MockRuntimeState>(); + state->batsh_size = 10; + op->_child = mock_op; + } + + void create_op(DataTypes input_types, DataTypes output_types) { + op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(input_types); + + op->_output_tuple_id = 0; + output_desc_tbl = std::make_unique<MockDescriptorTbl>(output_types, &pool); + state->set_desc_tbl(output_desc_tbl.get()); + + op->init_make_nullable(state.get()); + + create_local_state(); + } + + void create_local_state() { + local_state_uptr = std::make_unique<DistinctStreamingAggLocalState>(state.get(), op.get()); + local_state = local_state_uptr.get(); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .le_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(op->operator_id(), std::move(local_state_uptr)); + EXPECT_TRUE(local_state->open(state.get())); + } + + RuntimeProfile profile {"test"}; + std::unique_ptr<DistinctStreamingAggOperatorX> op; + std::unique_ptr<MockDescriptorTbl> output_desc_tbl; + std::shared_ptr<MockOperatorX> mock_op; + + std::unique_ptr<DistinctStreamingAggLocalState> local_state_uptr; + + DistinctStreamingAggLocalState* local_state; + + std::shared_ptr<MockRuntimeState> state; + ObjectPool pool; +}; + +TEST_F(DistinctStreamingAggOperatorTest, test1) { + op->_is_streaming_preagg = false; + + create_op({std::make_shared<DataTypeInt64>()}, {std::make_shared<DataTypeInt64>()}); + + mock_op->_outout_blocks.push_back( + ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4})); + + { + bool eos = false; + Block block; + + auto st = op->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_TRUE(eos); + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4}))); + } +} + +TEST_F(DistinctStreamingAggOperatorTest, test2) { + op->_is_streaming_preagg = false; + op->_limit = 3; + create_op({std::make_shared<DataTypeInt64>()}, {std::make_shared<DataTypeInt64>()}); + + mock_op->_outout_blocks.push_back( + ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4})); + + { + bool eos = false; + Block block; + + auto st = op->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_TRUE(eos); + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3}))); + } +} + +TEST_F(DistinctStreamingAggOperatorTest, test3) { + // batch size = 10 + op->_is_streaming_preagg = true; + + create_op({std::make_shared<DataTypeInt64>()}, {std::make_shared<DataTypeInt64>()}); + + { + auto block = + ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}); + EXPECT_TRUE(op->push(state.get(), &block, false)); + EXPECT_EQ(local_state->_cache_block.rows(), 0); + EXPECT_EQ(local_state->_aggregated_block->rows(), 4); + EXPECT_TRUE(op->need_more_input_data(state.get())); + } + + { + auto block = ColumnHelper::create_block<DataTypeInt64>({5, 6, 7, 8}); + EXPECT_TRUE(op->push(state.get(), &block, false)); + EXPECT_EQ(local_state->_cache_block.rows(), 0); + EXPECT_EQ(local_state->_aggregated_block->rows(), 8); + EXPECT_TRUE(op->need_more_input_data(state.get())); + } + + { + auto block = ColumnHelper::create_block<DataTypeInt64>({9, 10, 11, 12}); + EXPECT_TRUE(op->push(state.get(), &block, false)); + EXPECT_EQ(local_state->_cache_block.rows(), 2); + EXPECT_EQ(local_state->_aggregated_block->rows(), 10); + EXPECT_FALSE(op->need_more_input_data(state.get())); + } + + { + Block block; + bool eos = false; + EXPECT_TRUE(op->pull(state.get(), &block, &eos)); + EXPECT_FALSE(eos); + EXPECT_EQ(local_state->_cache_block.rows(), 0); + EXPECT_EQ(local_state->_aggregated_block->rows(), 2); + } + { + local_state->_stop_emplace_flag = true; + auto block = ColumnHelper::create_block<DataTypeInt64>({13, 14, 15}); + EXPECT_TRUE(op->push(state.get(), &block, false)); + EXPECT_EQ(local_state->_cache_block.rows(), 0); + EXPECT_EQ(local_state->_aggregated_block->rows(), 5); + EXPECT_FALSE(op->need_more_input_data(state.get())); + } + { + Block block; + bool eos = false; + EXPECT_TRUE(op->pull(state.get(), &block, &eos)); + EXPECT_FALSE(eos); + EXPECT_EQ(block.rows(), 5); + EXPECT_EQ(local_state->_cache_block.rows(), 0); + EXPECT_EQ(local_state->_aggregated_block->rows(), 0); + } + { + EXPECT_TRUE(op->need_more_input_data(state.get())); + local_state->_stop_emplace_flag = true; + auto block = ColumnHelper::create_block<DataTypeInt64>({13, 14, 15}); + EXPECT_TRUE(op->push(state.get(), &block, false)); + EXPECT_EQ(local_state->_cache_block.rows(), 0); + EXPECT_EQ(local_state->_aggregated_block->rows(), 3); + EXPECT_FALSE(op->need_more_input_data(state.get())); + } + { + Block block; + bool eos = false; + EXPECT_TRUE(op->pull(state.get(), &block, &eos)); + EXPECT_FALSE(eos); + EXPECT_EQ(block.rows(), 3); + EXPECT_EQ(local_state->_cache_block.rows(), 0); + EXPECT_EQ(local_state->_aggregated_block->rows(), 0); + } + { EXPECT_TRUE(op->close(state.get())); } +} + +} // namespace doris::pipeline diff --git a/be/test/testutil/mock/mock_descriptors.h b/be/test/testutil/mock/mock_descriptors.h index ffb37edf424..198c821dbe9 100644 --- a/be/test/testutil/mock/mock_descriptors.h +++ b/be/test/testutil/mock/mock_descriptors.h @@ -73,6 +73,7 @@ public: auto* tuple_desc = pool->add(new MockTupleDescriptor()); tuple_desc->Slots = slots; tuple_descriptors.push_back(tuple_desc); + _tuple_desc_map[0] = tuple_desc; } MOCK_METHOD(std::vector<TupleDescriptor*>, get_tuple_descs, (), (const)); diff --git a/be/test/testutil/mock/mock_slot_ref.cpp b/be/test/testutil/mock/mock_slot_ref.cpp index 3b556b8ee5c..e758fb5e893 100644 --- a/be/test/testutil/mock/mock_slot_ref.cpp +++ b/be/test/testutil/mock/mock_slot_ref.cpp @@ -45,6 +45,17 @@ VExprContextSPtr MockSlotRef::create_mock_context(int column_id, DataTypePtr dat return ctx; } +VExprContextSPtrs MockSlotRef::create_mock_contexts(DataTypes data_types) { + VExprContextSPtrs ctxs; + for (int i = 0; i < data_types.size(); i++) { + auto ctx = VExprContext::create_shared(std::make_shared<MockSlotRef>(i, data_types[i])); + ctx->_prepared = true; + ctx->_opened = true; + ctxs.push_back(ctx); + } + return ctxs; +} + TEST(MockSlotRefTest, test) { auto old_ctx = MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>()); diff --git a/be/test/testutil/mock/mock_slot_ref.h b/be/test/testutil/mock/mock_slot_ref.h index 8e47fca38bf..79df5b67d58 100644 --- a/be/test/testutil/mock/mock_slot_ref.h +++ b/be/test/testutil/mock/mock_slot_ref.h @@ -68,6 +68,8 @@ public: static VExprContextSPtr create_mock_context(int column_id, DataTypePtr data_type); + static VExprContextSPtrs create_mock_contexts(DataTypes data_types); + private: const std::string _name = "MockSlotRef"; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org