This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e9cef9e71f9 [test](beut) add pipeline UnionOperator beut (#48984)
e9cef9e71f9 is described below

commit e9cef9e71f9a464f9807f93f95962aa43c7dd830
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Mon Mar 17 14:09:15 2025 +0800

    [test](beut) add pipeline UnionOperator beut (#48984)
    
    ### What problem does this PR solve?
    add pipeline UnionOperator beut
---
 be/src/pipeline/exec/union_sink_operator.cpp      |  64 +++--
 be/src/pipeline/exec/union_sink_operator.h        |  61 +----
 be/src/pipeline/exec/union_source_operator.cpp    | 101 ++++----
 be/src/pipeline/exec/union_source_operator.h      |  59 ++---
 be/src/vec/exprs/vliteral.h                       |   4 +
 be/test/pipeline/operator/operator_helper.h       |  18 ++
 be/test/pipeline/operator/union_operator_test.cpp | 296 ++++++++++++++++++++++
 be/test/testutil/column_helper.h                  |  11 +
 be/test/testutil/mock/mock_descriptors.h          |   1 +
 be/test/testutil/mock/mock_literal_expr.cpp       |  47 ++++
 be/test/testutil/mock/mock_literal_expr.h         |  83 ++++++
 11 files changed, 579 insertions(+), 166 deletions(-)

diff --git a/be/src/pipeline/exec/union_sink_operator.cpp 
b/be/src/pipeline/exec/union_sink_operator.cpp
index 4bbb5eba3e3..8de21c6ac21 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -65,27 +65,16 @@ UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int 
sink_id, int dest_id, O
 Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
     DCHECK(tnode.__isset.union_node);
-    {
-        // Create result_expr_ctx_lists_ from thrift exprs.
-        auto& result_texpr_lists = tnode.union_node.result_expr_lists;
-        auto& texprs = result_texpr_lists[_cur_child_id];
-        vectorized::VExprContextSPtrs ctxs;
-        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
-        _child_expr = ctxs;
-    }
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
+            tnode.union_node.result_expr_lists[_cur_child_id], _child_expr));
     return Status::OK();
 }
 
 Status UnionSinkOperatorX::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::prepare(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, 
_child->row_desc()));
-    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, 
_row_descriptor));
-    // open const expr lists.
-    RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state));
-
-    // open result expr lists.
     RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
-
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, 
row_descriptor()));
     return Status::OK();
 }
 
@@ -100,38 +89,45 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_block
         local_state._output_block =
                 
local_state._shared_state->data_queue.get_free_block(_cur_child_id);
     }
-    if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through
+    if (is_child_passthrough(_cur_child_id)) {
+        //pass_through without expr
         if (in_block->rows() > 0) {
             local_state._output_block->swap(*in_block);
             
local_state._shared_state->data_queue.push_block(std::move(local_state._output_block),
                                                              _cur_child_id);
         }
-    } else if (_get_first_materialized_child_idx() != children_count() &&
-               _cur_child_id < children_count()) { //need materialized
-        RETURN_IF_ERROR(materialize_child_block(state, _cur_child_id, in_block,
-                                                
local_state._output_block.get()));
     } else {
-        return Status::InternalError("maybe can't reach here, execute const 
expr: {}, {}, {}",
-                                     _cur_child_id, 
_get_first_materialized_child_idx(),
-                                     children_count());
-    }
-    if (UNLIKELY(eos)) {
-        //if _cur_child_id eos, need check to push block
-        //Now here can't check _output_block rows, even it's row==0, also need 
push block
-        //because maybe sink is eos and queue have none data, if not push block
-        //the source can't can_read again and can't set source finished
-        if (local_state._output_block) {
+        RETURN_IF_ERROR(materialize_child_block(state, in_block, 
local_state._output_block.get()));
+        if (local_state._output_block->rows() > 0) {
             
local_state._shared_state->data_queue.push_block(std::move(local_state._output_block),
                                                              _cur_child_id);
         }
-
+    }
+    if (UNLIKELY(eos)) {
+        // set_finish will set source ready
         local_state._shared_state->data_queue.set_finish(_cur_child_id);
         return Status::OK();
     }
-    // not eos and block rows is enough to output,so push block
-    if (local_state._output_block && (local_state._output_block->rows() >= 
state->batch_size())) {
-        
local_state._shared_state->data_queue.push_block(std::move(local_state._output_block),
-                                                         _cur_child_id);
+    return Status::OK();
+}
+
+Status UnionSinkOperatorX::materialize_child_block(RuntimeState* state,
+                                                   vectorized::Block* 
input_block,
+                                                   vectorized::Block* 
output_block) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state._expr_timer);
+    if (input_block->rows() > 0) {
+        vectorized::MutableBlock mutable_block =
+                
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
+                                                                           
row_descriptor());
+        vectorized::ColumnsWithTypeAndName colunms;
+        const auto& child_exprs = local_state._child_expr;
+        for (const auto& child_expr : child_exprs) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(child_expr->execute(input_block, 
&result_column_id));
+            
colunms.emplace_back(input_block->get_by_position(result_column_id));
+        }
+        RETURN_IF_ERROR(mutable_block.merge(vectorized::Block {colunms}));
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 170b99f12f1..00a13bb9e8e 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -36,8 +36,7 @@ class UnionSinkOperatorX;
 class UnionSinkLocalState final : public 
