This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e9cef9e71f9 [test](beut) add pipeline UnionOperator beut (#48984) e9cef9e71f9 is described below commit e9cef9e71f9a464f9807f93f95962aa43c7dd830 Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Mon Mar 17 14:09:15 2025 +0800 [test](beut) add pipeline UnionOperator beut (#48984) ### What problem does this PR solve? add pipeline UnionOperator beut --- be/src/pipeline/exec/union_sink_operator.cpp | 64 +++-- be/src/pipeline/exec/union_sink_operator.h | 61 +---- be/src/pipeline/exec/union_source_operator.cpp | 101 ++++---- be/src/pipeline/exec/union_source_operator.h | 59 ++--- be/src/vec/exprs/vliteral.h | 4 + be/test/pipeline/operator/operator_helper.h | 18 ++ be/test/pipeline/operator/union_operator_test.cpp | 296 ++++++++++++++++++++++ be/test/testutil/column_helper.h | 11 + be/test/testutil/mock/mock_descriptors.h | 1 + be/test/testutil/mock/mock_literal_expr.cpp | 47 ++++ be/test/testutil/mock/mock_literal_expr.h | 83 ++++++ 11 files changed, 579 insertions(+), 166 deletions(-) diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 4bbb5eba3e3..8de21c6ac21 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -65,27 +65,16 @@ UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, int dest_id, O Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); DCHECK(tnode.__isset.union_node); - { - // Create result_expr_ctx_lists_ from thrift exprs. - auto& result_texpr_lists = tnode.union_node.result_expr_lists; - auto& texprs = result_texpr_lists[_cur_child_id]; - vectorized::VExprContextSPtrs ctxs; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); - _child_expr = ctxs; - } + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( + tnode.union_node.result_expr_lists[_cur_child_id], _child_expr)); return Status::OK(); } Status UnionSinkOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); - RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _row_descriptor)); - // open const expr lists. - RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state)); - - // open result expr lists. RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); - + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, row_descriptor())); return Status::OK(); } @@ -100,38 +89,45 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block local_state._output_block = local_state._shared_state->data_queue.get_free_block(_cur_child_id); } - if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through + if (is_child_passthrough(_cur_child_id)) { + //pass_through without expr if (in_block->rows() > 0) { local_state._output_block->swap(*in_block); local_state._shared_state->data_queue.push_block(std::move(local_state._output_block), _cur_child_id); } - } else if (_get_first_materialized_child_idx() != children_count() && - _cur_child_id < children_count()) { //need materialized - RETURN_IF_ERROR(materialize_child_block(state, _cur_child_id, in_block, - local_state._output_block.get())); } else { - return Status::InternalError("maybe can't reach here, execute const expr: {}, {}, {}", - _cur_child_id, _get_first_materialized_child_idx(), - children_count()); - } - if (UNLIKELY(eos)) { - //if _cur_child_id eos, need check to push block - //Now here can't check _output_block rows, even it's row==0, also need push block - //because maybe sink is eos and queue have none data, if not push block - //the source can't can_read again and can't set source finished - if (local_state._output_block) { + RETURN_IF_ERROR(materialize_child_block(state, in_block, local_state._output_block.get())); + if (local_state._output_block->rows() > 0) { local_state._shared_state->data_queue.push_block(std::move(local_state._output_block), _cur_child_id); } - + } + if (UNLIKELY(eos)) { + // set_finish will set source ready local_state._shared_state->data_queue.set_finish(_cur_child_id); return Status::OK(); } - // not eos and block rows is enough to output,so push block - if (local_state._output_block && (local_state._output_block->rows() >= state->batch_size())) { - local_state._shared_state->data_queue.push_block(std::move(local_state._output_block), - _cur_child_id); + return Status::OK(); +} + +Status UnionSinkOperatorX::materialize_child_block(RuntimeState* state, + vectorized::Block* input_block, + vectorized::Block* output_block) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state._expr_timer); + if (input_block->rows() > 0) { + vectorized::MutableBlock mutable_block = + vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, + row_descriptor()); + vectorized::ColumnsWithTypeAndName colunms; + const auto& child_exprs = local_state._child_expr; + for (const auto& child_expr : child_exprs) { + int result_column_id = -1; + RETURN_IF_ERROR(child_expr->execute(input_block, &result_column_id)); + colunms.emplace_back(input_block->get_by_position(result_column_id)); + } + RETURN_IF_ERROR(mutable_block.merge(vectorized::Block {colunms})); } return Status::OK(); } diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 170b99f12f1..00a13bb9e8e 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -36,8 +36,7 @@ class UnionSinkOperatorX; class UnionSinkLocalState final : public PipelineXSinkLocalState<UnionSharedState> { public: ENABLE_FACTORY_CREATOR(UnionSinkLocalState); - UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : Base(parent, state), _child_row_idx(0) {} + UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; friend class UnionSinkOperatorX; @@ -47,25 +46,24 @@ public: private: std::unique_ptr<vectorized::Block> _output_block; - /// Const exprs materialized by this node. These exprs don't refer to any children. - /// Only materialized by the first fragment instance to avoid duplication. - vectorized::VExprContextSPtrs _const_expr; - /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. vectorized::VExprContextSPtrs _child_expr; - - /// Index of current row in child_row_block_. - int _child_row_idx; RuntimeProfile::Counter* _expr_timer = nullptr; }; -class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> { +class UnionSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<UnionSinkLocalState> { public: using Base = DataSinkOperatorX<UnionSinkLocalState>; friend class UnionSinkLocalState; UnionSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); +#ifdef BE_TEST + UnionSinkOperatorX(int child_size, int cur_child_id, int first_materialized_child_idx) + : _first_materialized_child_idx(first_materialized_child_idx), + _cur_child_id(cur_child_id), + _child_size(child_size) {} +#endif ~UnionSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TDataSink", @@ -102,13 +100,9 @@ public: bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } -private: - int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } - - /// Const exprs materialized by this node. These exprs don't refer to any children. - /// Only materialized by the first fragment instance to avoid duplication. - vectorized::VExprContextSPtrs _const_expr; + MOCK_FUNCTION const RowDescriptor& row_descriptor() { return _row_descriptor; } +private: /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. vectorized::VExprContextSPtrs _child_expr; /// Index of the first non-passthrough child; i.e. a child that needs materialization. @@ -119,42 +113,13 @@ private: const RowDescriptor _row_descriptor; const int _cur_child_id; const int _child_size; - int children_count() const { return _child_size; } + bool is_child_passthrough(int child_idx) const { DCHECK_LT(child_idx, _child_size); return child_idx < _first_materialized_child_idx; } - Status materialize_child_block(RuntimeState* state, int child_id, - vectorized::Block* input_block, - vectorized::Block* output_block) { - DCHECK_LT(child_id, _child_size); - DCHECK(!is_child_passthrough(child_id)); - if (input_block->rows() > 0) { - vectorized::MutableBlock mblock = - vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, - _row_descriptor); - vectorized::Block res; - RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res)); - RETURN_IF_ERROR(mblock.merge(res)); - } - return Status::OK(); - } - - Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx, - vectorized::Block* res_block) { - auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state._expr_timer); - const auto& child_exprs = local_state._child_expr; - vectorized::ColumnsWithTypeAndName colunms; - for (size_t i = 0; i < child_exprs.size(); ++i) { - int result_column_id = -1; - RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id)); - colunms.emplace_back(src_block->get_by_position(result_column_id)); - } - local_state._child_row_idx += src_block->rows(); - *res_block = {colunms}; - return Status::OK(); - } + Status materialize_child_block(RuntimeState* state, vectorized::Block* input_block, + vectorized::Block* output_block); }; } // namespace pipeline diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index b381e1f2712..fb98f4c0ece 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -17,6 +17,7 @@ #include "pipeline/exec/union_source_operator.h" +#include <algorithm> #include <functional> #include <utility> @@ -55,31 +56,29 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { return Status::OK(); } -Status UnionSourceLocalState::open(RuntimeState* state) { - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); - RETURN_IF_ERROR(Base::open(state)); - - auto& p = _parent->cast<Parent>(); - // Const exprs materialized by this node. These exprs don't refer to any children. - // Only materialized by the first fragment instance to avoid duplication. - if (state->per_fragment_instance_idx() == 0) { - auto clone_expr_list = [&](vectorized::VExprContextSPtrs& cur_expr_list, - vectorized::VExprContextSPtrs& other_expr_list) { - cur_expr_list.resize(other_expr_list.size()); - for (int i = 0; i < cur_expr_list.size(); i++) { - RETURN_IF_ERROR(other_expr_list[i]->clone(state, cur_expr_list[i])); - } - return Status::OK(); - }; - _const_expr_lists.resize(p._const_expr_lists.size()); - for (int i = 0; i < _const_expr_lists.size(); i++) { - auto& _const_expr_list = _const_expr_lists[i]; - auto& other_expr_list = p._const_expr_lists[i]; - RETURN_IF_ERROR(clone_expr_list(_const_expr_list, other_expr_list)); - } +Status UnionSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + for (const auto& texprs : tnode.union_node.const_expr_lists) { + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _const_expr_lists.push_back(ctxs); + } + if (!std::ranges::all_of(_const_expr_lists, [&](const auto& exprs) { + return exprs.size() == _const_expr_lists.front().size(); + })) { + return Status::InternalError("Const expr lists size not match"); } + return Status::OK(); +} +Status UnionSourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state, row_descriptor())); + } + for (const auto& exprs : _const_expr_lists) { + RETURN_IF_ERROR(vectorized::VExpr::open(exprs, state)); + } return Status::OK(); } @@ -100,15 +99,19 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b // the eos check of union operator is complex, need check all logical if you want modify // could ref this PR: https://github.com/apache/doris/pull/29677 // have executing const expr, queue have no data anymore, and child could be closed - if (_child_size == 0 && !local_state._need_read_for_const_expr) { - *eos = true; - } else if (_has_data(state)) { + if (_child_size == 0) { + // If _child_size == 0, eos = true will only be returned when all constant expressions are executed + *eos = !local_state._need_read_for_const_expr; + } else if (has_data(state)) { + // data queue still has data, return eos = false *eos = false; } else if (local_state._shared_state->data_queue.is_all_finish()) { // Here, check the value of `_has_data(state)` again after `data_queue.is_all_finish()` is TRUE // as there may be one or more blocks when `data_queue.is_all_finish()` is TRUE. - *eos = !_has_data(state); + *eos = !has_data(state); } else { + // At this point, the data queue has no data, but the sink is not all finished, return eos = false + // (this situation may be because the source consumes too fast) *eos = false; } }}; @@ -128,7 +131,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b return Status::OK(); } block->swap(*output_block); - output_block->clear_column_data(_row_descriptor.num_materialized_slots()); + output_block->clear_column_data(row_descriptor().num_materialized_slots()); local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx); } local_state.reached_limit(block, eos); @@ -137,46 +140,34 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Block* block) { DCHECK_EQ(state->per_fragment_instance_idx(), 0); - auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>(); + auto& local_state = get_local_state(state); DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size()); SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); - auto& _const_expr_list_idx = local_state._const_expr_list_idx; - vectorized::MutableBlock mblock = - vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor); - for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() < state->batch_size(); - ++_const_expr_list_idx) { + auto& const_expr_list_idx = local_state._const_expr_list_idx; + vectorized::MutableBlock mutable_block = + vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, row_descriptor()); + for (; const_expr_list_idx < _const_expr_lists.size() && + mutable_block.rows() < state->batch_size(); + ++const_expr_list_idx) { vectorized::Block tmp_block; + // When we execute a constant expression, we need one row of data because the expr may use the block's rows for some judgments tmp_block.insert({vectorized::ColumnUInt8::create(1), std::make_shared<vectorized::DataTypeUInt8>(), ""}); - int const_expr_lists_size = cast_set<int>(_const_expr_lists[_const_expr_list_idx].size()); - if (_const_expr_list_idx && const_expr_lists_size != _const_expr_lists[0].size()) { - return Status::InternalError( - "[UnionNode]const expr at {}'s count({}) not matched({} expected)", - _const_expr_list_idx, const_expr_lists_size, _const_expr_lists[0].size()); - } - - std::vector<int> result_list(const_expr_lists_size); - for (size_t i = 0; i < const_expr_lists_size; ++i) { - RETURN_IF_ERROR(_const_expr_lists[_const_expr_list_idx][i]->execute(&tmp_block, - &result_list[i])); - } - tmp_block.erase_not_in(result_list); - if (tmp_block.columns() != mblock.columns()) { - return Status::InternalError( - "[UnionNode]columns count of const expr block not matched ({} vs {})", - tmp_block.columns(), mblock.columns()); - } - if (tmp_block.rows() > 0) { - RETURN_IF_ERROR(mblock.merge(tmp_block)); - tmp_block.clear(); + vectorized::ColumnsWithTypeAndName colunms; + for (auto& expr : _const_expr_lists[const_expr_list_idx]) { + int result_column_id = -1; + RETURN_IF_ERROR(expr->execute(&tmp_block, &result_column_id)); + colunms.emplace_back(tmp_block.get_by_position(result_column_id)); } + RETURN_IF_ERROR(mutable_block.merge(vectorized::Block {colunms})); } // some insert query like "insert into string_test select 1, repeat('a', 1024 * 1024);" // the const expr will be in output expr cause the union node return a empty block. so here we // need add one row to make sure the union node exec const expr return at least one row + /// TODO: maybe we can remove this if (block->rows() == 0) { block->insert({vectorized::ColumnUInt8::create(1), std::make_shared<vectorized::DataTypeUInt8>(), ""}); diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 6619b623ef5..3985cf3910e 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -43,7 +43,6 @@ public: UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; Status init(RuntimeState* state, LocalStateInfo& info) override; - Status open(RuntimeState* state) override; [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; @@ -52,49 +51,50 @@ private: friend class OperatorX<UnionSourceLocalState>; bool _need_read_for_const_expr {true}; int _const_expr_list_idx {0}; - std::vector<vectorized::VExprContextSPtrs> _const_expr_lists; // If this operator has no children, there is no shared state which owns dependency. So we // use this local state to hold this dependency. DependencySPtr _only_const_dependency = nullptr; }; -class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> { +/* +There are two cases for union node: one is only constant expressions, and the other is having other child nodes besides constant expressions. +Unlike other union operators, the union node only merges data without deduplication. + +| 0:VUNION(66) | +| constant exprs: | +| 1 | 2 | 3 | 4 | +| 5 | 6 | 7 | 8 | +| tuple ids: 0 | + +| 4:VUNION(179) | +| | constant exprs: | +| | 1 | 2 | 3 | 4 | +| | 5 | 6 | 7 | 8 | +| | child exprs: | +| | k1[#0] | k2[#1] | k3[#2] | k4[#3] | +| | k1[#4] | k2[#5] | k3[#6] | k4[#7] | +| | tuple ids: 2 | +*/ + +class UnionSourceOperatorX MOCK_REMOVE(final) : public OperatorX<UnionSourceLocalState> { public: using Base = OperatorX<UnionSourceLocalState>; UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) : Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {} + +#ifdef BE_TEST + UnionSourceOperatorX(int child_size) : _child_size(child_size) {} +#endif ~UnionSourceOperatorX() override = default; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; bool is_source() const override { return true; } - Status init(const TPlanNode& tnode, RuntimeState* state) override { - RETURN_IF_ERROR(Base::init(tnode, state)); - DCHECK(tnode.__isset.union_node); - // Create const_expr_ctx_lists_ from thrift exprs. - auto& const_texpr_lists = tnode.union_node.const_expr_lists; - for (auto& texprs : const_texpr_lists) { - vectorized::VExprContextSPtrs ctxs; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); - _const_expr_lists.push_back(ctxs); - } - return Status::OK(); - } + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; - Status prepare(RuntimeState* state) override { - static_cast<void>(Base::prepare(state)); - // Prepare const expr lists. - for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) { - RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state, _row_descriptor)); - } - // open const expr lists. - for (const auto& exprs : _const_expr_lists) { - RETURN_IF_ERROR(vectorized::VExpr::open(exprs, state)); - } - return Status::OK(); - } [[nodiscard]] int get_child_count() const { return _child_size; } bool require_shuffled_data_distribution() const override { return _followed_by_shuffled_operator; @@ -114,7 +114,7 @@ public: } private: - bool _has_data(RuntimeState* state) const { + bool has_data(RuntimeState* state) const { auto& local_state = get_local_state(state); if (_child_size == 0) { return local_state._need_read_for_const_expr; @@ -122,9 +122,10 @@ private: return local_state._shared_state->data_queue.remaining_has_data(); } bool has_more_const(RuntimeState* state) const { + // For constant expressions, only one instance will execute the expression auto& local_state = get_local_state(state); return state->per_fragment_instance_idx() == 0 && - local_state._const_expr_list_idx < local_state._const_expr_lists.size(); + local_state._const_expr_list_idx < _const_expr_lists.size(); } friend class UnionSourceLocalState; const int _child_size; diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h index 3d9c948ec88..8efb5677cb0 100644 --- a/be/src/vec/exprs/vliteral.h +++ b/be/src/vec/exprs/vliteral.h @@ -43,6 +43,10 @@ public: } } +#ifdef BE_TEST + VLiteral() = default; +#endif + Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; Status execute(VExprContext* context, Block* block, int* result_column_id) override; diff --git a/be/test/pipeline/operator/operator_helper.h b/be/test/pipeline/operator/operator_helper.h index a2d2c690c4e..db658ca49d7 100644 --- a/be/test/pipeline/operator/operator_helper.h +++ b/be/test/pipeline/operator/operator_helper.h @@ -43,6 +43,24 @@ struct OperatorHelper { LocalStateInfo info {&ctx.profile, scan_ranges, 0, {}, 0}; EXPECT_TRUE(op.setup_local_state(&ctx.state, info).ok()); } + + static bool is_block(std::vector<Dependency*> deps) { + for (auto* dep : deps) { + if (!dep->ready()) { + return true; + } + } + return false; + } + + static bool is_ready(std::vector<Dependency*> deps) { + for (auto* dep : deps) { + if (!dep->ready()) { + return false; + } + } + return true; + } }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/union_operator_test.cpp b/be/test/pipeline/operator/union_operator_test.cpp new file mode 100644 index 00000000000..b0c973f5499 --- /dev/null +++ b/be/test/pipeline/operator/union_operator_test.cpp @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <memory> +#include <vector> + +#include "operator_helper.h" +#include "pipeline/exec/mock_operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/exec/union_source_operator.h" +#include "pipeline/operator/operator_helper.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_descriptors.h" +#include "testutil/mock/mock_literal_expr.h" +#include "testutil/mock/mock_slot_ref.h" +#include "vec/core/block.h" +namespace doris::pipeline { + +using namespace vectorized; + +struct MockUnionSourceOperator : public UnionSourceOperatorX { + MockUnionSourceOperator(int32_t child_size, DataTypes types, ObjectPool* pool) + : UnionSourceOperatorX(child_size), _mock_row_descriptor(types, pool) {} + RowDescriptor& row_descriptor() override { return _mock_row_descriptor; } + MockRowDescriptor _mock_row_descriptor; +}; + +struct MockUnionSinkOperator : public UnionSinkOperatorX { + MockUnionSinkOperator(int child_size, int cur_child_id, int first_materialized_child_idx, + DataTypes types, ObjectPool* pool) + : UnionSinkOperatorX(child_size, cur_child_id, first_materialized_child_idx), + _mock_row_descriptor(types, pool) {} + + RowDescriptor& row_descriptor() override { return _mock_row_descriptor; } + MockRowDescriptor _mock_row_descriptor; +}; + +struct UnionOperatorTest : public ::testing::Test { + void SetUp() override { + state = std::make_shared<MockRuntimeState>(); + state->batsh_size = 10; + for (int i = 0; i < child_size; i++) { + sink_state.push_back(std::make_shared<MockRuntimeState>()); + sink_ops.push_back(nullptr); + } + } + + std::shared_ptr<MockUnionSourceOperator> source_op; + UnionSourceLocalState* source_local_state; + + std::shared_ptr<MockRuntimeState> state; + + RuntimeProfile profile {""}; + + ObjectPool pool; + + const int child_size = 3; + const int first_materialized_child_idx = 1; + + std::vector<std::shared_ptr<MockUnionSinkOperator>> sink_ops; + std::vector<std::shared_ptr<MockRuntimeState>> sink_state; +}; + +TEST_F(UnionOperatorTest, test_all_const_expr) { + state->batsh_size = 2; + source_op.reset(new MockUnionSourceOperator { + 0, + {std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>(), + std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()}, + &pool}); + EXPECT_TRUE(source_op->prepare(state.get())); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({1, 10, 100, 1000})); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({2, 20, 200, 2000})); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({3, 30, 300, 3000})); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({4, 40, 400, 4000})); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({5, 50, 500, 5000})); + auto source_local_state_uptr = + std::make_unique<UnionSourceLocalState>(state.get(), source_op.get()); + source_local_state = source_local_state_uptr.get(); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .le_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(source_local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(source_op->operator_id(), std::move(source_local_state_uptr)); + EXPECT_TRUE(source_local_state->open(state.get())); + EXPECT_EQ(source_local_state->dependencies().size(), 1); + EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies())); + + EXPECT_TRUE(source_local_state->_need_read_for_const_expr); + EXPECT_EQ(source_local_state->_const_expr_list_idx, 0); + + EXPECT_TRUE(source_op->has_more_const(state.get())); + + { + Block block; + bool eos; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok()); + EXPECT_EQ(block.rows(), 2); + EXPECT_FALSE(eos); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(ColumnHelper::block_equal( + block, Block { + ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2}), + ColumnHelper::create_column_with_name<DataTypeInt64>({10, 20}), + ColumnHelper::create_column_with_name<DataTypeInt64>({100, 200}), + ColumnHelper::create_column_with_name<DataTypeInt64>({1000, 2000}), + })); + } + + { + Block block; + bool eos; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok()); + EXPECT_EQ(block.rows(), 2); + EXPECT_FALSE(eos); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(ColumnHelper::block_equal( + block, Block { + ColumnHelper::create_column_with_name<DataTypeInt64>({3, 4}), + ColumnHelper::create_column_with_name<DataTypeInt64>({30, 40}), + ColumnHelper::create_column_with_name<DataTypeInt64>({300, 400}), + ColumnHelper::create_column_with_name<DataTypeInt64>({3000, 4000}), + })); + } + + { + Block block; + bool eos; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok()); + EXPECT_EQ(block.rows(), 1); + EXPECT_TRUE(eos); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(ColumnHelper::block_equal( + block, Block { + ColumnHelper::create_column_with_name<DataTypeInt64>({5}), + ColumnHelper::create_column_with_name<DataTypeInt64>({50}), + ColumnHelper::create_column_with_name<DataTypeInt64>({500}), + ColumnHelper::create_column_with_name<DataTypeInt64>({5000}), + })); + } +} + +TEST_F(UnionOperatorTest, test_sink_and_source) { + DataTypes types = {std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()}; + source_op.reset(new MockUnionSourceOperator {child_size, types, &pool}); + EXPECT_TRUE(source_op->prepare(state.get())); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({1, 10})); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({2, 20})); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({3, 30})); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({4, 40})); + source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({5, 50})); + + for (int i = 0; i < child_size; i++) { + sink_ops[i].reset(new MockUnionSinkOperator {child_size, i, first_materialized_child_idx, + types, &pool}); + sink_ops[i]->_child_expr = MockSlotRef::create_mock_contexts(types); + } + + auto shared_state = sink_ops[0]->create_shared_state(); + + //auto & data_queue = dynamic_cast<UnionSharedState*>(shared_state.get())->data_queue; + EXPECT_TRUE(shared_state != nullptr); + EXPECT_TRUE(sink_ops[1]->create_shared_state() == nullptr); + EXPECT_TRUE(sink_ops[2]->create_shared_state() == nullptr); + + { + auto source_local_state_uptr = + std::make_unique<UnionSourceLocalState>(state.get(), source_op.get()); + source_local_state = source_local_state_uptr.get(); + + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = shared_state.get(), + .le_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(source_local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(source_op->operator_id(), std::move(source_local_state_uptr)); + EXPECT_TRUE(source_local_state->open(state.get())); + EXPECT_EQ(source_local_state->dependencies().size(), 1); + EXPECT_TRUE(OperatorHelper::is_block(source_local_state->dependencies())); + } + + { + for (int i = 0; i < child_size; i++) { + auto sink_local_state_uptr = + std::make_unique<UnionSinkLocalState>(sink_ops[i].get(), sink_state[i].get()); + auto* sink_local_state = sink_local_state_uptr.get(); + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = &profile, + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink {}}; + EXPECT_TRUE(sink_local_state->init(sink_state[i].get(), info)); + sink_state[i]->resize_op_id_to_local_state(-100); + sink_state[i]->emplace_sink_local_state(sink_ops[i]->operator_id(), + std::move(sink_local_state_uptr)); + EXPECT_TRUE(sink_local_state->open(sink_state[i].get())); + EXPECT_EQ(sink_local_state->dependencies().size(), 1); + EXPECT_TRUE(OperatorHelper::is_ready(sink_local_state->dependencies())); + } + } + + { + Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4}); + EXPECT_TRUE(sink_ops[0]->sink(sink_state[0].get(), &block, false)); + } + { + Block block; + bool eos; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok()); + EXPECT_EQ(block.rows(), 5); + EXPECT_FALSE(eos); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(ColumnHelper::block_equal( + block, + Block { + ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5}), + ColumnHelper::create_column_with_name<DataTypeInt64>({10, 20, 30, 40, 50}), + })); + } + + EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies())); + + { + Block block; + bool eos; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok()); + EXPECT_EQ(block.rows(), 2); + EXPECT_FALSE(eos); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4}))); + } + + EXPECT_TRUE(OperatorHelper::is_block(source_local_state->dependencies())); + + { + for (int i = 0; i < child_size; i++) { + Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4}); + EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false)); + } + for (int i = 0; i < child_size; i++) { + Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4}); + EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false)); + } + for (int i = 0; i < child_size; i++) { + Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4}); + EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, true)); + } + } + + EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies())); + for (int i = 0; i < 8; i++) { + Block block; + bool eos; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok()); + EXPECT_EQ(block.rows(), 2); + EXPECT_FALSE(eos); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4}))); + } + + { + Block block; + bool eos; + EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok()); + EXPECT_EQ(block.rows(), 2); + EXPECT_TRUE(eos); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4}))); + } +} +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/testutil/column_helper.h b/be/test/testutil/column_helper.h index a9cf58bb880..9d35c7af62d 100644 --- a/be/test/testutil/column_helper.h +++ b/be/test/testutil/column_helper.h @@ -87,6 +87,17 @@ public: return block; } + template <typename DataType> + static Block create_block(const std::vector<typename DataType::FieldType>& data1, + const std::vector<typename DataType::FieldType>& data2) { + auto column1 = create_column<DataType>(data1); + auto column2 = create_column<DataType>(data2); + auto data_type = std::make_shared<DataType>(); + Block block({ColumnWithTypeAndName(column1, data_type, "column1"), + ColumnWithTypeAndName(column2, data_type, "column2")}); + return block; + } + template <typename DataType> static Block create_nullable_block(const std::vector<typename DataType::FieldType>& data, const std::vector<typename NullMap::value_type>& null_map) { diff --git a/be/test/testutil/mock/mock_descriptors.h b/be/test/testutil/mock/mock_descriptors.h index 198c821dbe9..5419a22d38b 100644 --- a/be/test/testutil/mock/mock_descriptors.h +++ b/be/test/testutil/mock/mock_descriptors.h @@ -54,6 +54,7 @@ public: tuple_desc->Slots = slots; tuple_desc_map.push_back(tuple_desc); _tuple_desc_map.push_back(tuple_desc); + _num_materialized_slots = types.size(); } const std::vector<TupleDescriptor*>& tuple_descriptors() const override { return tuple_desc_map; diff --git a/be/test/testutil/mock/mock_literal_expr.cpp b/be/test/testutil/mock/mock_literal_expr.cpp new file mode 100644 index 00000000000..9f881a2ba13 --- /dev/null +++ b/be/test/testutil/mock/mock_literal_expr.cpp @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "mock_literal_expr.h" + +#include <gtest/gtest.h> + +#include "testutil/column_helper.h" +#include "vec/data_types/data_type_number.h" + +namespace doris::vectorized { + +TEST(MockLiteralTest, test) { + { + auto ctxs = MockLiteral::create<DataTypeInt64>({1, 2, 3, 4}); + Block block; + for (auto& ctx : ctxs) { + int result_column_id = -1; + EXPECT_TRUE(ctx->execute(&block, &result_column_id)); + } + + std::cout << block.dump_data() << std::endl; + + EXPECT_TRUE(ColumnHelper::block_equal( + block, Block { + ColumnHelper::create_column_with_name<DataTypeInt64>({1}), + ColumnHelper::create_column_with_name<DataTypeInt64>({2}), + ColumnHelper::create_column_with_name<DataTypeInt64>({3}), + ColumnHelper::create_column_with_name<DataTypeInt64>({4}), + })); + } +} +} // namespace doris::vectorized diff --git a/be/test/testutil/mock/mock_literal_expr.h b/be/test/testutil/mock/mock_literal_expr.h new file mode 100644 index 00000000000..21bab1742d9 --- /dev/null +++ b/be/test/testutil/mock/mock_literal_expr.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include <memory> +#include <string> + +#include "common/status.h" +#include "testutil/column_helper.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/exprs/vliteral.h" +#include "vec/exprs/vslot_ref.h" + +namespace doris { +class SlotDescriptor; +class RowDescriptor; +class RuntimeState; +class TExprNode; + +namespace vectorized { +class Block; +class VExprContext; + +class MockLiteral final : public VLiteral { +public: + MockLiteral(ColumnWithTypeAndName data) { + _data_type = data.type; + _column_ptr = data.column; + _expr_name = data.name; + } + + Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override { + _prepare_finished = true; + return Status::OK(); + } + + Status open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override { + _open_finished = true; + return Status::OK(); + } + const std::string& expr_name() const override { return _name; } + + template <typename DataType> + static VExprContextSPtr create(const DataType::FieldType& value) { + auto ctx = VExprContext::create_shared(std::make_shared<MockLiteral>( + ColumnHelper::create_column_with_name<DataType>({value}))); + ctx->_prepared = true; + ctx->_opened = true; + return ctx; + } + + template <typename DataType> + static VExprContextSPtrs create(const std::vector<typename DataType::FieldType>& values) { + VExprContextSPtrs ctxs; + for (const auto& value : values) { + ctxs.push_back(create<DataType>(value)); + } + return ctxs; + } + +private: + const std::string _name = "MockLiteral"; +}; + +} // namespace vectorized +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org