PipelineXSinkLocalState<UnionSharedState> {
 public:
     ENABLE_FACTORY_CREATOR(UnionSinkLocalState);
-    UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : Base(parent, state), _child_row_idx(0) {}
+    UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : 
Base(parent, state) {}
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     friend class UnionSinkOperatorX;
@@ -47,25 +46,24 @@ public:
 private:
     std::unique_ptr<vectorized::Block> _output_block;
 
-    /// Const exprs materialized by this node. These exprs don't refer to any 
children.
-    /// Only materialized by the first fragment instance to avoid duplication.
-    vectorized::VExprContextSPtrs _const_expr;
-
     /// Exprs materialized by this node. The i-th result expr list refers to 
the i-th child.
     vectorized::VExprContextSPtrs _child_expr;
-
-    /// Index of current row in child_row_block_.
-    int _child_row_idx;
     RuntimeProfile::Counter* _expr_timer = nullptr;
 };
 
-class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> 
{
+class UnionSinkOperatorX MOCK_REMOVE(final) : public 
DataSinkOperatorX<UnionSinkLocalState> {
 public:
     using Base = DataSinkOperatorX<UnionSinkLocalState>;
 
     friend class UnionSinkLocalState;
     UnionSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* 
pool,
                        const TPlanNode& tnode, const DescriptorTbl& descs);
+#ifdef BE_TEST
+    UnionSinkOperatorX(int child_size, int cur_child_id, int 
first_materialized_child_idx)
+            : _first_materialized_child_idx(first_materialized_child_idx),
+              _cur_child_id(cur_child_id),
+              _child_size(child_size) {}
+#endif
     ~UnionSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TDataSink",
@@ -102,13 +100,9 @@ public:
 
     bool is_shuffled_operator() const override { return 
_followed_by_shuffled_operator; }
 
-private:
-    int _get_first_materialized_child_idx() const { return 
_first_materialized_child_idx; }
-
-    /// Const exprs materialized by this node. These exprs don't refer to any 
children.
-    /// Only materialized by the first fragment instance to avoid duplication.
-    vectorized::VExprContextSPtrs _const_expr;
+    MOCK_FUNCTION const RowDescriptor& row_descriptor() { return 
_row_descriptor; }
 
+private:
     /// Exprs materialized by this node. The i-th result expr list refers to 
the i-th child.
     vectorized::VExprContextSPtrs _child_expr;
     /// Index of the first non-passthrough child; i.e. a child that needs 
materialization.
@@ -119,42 +113,13 @@ private:
     const RowDescriptor _row_descriptor;
     const int _cur_child_id;
     const int _child_size;
-    int children_count() const { return _child_size; }
+
     bool is_child_passthrough(int child_idx) const {
         DCHECK_LT(child_idx, _child_size);
         return child_idx < _first_materialized_child_idx;
     }
-    Status materialize_child_block(RuntimeState* state, int child_id,
-                                   vectorized::Block* input_block,
-                                   vectorized::Block* output_block) {
-        DCHECK_LT(child_id, _child_size);
-        DCHECK(!is_child_passthrough(child_id));
-        if (input_block->rows() > 0) {
-            vectorized::MutableBlock mblock =
-                    
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
-                                                                               
_row_descriptor);
-            vectorized::Block res;
-            RETURN_IF_ERROR(materialize_block(state, input_block, child_id, 
&res));
-            RETURN_IF_ERROR(mblock.merge(res));
-        }
-        return Status::OK();
-    }
-
-    Status materialize_block(RuntimeState* state, vectorized::Block* 
src_block, int child_idx,
-                             vectorized::Block* res_block) {
-        auto& local_state = get_local_state(state);
-        SCOPED_TIMER(local_state._expr_timer);
-        const auto& child_exprs = local_state._child_expr;
-        vectorized::ColumnsWithTypeAndName colunms;
-        for (size_t i = 0; i < child_exprs.size(); ++i) {
-            int result_column_id = -1;
-            RETURN_IF_ERROR(child_exprs[i]->execute(src_block, 
&result_column_id));
-            colunms.emplace_back(src_block->get_by_position(result_column_id));
-        }
-        local_state._child_row_idx += src_block->rows();
-        *res_block = {colunms};
-        return Status::OK();
-    }
+    Status materialize_child_block(RuntimeState* state, vectorized::Block* 
input_block,
+                                   vectorized::Block* output_block);
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index b381e1f2712..fb98f4c0ece 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -17,6 +17,7 @@
 
 #include "pipeline/exec/union_source_operator.h"
 
+#include <algorithm>
 #include <functional>
 #include <utility>
 
@@ -55,31 +56,29 @@ Status UnionSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     return Status::OK();
 }
 
-Status UnionSourceLocalState::open(RuntimeState* state) {
-    SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
-    RETURN_IF_ERROR(Base::open(state));
-
-    auto& p = _parent->cast<Parent>();
-    // Const exprs materialized by this node. These exprs don't refer to any 
children.
-    // Only materialized by the first fragment instance to avoid duplication.
-    if (state->per_fragment_instance_idx() == 0) {
-        auto clone_expr_list = [&](vectorized::VExprContextSPtrs& 
cur_expr_list,
-                                   vectorized::VExprContextSPtrs& 
other_expr_list) {
-            cur_expr_list.resize(other_expr_list.size());
-            for (int i = 0; i < cur_expr_list.size(); i++) {
-                RETURN_IF_ERROR(other_expr_list[i]->clone(state, 
cur_expr_list[i]));
-            }
-            return Status::OK();
-        };
-        _const_expr_lists.resize(p._const_expr_lists.size());
-        for (int i = 0; i < _const_expr_lists.size(); i++) {
-            auto& _const_expr_list = _const_expr_lists[i];
-            auto& other_expr_list = p._const_expr_lists[i];
-            RETURN_IF_ERROR(clone_expr_list(_const_expr_list, 
other_expr_list));
-        }
+Status UnionSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) 
{
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    for (const auto& texprs : tnode.union_node.const_expr_lists) {
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+        _const_expr_lists.push_back(ctxs);
+    }
+    if (!std::ranges::all_of(_const_expr_lists, [&](const auto& exprs) {
+            return exprs.size() == _const_expr_lists.front().size();
+        })) {
+        return Status::InternalError("Const expr lists size not match");
     }
+    return Status::OK();
+}
 
+Status UnionSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) {
+        RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state, 
row_descriptor()));
+    }
+    for (const auto& exprs : _const_expr_lists) {
+        RETURN_IF_ERROR(vectorized::VExpr::open(exprs, state));
+    }
     return Status::OK();
 }
 
@@ -100,15 +99,19 @@ Status UnionSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Block* b
         // the eos check of union operator is complex, need check all logical 
if you want modify
         // could ref this PR: https://github.com/apache/doris/pull/29677
         // have executing const expr, queue have no data anymore, and child 
could be closed
-        if (_child_size == 0 && !local_state._need_read_for_const_expr) {
-            *eos = true;
-        } else if (_has_data(state)) {
+        if (_child_size == 0) {
+            // If _child_size == 0, eos = true will only be returned when all 
constant expressions are executed
+            *eos = !local_state._need_read_for_const_expr;
+        } else if (has_data(state)) {
+            // data queue still has data, return eos = false
             *eos = false;
         } else if (local_state._shared_state->data_queue.is_all_finish()) {
             // Here, check the value of `_has_data(state)` again after 
`data_queue.is_all_finish()` is TRUE
             // as there may be one or more blocks when 
`data_queue.is_all_finish()` is TRUE.
-            *eos = !_has_data(state);
+            *eos = !has_data(state);
         } else {
+            // At this point, the data queue has no data, but not all sinks 
have finished, so return eos = false
+            // (this may happen because the source consumes too fast)
             *eos = false;
         }
     }};
@@ -128,7 +131,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* b
             return Status::OK();
         }
         block->swap(*output_block);
-        
output_block->clear_column_data(_row_descriptor.num_materialized_slots());
+        
output_block->clear_column_data(row_descriptor().num_materialized_slots());
         
local_state._shared_state->data_queue.push_free_block(std::move(output_block), 
child_idx);
     }
     local_state.reached_limit(block, eos);
@@ -137,46 +140,34 @@ Status UnionSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Block* b
 
 Status UnionSourceOperatorX::get_next_const(RuntimeState* state, 
vectorized::Block* block) {
     DCHECK_EQ(state->per_fragment_instance_idx(), 0);
-    auto& local_state = 
state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
+    auto& local_state = get_local_state(state);
     DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size());
 
     SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
-    auto& _const_expr_list_idx = local_state._const_expr_list_idx;
-    vectorized::MutableBlock mblock =
-            vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, 
_row_descriptor);
-    for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() < 
state->batch_size();
-         ++_const_expr_list_idx) {
+    auto& const_expr_list_idx = local_state._const_expr_list_idx;
+    vectorized::MutableBlock mutable_block =
+            vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, 
row_descriptor());
+    for (; const_expr_list_idx < _const_expr_lists.size() &&
+           mutable_block.rows() < state->batch_size();
+         ++const_expr_list_idx) {
         vectorized::Block tmp_block;
+        // Executing a constant expression still needs a block with one row of 
data, because the expr may make decisions based on the block's rows
         tmp_block.insert({vectorized::ColumnUInt8::create(1),
                           std::make_shared<vectorized::DataTypeUInt8>(), ""});
-        int const_expr_lists_size = 
cast_set<int>(_const_expr_lists[_const_expr_list_idx].size());
-        if (_const_expr_list_idx && const_expr_lists_size != 
_const_expr_lists[0].size()) {
-            return Status::InternalError(
-                    "[UnionNode]const expr at {}'s count({}) not matched({} 
expected)",
-                    _const_expr_list_idx, const_expr_lists_size, 
_const_expr_lists[0].size());
-        }
-
-        std::vector<int> result_list(const_expr_lists_size);
-        for (size_t i = 0; i < const_expr_lists_size; ++i) {
-            
RETURN_IF_ERROR(_const_expr_lists[_const_expr_list_idx][i]->execute(&tmp_block,
-                                                                               
 &result_list[i]));
-        }
-        tmp_block.erase_not_in(result_list);
-        if (tmp_block.columns() != mblock.columns()) {
-            return Status::InternalError(
-                    "[UnionNode]columns count of const expr block not matched 
({} vs {})",
-                    tmp_block.columns(), mblock.columns());
-        }
-        if (tmp_block.rows() > 0) {
-            RETURN_IF_ERROR(mblock.merge(tmp_block));
-            tmp_block.clear();
+        vectorized::ColumnsWithTypeAndName colunms;
+        for (auto& expr : _const_expr_lists[const_expr_list_idx]) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(expr->execute(&tmp_block, &result_column_id));
+            colunms.emplace_back(tmp_block.get_by_position(result_column_id));
         }
+        RETURN_IF_ERROR(mutable_block.merge(vectorized::Block {colunms}));
     }
 
     // some insert query like "insert into string_test select 1, repeat('a', 
1024 * 1024);"
     // the const expr will be in output expr cause the union node return a 
empty block. so here we
     // need add one row to make sure the union node exec const expr return at 
least one row
+    /// TODO: maybe we can remove this
     if (block->rows() == 0) {
         block->insert({vectorized::ColumnUInt8::create(1),
                        std::make_shared<vectorized::DataTypeUInt8>(), ""});
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 6619b623ef5..3985cf3910e 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -43,7 +43,6 @@ public:
     UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {};
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
-    Status open(RuntimeState* state) override;
 
     [[nodiscard]] std::string debug_string(int indentation_level = 0) const 
override;
 
@@ -52,49 +51,50 @@ private:
     friend class OperatorX<UnionSourceLocalState>;
     bool _need_read_for_const_expr {true};
     int _const_expr_list_idx {0};
-    std::vector<vectorized::VExprContextSPtrs> _const_expr_lists;
 
     // If this operator has no children, there is no shared state which owns 
dependency. So we
     // use this local state to hold this dependency.
     DependencySPtr _only_const_dependency = nullptr;
 };
 
-class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
+/*
+There are two cases for a union node: it has only constant expressions, or it 
has child nodes in addition to constant expressions.
+Unlike other union operators, the union node only merges data and performs no 
deduplication.
+
+|   0:VUNION(66)                                                               
                                            |
+|      constant exprs:                                                         
                                            |
+|          1 | 2 | 3 | 4                                                       
                                            |
+|          5 | 6 | 7 | 8                                                       
                                            |
+|      tuple ids: 0                                                            
                                            | 
+
+|   4:VUNION(179)                                                              
                                             |
+|   |  constant exprs:                                                         
                                             |
+|   |      1 | 2 | 3 | 4                                                       
                                             |
+|   |      5 | 6 | 7 | 8                                                       
                                             |
+|   |  child exprs:                                                            
                                             |
+|   |      k1[#0] | k2[#1] | k3[#2] | k4[#3]                                   
                                             |
+|   |      k1[#4] | k2[#5] | k3[#6] | k4[#7]                                   
                                             |
+|   |  tuple ids: 2                                                            
                                             |
+*/
+
+class UnionSourceOperatorX MOCK_REMOVE(final) : public 
OperatorX<UnionSourceLocalState> {
 public:
     using Base = OperatorX<UnionSourceLocalState>;
     UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                          const DescriptorTbl& descs)
             : Base(pool, tnode, operator_id, descs), 
_child_size(tnode.num_children) {}
+
+#ifdef BE_TEST
+    UnionSourceOperatorX(int child_size) : _child_size(child_size) {}
+#endif
     ~UnionSourceOperatorX() override = default;
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
     bool is_source() const override { return true; }
 
-    Status init(const TPlanNode& tnode, RuntimeState* state) override {
-        RETURN_IF_ERROR(Base::init(tnode, state));
-        DCHECK(tnode.__isset.union_node);
-        // Create const_expr_ctx_lists_ from thrift exprs.
-        auto& const_texpr_lists = tnode.union_node.const_expr_lists;
-        for (auto& texprs : const_texpr_lists) {
-            vectorized::VExprContextSPtrs ctxs;
-            RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, 
ctxs));
-            _const_expr_lists.push_back(ctxs);
-        }
-        return Status::OK();
-    }
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
-    Status prepare(RuntimeState* state) override {
-        static_cast<void>(Base::prepare(state));
-        // Prepare const expr lists.
-        for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) {
-            RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state, 
_row_descriptor));
-        }
-        // open const expr lists.
-        for (const auto& exprs : _const_expr_lists) {
-            RETURN_IF_ERROR(vectorized::VExpr::open(exprs, state));
-        }
-        return Status::OK();
-    }
     [[nodiscard]] int get_child_count() const { return _child_size; }
     bool require_shuffled_data_distribution() const override {
         return _followed_by_shuffled_operator;
@@ -114,7 +114,7 @@ public:
     }
 
 private:
-    bool _has_data(RuntimeState* state) const {
+    bool has_data(RuntimeState* state) const {
         auto& local_state = get_local_state(state);
         if (_child_size == 0) {
             return local_state._need_read_for_const_expr;
@@ -122,9 +122,10 @@ private:
         return local_state._shared_state->data_queue.remaining_has_data();
     }
     bool has_more_const(RuntimeState* state) const {
+        // For constant expressions, only one instance will execute the 
expression
         auto& local_state = get_local_state(state);
         return state->per_fragment_instance_idx() == 0 &&
-               local_state._const_expr_list_idx < 
local_state._const_expr_lists.size();
+               local_state._const_expr_list_idx < _const_expr_lists.size();
     }
     friend class UnionSourceLocalState;
     const int _child_size;
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 3d9c948ec88..8efb5677cb0 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -43,6 +43,10 @@ public:
         }
     }
 
+#ifdef BE_TEST
+    VLiteral() = default;
+#endif
+
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
override;
 
diff --git a/be/test/pipeline/operator/operator_helper.h 
b/be/test/pipeline/operator/operator_helper.h
index a2d2c690c4e..db658ca49d7 100644
--- a/be/test/pipeline/operator/operator_helper.h
+++ b/be/test/pipeline/operator/operator_helper.h
@@ -43,6 +43,24 @@ struct OperatorHelper {
         LocalStateInfo info {&ctx.profile, scan_ranges, 0, {}, 0};
         EXPECT_TRUE(op.setup_local_state(&ctx.state, info).ok());
     }
+
+    static bool is_block(std::vector<Dependency*> deps) {
+        for (auto* dep : deps) {
+            if (!dep->ready()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    static bool is_ready(std::vector<Dependency*> deps) {
+        for (auto* dep : deps) {
+            if (!dep->ready()) {
+                return false;
+            }
+        }
+        return true;
+    }
 };
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/union_operator_test.cpp 
b/be/test/pipeline/operator/union_operator_test.cpp
new file mode 100644
index 00000000000..b0c973f5499
--- /dev/null
+++ b/be/test/pipeline/operator/union_operator_test.cpp
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "operator_helper.h"
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/exec/union_source_operator.h"
+#include "pipeline/operator/operator_helper.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_literal_expr.h"
+#include "testutil/mock/mock_slot_ref.h"
+#include "vec/core/block.h"
+namespace doris::pipeline {
+
+using namespace vectorized;
+
+struct MockUnionSourceOperator : public UnionSourceOperatorX {
+    MockUnionSourceOperator(int32_t child_size, DataTypes types, ObjectPool* 
pool)
+            : UnionSourceOperatorX(child_size), _mock_row_descriptor(types, 
pool) {}
+    RowDescriptor& row_descriptor() override { return _mock_row_descriptor; }
+    MockRowDescriptor _mock_row_descriptor;
+};
+
+struct MockUnionSinkOperator : public UnionSinkOperatorX {
+    MockUnionSinkOperator(int child_size, int cur_child_id, int 
first_materialized_child_idx,
+                          DataTypes types, ObjectPool* pool)
+            : UnionSinkOperatorX(child_size, cur_child_id, 
first_materialized_child_idx),
+              _mock_row_descriptor(types, pool) {}
+
+    RowDescriptor& row_descriptor() override { return _mock_row_descriptor; }
+    MockRowDescriptor _mock_row_descriptor;
+};
+
+struct UnionOperatorTest : public ::testing::Test {
+    void SetUp() override {
+        state = std::make_shared<MockRuntimeState>();
+        state->batsh_size = 10;
+        for (int i = 0; i < child_size; i++) {
+            sink_state.push_back(std::make_shared<MockRuntimeState>());
+            sink_ops.push_back(nullptr);
+        }
+    }
+
+    std::shared_ptr<MockUnionSourceOperator> source_op;
+    UnionSourceLocalState* source_local_state;
+
+    std::shared_ptr<MockRuntimeState> state;
+
+    RuntimeProfile profile {""};
+
+    ObjectPool pool;
+
+    const int child_size = 3;
+    const int first_materialized_child_idx = 1;
+
+    std::vector<std::shared_ptr<MockUnionSinkOperator>> sink_ops;
+    std::vector<std::shared_ptr<MockRuntimeState>> sink_state;
+};
+
+TEST_F(UnionOperatorTest, test_all_const_expr) {
+    state->batsh_size = 2;
+    source_op.reset(new MockUnionSourceOperator {
+            0,
+            {std::make_shared<DataTypeInt64>(), 
std::make_shared<DataTypeInt64>(),
+             std::make_shared<DataTypeInt64>(), 
std::make_shared<DataTypeInt64>()},
+            &pool});
+    EXPECT_TRUE(source_op->prepare(state.get()));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({1, 
10, 100, 1000}));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({2, 
20, 200, 2000}));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({3, 
30, 300, 3000}));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({4, 
40, 400, 4000}));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({5, 
50, 500, 5000}));
+    auto source_local_state_uptr =
+            std::make_unique<UnionSourceLocalState>(state.get(), 
source_op.get());
+    source_local_state = source_local_state_uptr.get();
+    LocalStateInfo info {.parent_profile = &profile,
+                         .scan_ranges = {},
+                         .shared_state = nullptr,
+                         .le_state_map = {},
+                         .task_idx = 0};
+    EXPECT_TRUE(source_local_state->init(state.get(), info));
+    state->resize_op_id_to_local_state(-100);
+    state->emplace_local_state(source_op->operator_id(), 
std::move(source_local_state_uptr));
+    EXPECT_TRUE(source_local_state->open(state.get()));
+    EXPECT_EQ(source_local_state->dependencies().size(), 1);
+    EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
+
+    EXPECT_TRUE(source_local_state->_need_read_for_const_expr);
+    EXPECT_EQ(source_local_state->_const_expr_list_idx, 0);
+
+    EXPECT_TRUE(source_op->has_more_const(state.get()));
+
+    {
+        Block block;
+        bool eos;
+        EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+        EXPECT_EQ(block.rows(), 2);
+        EXPECT_FALSE(eos);
+        std::cout << block.dump_data() << std::endl;
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, Block {
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({10, 20}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({100, 200}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({1000, 2000}),
+                       }));
+    }
+
+    {
+        Block block;
+        bool eos;
+        EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+        EXPECT_EQ(block.rows(), 2);
+        EXPECT_FALSE(eos);
+        std::cout << block.dump_data() << std::endl;
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, Block {
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({3, 4}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({30, 40}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({300, 400}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({3000, 4000}),
+                       }));
+    }
+
+    {
+        Block block;
+        bool eos;
+        EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+        EXPECT_EQ(block.rows(), 1);
+        EXPECT_TRUE(eos);
+        std::cout << block.dump_data() << std::endl;
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, Block {
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({5}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({50}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({500}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({5000}),
+                       }));
+    }
+}
+
+TEST_F(UnionOperatorTest, test_sink_and_source) {
+    DataTypes types = {std::make_shared<DataTypeInt64>(), 
std::make_shared<DataTypeInt64>()};
+    source_op.reset(new MockUnionSourceOperator {child_size, types, &pool});
+    EXPECT_TRUE(source_op->prepare(state.get()));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({1, 
10}));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({2, 
20}));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({3, 
30}));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({4, 
40}));
+    
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({5, 
50}));
+
+    for (int i = 0; i < child_size; i++) {
+        sink_ops[i].reset(new MockUnionSinkOperator {child_size, i, 
first_materialized_child_idx,
+                                                     types, &pool});
+        sink_ops[i]->_child_expr = MockSlotRef::create_mock_contexts(types);
+    }
+
+    auto shared_state = sink_ops[0]->create_shared_state();
+
+    //auto & data_queue = dynamic_cast<UnionSharedState*>(shared_state.get())->data_queue;
+    EXPECT_TRUE(shared_state != nullptr);
+    EXPECT_TRUE(sink_ops[1]->create_shared_state() == nullptr);
+    EXPECT_TRUE(sink_ops[2]->create_shared_state() == nullptr);
+
+    {
+        auto source_local_state_uptr =
+                std::make_unique<UnionSourceLocalState>(state.get(), 
source_op.get());
+        source_local_state = source_local_state_uptr.get();
+
+        LocalStateInfo info {.parent_profile = &profile,
+                             .scan_ranges = {},
+                             .shared_state = shared_state.get(),
+                             .le_state_map = {},
+                             .task_idx = 0};
+        EXPECT_TRUE(source_local_state->init(state.get(), info));
+        state->resize_op_id_to_local_state(-100);
+        state->emplace_local_state(source_op->operator_id(), 
std::move(source_local_state_uptr));
+        EXPECT_TRUE(source_local_state->open(state.get()));
+        EXPECT_EQ(source_local_state->dependencies().size(), 1);
+        
EXPECT_TRUE(OperatorHelper::is_block(source_local_state->dependencies()));
+    }
+
+    {
+        for (int i = 0; i < child_size; i++) {
+            auto sink_local_state_uptr =
+                    std::make_unique<UnionSinkLocalState>(sink_ops[i].get(), 
sink_state[i].get());
+            auto* sink_local_state = sink_local_state_uptr.get();
+            LocalSinkStateInfo info {.task_idx = 0,
+                                     .parent_profile = &profile,
+                                     .sender_id = 0,
+                                     .shared_state = shared_state.get(),
+                                     .le_state_map = {},
+                                     .tsink = TDataSink {}};
+            EXPECT_TRUE(sink_local_state->init(sink_state[i].get(), info));
+            sink_state[i]->resize_op_id_to_local_state(-100);
+            sink_state[i]->emplace_sink_local_state(sink_ops[i]->operator_id(),
+                                                    
std::move(sink_local_state_uptr));
+            EXPECT_TRUE(sink_local_state->open(sink_state[i].get()));
+            EXPECT_EQ(sink_local_state->dependencies().size(), 1);
+            
EXPECT_TRUE(OperatorHelper::is_ready(sink_local_state->dependencies()));
+        }
+    }
+
+    {
+        Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 
4});
+        EXPECT_TRUE(sink_ops[0]->sink(sink_state[0].get(), &block, false));
+    }
+    {
+        Block block;
+        bool eos;
+        EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+        EXPECT_EQ(block.rows(), 5);
+        EXPECT_FALSE(eos);
+        std::cout << block.dump_data() << std::endl;
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block,
+                Block {
+                        
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5}),
+                        
ColumnHelper::create_column_with_name<DataTypeInt64>({10, 20, 30, 40, 50}),
+                }));
+    }
+
+    EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
+
+    {
+        Block block;
+        bool eos;
+        EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+        EXPECT_EQ(block.rows(), 2);
+        EXPECT_FALSE(eos);
+        std::cout << block.dump_data() << std::endl;
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 
4})));
+    }
+
+    EXPECT_TRUE(OperatorHelper::is_block(source_local_state->dependencies()));
+
+    {
+        for (int i = 0; i < child_size; i++) {
+            Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, 
{3, 4});
+            EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
+        }
+        for (int i = 0; i < child_size; i++) {
+            Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, 
{3, 4});
+            EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
+        }
+        for (int i = 0; i < child_size; i++) {
+            Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, 
{3, 4});
+            EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, true));
+        }
+    }
+
+    EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
+    for (int i = 0; i < 8; i++) {
+        Block block;
+        bool eos;
+        EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+        EXPECT_EQ(block.rows(), 2);
+        EXPECT_FALSE(eos);
+        std::cout << block.dump_data() << std::endl;
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 
4})));
+    }
+
+    {
+        Block block;
+        bool eos;
+        EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+        EXPECT_EQ(block.rows(), 2);
+        EXPECT_TRUE(eos);
+        std::cout << block.dump_data() << std::endl;
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 
4})));
+    }
+}
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/testutil/column_helper.h b/be/test/testutil/column_helper.h
index a9cf58bb880..9d35c7af62d 100644
--- a/be/test/testutil/column_helper.h
+++ b/be/test/testutil/column_helper.h
@@ -87,6 +87,17 @@ public:
         return block;
     }
 
+    template <typename DataType>
+    static Block create_block(const std::vector<typename DataType::FieldType>& 
data1,
+                              const std::vector<typename DataType::FieldType>& 
data2) {
+        auto column1 = create_column<DataType>(data1);
+        auto column2 = create_column<DataType>(data2);
+        auto data_type = std::make_shared<DataType>();
+        Block block({ColumnWithTypeAndName(column1, data_type, "column1"),
+                     ColumnWithTypeAndName(column2, data_type, "column2")});
+        return block;
+    }
+
     template <typename DataType>
     static Block create_nullable_block(const std::vector<typename 
DataType::FieldType>& data,
                                        const std::vector<typename 
NullMap::value_type>& null_map) {
diff --git a/be/test/testutil/mock/mock_descriptors.h 
b/be/test/testutil/mock/mock_descriptors.h
index 198c821dbe9..5419a22d38b 100644
--- a/be/test/testutil/mock/mock_descriptors.h
+++ b/be/test/testutil/mock/mock_descriptors.h
@@ -54,6 +54,7 @@ public:
         tuple_desc->Slots = slots;
         tuple_desc_map.push_back(tuple_desc);
         _tuple_desc_map.push_back(tuple_desc);
+        _num_materialized_slots = types.size();
     }
     const std::vector<TupleDescriptor*>& tuple_descriptors() const override {
         return tuple_desc_map;
diff --git a/be/test/testutil/mock/mock_literal_expr.cpp 
b/be/test/testutil/mock/mock_literal_expr.cpp
new file mode 100644
index 00000000000..9f881a2ba13
--- /dev/null
+++ b/be/test/testutil/mock/mock_literal_expr.cpp
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "mock_literal_expr.h"
+
+#include <gtest/gtest.h>
+
+#include "testutil/column_helper.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::vectorized {
+
+TEST(MockLiteralTest, test) {
+    {
+        auto ctxs = MockLiteral::create<DataTypeInt64>({1, 2, 3, 4});
+        Block block;
+        for (auto& ctx : ctxs) {
+            int result_column_id = -1;
+            EXPECT_TRUE(ctx->execute(&block, &result_column_id));
+        }
+
+        std::cout << block.dump_data() << std::endl;
+
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, Block {
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({1}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({2}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({3}),
+                               
ColumnHelper::create_column_with_name<DataTypeInt64>({4}),
+                       }));
+    }
+}
+} // namespace doris::vectorized
diff --git a/be/test/testutil/mock/mock_literal_expr.h 
b/be/test/testutil/mock/mock_literal_expr.h
new file mode 100644
index 00000000000..21bab1742d9
--- /dev/null
+++ b/be/test/testutil/mock/mock_literal_expr.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "testutil/column_helper.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris {
+class SlotDescriptor;
+class RowDescriptor;
+class RuntimeState;
+class TExprNode;
+
+namespace vectorized {
+class Block;
+class VExprContext;
+
+class MockLiteral final : public VLiteral {
+public:
+    MockLiteral(ColumnWithTypeAndName data) {
+        _data_type = data.type;
+        _column_ptr = data.column;
+        _expr_name = data.name;
+    }
+
+    Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override {
+        _prepare_finished = true;
+        return Status::OK();
+    }
+
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override {
+        _open_finished = true;
+        return Status::OK();
+    }
+    const std::string& expr_name() const override { return _name; }
+
+    template <typename DataType>
+    static VExprContextSPtr create(const DataType::FieldType& value) {
+        auto ctx = VExprContext::create_shared(std::make_shared<MockLiteral>(
+                ColumnHelper::create_column_with_name<DataType>({value})));
+        ctx->_prepared = true;
+        ctx->_opened = true;
+        return ctx;
+    }
+
+    template <typename DataType>
+    static VExprContextSPtrs create(const std::vector<typename 
DataType::FieldType>& values) {
+        VExprContextSPtrs ctxs;
+        for (const auto& value : values) {
+            ctxs.push_back(create<DataType>(value));
+        }
+        return ctxs;
+    }
+
+private:
+    const std::string _name = "MockLiteral";
+};
+
+} // namespace vectorized
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to