This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 07dd6830e8 [pipelineX](refactor) add union node in pipelineX (#24286)
07dd6830e8 is described below

commit 07dd6830e82a0c9b39d6efc06d1aeb25900a843b
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Wed Sep 13 20:39:58 2023 +0800

    [pipelineX](refactor) add union node in pipelineX (#24286)
---
 be/src/pipeline/exec/union_sink_operator.cpp       |  90 ++++++++++++++++++
 be/src/pipeline/exec/union_sink_operator.h         | 103 ++++++++++++++++++++
 be/src/pipeline/exec/union_source_operator.cpp     |  35 +++++++
 be/src/pipeline/exec/union_source_operator.h       |  41 ++++++++
 be/src/pipeline/pipeline_x/dependency.h            |  15 +++
 be/src/pipeline/pipeline_x/operator.cpp            |   8 +-
 be/src/pipeline/pipeline_x/operator.h              |  17 +++-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  33 ++++++-
 .../pipeline_x/pipeline_x_fragment_context.h       |   1 +
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |   6 +-
 .../data/pipelineX/test_union_operator.out         |  13 +++
 .../suites/pipelineX/test_union_operator.groovy    | 105 +++++++++++++++++++++
 12 files changed, 459 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/exec/union_sink_operator.cpp 
b/be/src/pipeline/exec/union_sink_operator.cpp
index 9296df78d9..c1fd75d820 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -94,4 +94,94 @@ Status UnionSinkOperator::close(RuntimeState* state) {
     return StreamingOperator::close(state);
 }
 
+Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    auto& p = _parent->cast<Parent>();
+    _child_expr.resize(p._child_expr.size()); // one private clone per operator child expr
+    for (size_t i = 0; i < p._child_expr.size(); i++) {
+        RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
+    }
+    return Status::OK();
+}
+
+UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool,
+                                       const TPlanNode& tnode, const DescriptorTbl& descs)
+        : Base(sink_id, tnode.node_id), // id = sink_id; dest_id = union node id (keys dependency)
+          _first_materialized_child_idx(tnode.union_node.first_materialized_child_idx),
+          _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
+          _cur_child_id(child_id), // child served: its result expr list and DataQueue slot
+          _child_size(tnode.num_children) {}
+
+Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
+    DCHECK(tnode.__isset.union_node);
+    {
+        // Build only this sink's child exprs: result_expr_lists[_cur_child_id] from thrift.
+        auto& result_texpr_lists = tnode.union_node.result_expr_lists;
+        auto& texprs = result_texpr_lists[_cur_child_id]; // NOTE(review): index is unchecked
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+        _child_expr = ctxs;
+    }
+    return Status::OK();
+}
+
+Status UnionSinkOperatorX::prepare(RuntimeState* state) { // prepare against child's row_desc
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child_x->row_desc()));
+    return Status::OK();
+}
+
+Status UnionSinkOperatorX::open(RuntimeState* state) {
+    // Open const exprs. NOTE(review): nothing in this operator fills _const_expr.
+    RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state));
+
+    // Open this child's result exprs (created in init(), prepared in prepare()).
+    RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
+
+    return Status::OK();
+}
+
+Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
+                                SourceState source_state) { // feed one child's rows to the queue
+    auto& local_state = state->get_sink_local_state(id())->cast<UnionSinkLocalState>();
+    if (local_state._output_block == nullptr) {
+        local_state._output_block =
+                local_state._shared_state->_data_queue->get_free_block(_cur_child_id);
+    }
+    if (_cur_child_id < _get_first_materialized_child_idx()) { // pass-through: forward as is
+        if (in_block->rows() > 0) {
+            local_state._output_block->swap(*in_block);
+            local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block),
+                                                               _cur_child_id);
+        }
+    } else if (_get_first_materialized_child_idx() != children_count() &&
+               _cur_child_id < children_count()) { // materialize this child's result exprs
+        RETURN_IF_ERROR(materialize_child_block(state, _cur_child_id, in_block,
+                                                local_state._output_block.get()));
+    } else {
+        return Status::InternalError("maybe can't reach here, execute const expr: {}, {}, {}",
+                                     _cur_child_id, _get_first_materialized_child_idx(),
+                                     children_count());
+    }
+    if (UNLIKELY(source_state == SourceState::FINISHED)) {
+        // This child reached eos: push the buffered block even if it has 0 rows (it is
+        // expected to be null already if the pass-through branch above just pushed it).
+        // Skipping this push could leave the queue empty, so the source would never become
+        // readable again and never observe that this child finished.
+        if (local_state._output_block) {
+            local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block),
+                                                               _cur_child_id);
+        }
+
+        local_state._shared_state->_data_queue->set_finish(_cur_child_id);
+        return Status::OK();
+    }
+    // Not eos: push once the buffered block holds at least batch_size() rows.
+    if (local_state._output_block && (local_state._output_block->rows() >= state->batch_size())) {
+        local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block),
+                                                           _cur_child_id);
+    }
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 0da75147cc..2bbfab5cfc 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -23,6 +23,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/core/block.h"
 #include "vec/exec/vunion_node.h"
 
@@ -64,5 +65,107 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
     std::unique_ptr<vectorized::Block> _output_block;
 };
+
+class UnionSinkOperatorX;
+class UnionSinkLocalState final : public PipelineXSinkLocalState<UnionDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(UnionSinkLocalState);
+    UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state), _child_row_idx(0) {}
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    friend class UnionSinkOperatorX;
+    using Base = PipelineXSinkLocalState<UnionDependency>;
+    using Parent = UnionSinkOperatorX;
+
+private:
+    std::unique_ptr<vectorized::Block> _output_block; // being filled; handed off via push_block()
+
+    /// Const exprs (no child references). NOTE(review): never filled or read for the local
+    /// state; only UnionSinkOperatorX::_const_expr is opened.
+    vectorized::VExprContextSPtrs _const_expr;
+
+    /// Per-instance clones of the operator's _child_expr (made in init()), evaluated in sink.
+    vectorized::VExprContextSPtrs _child_expr;
+
+    /// Running total of child rows materialized by this instance.
+    int _child_row_idx;
+};
+
+class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<UnionSinkLocalState>;
+    // One instance per union child (_cur_child_id); feeds that child's rows to the DataQueue.
+    friend class UnionSinkLocalState;
+    UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const TPlanNode& tnode,
+                       const DescriptorTbl& descs);
+    ~UnionSinkOperatorX() override = default;
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink", _name);
+    }
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    bool can_write(RuntimeState* state) override { return true; }
+
+private:
+    int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }
+
+    /// Const exprs materialized by this node. These exprs don't refer to any children.
+    /// NOTE(review): nothing in this operator fills this list; open() opens it as-is.
+    vectorized::VExprContextSPtrs _const_expr;
+
+    /// Result exprs of the single child this sink serves: result_expr_lists[_cur_child_id].
+    vectorized::VExprContextSPtrs _child_expr;
+    /// Index of the first non-passthrough child; i.e. a child that needs materialization.
+    /// 0 when all children are materialized, '_child_size' when no children are
+    /// materialized.
+    const int _first_materialized_child_idx;
+
+    const RowDescriptor _row_descriptor;
+    const int _cur_child_id;
+    const int _child_size;
+    int children_count() const { return _child_size; }
+    bool is_child_passthrough(int child_idx) const {
+        DCHECK_LT(child_idx, _child_size);
+        return child_idx < _first_materialized_child_idx;
+    }
+    Status materialize_child_block(RuntimeState* state, int child_id,
+                                   vectorized::Block* input_block,
+                                   vectorized::Block* output_block) {
+        DCHECK_LT(child_id, _child_size);
+        DCHECK(!is_child_passthrough(child_id));
+        if (input_block->rows() > 0) {
+            vectorized::MutableBlock mblock =
+                    vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
+                                                                               _row_descriptor);
+            vectorized::Block res;
+            RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res));
+            RETURN_IF_ERROR(mblock.merge(res));
+        }
+        return Status::OK();
+    }
+    // Evaluates the local state's cloned _child_expr over src_block into res_block.
+    Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx,
+                             vectorized::Block* res_block) {
+        auto& local_state = state->get_sink_local_state(id())->cast<UnionSinkLocalState>();
+        const auto& child_exprs = local_state._child_expr;
+        vectorized::ColumnsWithTypeAndName columns;
+        for (size_t i = 0; i < child_exprs.size(); ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id));
+            columns.emplace_back(src_block->get_by_position(result_column_id));
+        }
+        local_state._child_row_idx += src_block->rows();
+        *res_block = {columns};
+        return Status::OK();
+    }
+};
+
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index f39e0a582b..c1fdae4851 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -97,5 +97,40 @@ Status UnionSourceOperator::get_block(RuntimeState* state, 
vectorized::Block* bl
 
     return Status::OK();
 }
+
+Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    auto& p = _parent->cast<Parent>(); // queue gets one slot per union child (p._child_size)
+    std::shared_ptr<DataQueue> data_queue = std::make_shared<DataQueue>(p._child_size);
+    _shared_state->_data_queue.swap(data_queue); // sinks push here; NOTE(review): confirm set first
+    return Status::OK();
+}
+
+Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                       SourceState& source_state) { // pop one queued block
+    auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>();
+    bool eos = false;
+    std::unique_ptr<vectorized::Block> output_block = vectorized::Block::create_unique();
+    int child_idx = 0;
+    local_state._shared_state->_data_queue->get_block_from_queue(&output_block, &child_idx);
+    if (!output_block) {
+        return Status::OK();
+    }
+    block->swap(*output_block);
+    output_block->clear_column_data(row_desc().num_materialized_slots());
+    local_state._shared_state->_data_queue->push_free_block(std::move(output_block), child_idx);
+
+    local_state.reached_limit(block, &eos); // eos: LIMIT reached (assumed semantics; confirm)
+    const bool has_data = _has_data(state);
+    if (eos || (!has_data && local_state._shared_state->_data_queue->is_all_finish())) {
+        source_state = SourceState::FINISHED; // LIMIT hit, or every child done and queue drained
+    } else if (has_data) {
+        source_state = SourceState::MORE_DATA;
+    } else {
+        source_state = SourceState::DEPEND_ON_SOURCE;
+    }
+    return Status::OK();
+}
+
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 8bd2f484f3..580a7d4a15 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -22,6 +22,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/exec/vunion_node.h"
 
 namespace doris {
@@ -67,6 +68,46 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
     bool _need_read_for_const_expr;
 };
+class UnionSourceOperatorX;
+class UnionSourceLocalState final : public PipelineXLocalState<UnionDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(UnionSourceLocalState);
+    using Base = PipelineXLocalState<UnionDependency>;
+    using Parent = UnionSourceOperatorX;
+    UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {};
+    // init() creates the DataQueue that the union sinks push into.
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    friend class UnionSourceOperatorX;
+    bool _need_read_for_const_expr {false}; // NOTE(review): not read by UnionSourceOperatorX
+};
+
+class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
+public:
+    using Base = OperatorX<UnionSourceLocalState>;
+    UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+            : Base(pool, tnode, descs), _child_size(tnode.num_children) {}
+    ~UnionSourceOperatorX() override = default;
+    bool can_read(RuntimeState* state) override { // data queued, or all children finished
+        auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>();
+        return _has_data(state) || local_state._shared_state->_data_queue->is_all_finish();
+    }
+    // Pops one queued block and returns its storage to the producing child's free list.
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+
+    bool is_source() const override { return true; }
+
+private:
+    bool _has_data(RuntimeState* state) {
+        auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>();
+        return local_state._shared_state->_data_queue->remaining_has_data();
+    }
+    bool has_more_const(const RuntimeState* state) const { // NOTE(review): not called yet
+        return state->per_fragment_instance_idx() == 0;
+    }
+    friend class UnionSourceLocalState;
+    const int _child_size;
+};
 
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 597337ddfa..41c9299f01 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -157,6 +157,21 @@ private:
     SortSharedState _sort_state;
 };
 
+struct UnionSharedState {
+public:
+    std::shared_ptr<DataQueue> _data_queue; // created by the union source, fed by the sinks
+};
+
+class UnionDependency final : public Dependency {
+public:
+    using SharedState = UnionSharedState;
+    UnionDependency(int id) : Dependency(id, "UnionDependency") {}
+    ~UnionDependency() override = default;
+    void* shared_state() override { return &_union_state; } // type-erased UnionSharedState*
+
+private:
+    UnionSharedState _union_state;
+};
 struct AnalyticSharedState {
 public:
     AnalyticSharedState() = default;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 14b7fbfd21..328dd43cb0 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -39,6 +39,8 @@
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/exec/streaming_aggregation_sink_operator.h"
 #include "pipeline/exec/streaming_aggregation_source_operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/exec/union_source_operator.h"
 #include "util/debug_util.h"
 
 namespace doris::pipeline {
@@ -268,7 +270,7 @@ Status 
DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state,
 
 template <typename LocalStateType>
 void DataSinkOperatorX<LocalStateType>::get_dependency(DependencySPtr& dependency) {
-    dependency.reset(new typename LocalStateType::Dependency(id()));
+    dependency.reset(new typename LocalStateType::Dependency(dest_id())); // id() unless 2-arg ctor
 }
 
 template <typename LocalStateType>
@@ -327,6 +329,7 @@ DECLARE_OPERATOR_X(BlockingAggSinkLocalState)
 DECLARE_OPERATOR_X(StreamingAggSinkLocalState)
 DECLARE_OPERATOR_X(ExchangeSinkLocalState)
 DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState)
+DECLARE_OPERATOR_X(UnionSinkLocalState)
 
 #undef DECLARE_OPERATOR_X
 
@@ -341,6 +344,7 @@ DECLARE_OPERATOR_X(RepeatLocalState)
 DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
 DECLARE_OPERATOR_X(AssertNumRowsLocalState)
 DECLARE_OPERATOR_X(EmptySetLocalState)
+DECLARE_OPERATOR_X(UnionSourceLocalState)
 
 #undef DECLARE_OPERATOR_X
 
@@ -357,6 +361,7 @@ template class 
PipelineXSinkLocalState<NestedLoopJoinDependency>;
 template class PipelineXSinkLocalState<AnalyticDependency>;
 template class PipelineXSinkLocalState<AggDependency>;
 template class PipelineXSinkLocalState<FakeDependency>;
+template class PipelineXSinkLocalState<UnionDependency>;
 
 template class PipelineXLocalState<HashJoinDependency>;
 template class PipelineXLocalState<SortDependency>;
@@ -364,5 +369,6 @@ template class 
PipelineXLocalState<NestedLoopJoinDependency>;
 template class PipelineXLocalState<AnalyticDependency>;
 template class PipelineXLocalState<AggDependency>;
 template class PipelineXLocalState<FakeDependency>;
+template class PipelineXLocalState<UnionDependency>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 12b3d371a4..e8a129795b 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -398,7 +398,10 @@ protected:
 
 class DataSinkOperatorXBase : public OperatorBase {
 public:
-    DataSinkOperatorXBase(const int id) : OperatorBase(nullptr), _id(id) {}
+    DataSinkOperatorXBase(const int id) : OperatorBase(nullptr), _id(id), _dest_id(id) {}
+    // dest_id is what DataSinkOperatorX::get_dependency() keys on; the one-arg ctor reuses id.
+    DataSinkOperatorXBase(const int id, const int dest_id)
+            : OperatorBase(nullptr), _id(id), _dest_id(dest_id) {}
 
     virtual ~DataSinkOperatorXBase() override = default;
 
@@ -465,6 +468,8 @@ public:
 
     [[nodiscard]] int id() const override { return _id; }
 
+    [[nodiscard]] int dest_id() const { return _dest_id; } // id used to key the dependency
+
     [[nodiscard]] std::string get_name() const override { return _name; }
 
     Status finalize(RuntimeState* state) override { return Status::OK(); }
@@ -473,6 +478,7 @@ public:
 
 protected:
     const int _id;
+    const int _dest_id; // equals _id unless the two-arg constructor was used
     std::string _name;
 
     // Maybe this will be transferred to BufferControlBlock.
@@ -488,7 +494,8 @@ class DataSinkOperatorX : public DataSinkOperatorXBase {
 public:
     DataSinkOperatorX(const int id) : DataSinkOperatorXBase(id) {}
 
-    virtual ~DataSinkOperatorX() override = default;
+    DataSinkOperatorX(const int id, const int dest_id) : DataSinkOperatorXBase(id, dest_id) {}
+    ~DataSinkOperatorX() override = default;
 
     Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) 
override;
 
@@ -501,7 +508,7 @@ public:
     using Dependency = DependencyType;
     PipelineXSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : PipelineXSinkLocalStateBase(parent, state) {}
-    virtual ~PipelineXSinkLocalState() {}
+    ~PipelineXSinkLocalState() override = default; // base destructor is virtual
 
     virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) 
override {
         _dependency = (DependencyType*)info.dependency;
@@ -514,7 +521,7 @@ public:
         return Status::OK();
     }
 
-    virtual Status close(RuntimeState* state) override {
+    Status close(RuntimeState* state) override {
         if (_closed) {
             return Status::OK();
         }
@@ -522,7 +529,7 @@ public:
         return Status::OK();
     }
 
-    virtual std::string debug_string(int indentation_level) const override;
+    std::string debug_string(int indentation_level) const override;
 
 protected:
     DependencyType* _dependency;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 8faa2a76b8..03af3aabdd 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -64,6 +64,8 @@
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/exec/streaming_aggregation_sink_operator.h"
 #include "pipeline/exec/streaming_aggregation_source_operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/exec/union_source_operator.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -296,7 +298,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         }
 
         _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
-
         std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto task = std::make_unique<PipelineXTask>(_pipelines[pip_idx], 
_total_tasks++,
@@ -360,6 +361,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         }
     }
     _build_side_pipelines.clear();
+    _union_child_pipelines.clear(); // build-time bookkeeping, dropped like _build_side_pipelines
     _dag.clear();
     // register the profile of child data stream sender
     //    for (auto& sender : _multi_cast_stream_sink_senders) {
@@ -504,6 +506,9 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     if (_build_side_pipelines.find(parent_idx) != _build_side_pipelines.end() 
&& child_idx > 0) {
         cur_pipe = _build_side_pipelines[parent_idx];
     }
+    if (auto it = _union_child_pipelines.find(parent_idx); it != _union_child_pipelines.end()) {
+        cur_pipe = it->second[child_idx]; // child i of a union goes to its i-th sink pipeline
+    }
     std::stringstream error_msg;
     switch (tnode.node_type) {
     case TPlanNodeType::OLAP_SCAN_NODE: {
@@ -588,6 +593,32 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _build_side_pipelines.insert({sink->id(), build_side_pipe});
         break;
     }
+    case TPlanNodeType::UNION_NODE: {
+        int child_count = tnode.num_children;
+        op.reset(new UnionSourceOperatorX(pool, tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        // Add one pipeline per child, each ending in a UnionSinkOperatorX, under cur_pipe in _dag.
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
+        int father_id = tnode.node_id; // key of _union_child_pipelines (also each sink's dest_id)
+        for (int i = 0; i < child_count; i++) {
+            PipelinePtr build_side_pipe = add_pipeline();
+            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
+            DataSinkOperatorXPtr sink; // NOTE(review): assumes id node_id+1000*(i+1) is unused
+            sink.reset(new UnionSinkOperatorX(i, father_id + 1000 * (i + 1), pool, tnode, descs));
+            RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+            RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
+            if (_union_child_pipelines.find(father_id) == _union_child_pipelines.end()) {
+                _union_child_pipelines.insert({father_id, {build_side_pipe}});
+            } else {
+                _union_child_pipelines[father_id].push_back(build_side_pipe);
+            }
+        }
+        // Child i's operators are routed to _union_child_pipelines[father_id][i] (lookup above).
+        break;
+    }
     case TPlanNodeType::SORT_NODE: {
         op.reset(new SortSourceOperatorX(pool, tnode, descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index a0970b166e..86796fe8e0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -156,6 +156,7 @@ private:
 
     std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
     std::mutex _state_map_lock;
+    std::map<int, std::vector<PipelinePtr>> _union_child_pipelines; // union node id -> per-child
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index c9e1103a58..7e2458b12a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -104,7 +104,11 @@ public:
 
     DependencySPtr& get_downstream_dependency() { return 
_downstream_dependency; }
     void set_upstream_dependency(DependencySPtr& upstream_dependency) {
-        _upstream_dependency.insert({upstream_dependency->id(), upstream_dependency});
+        const auto dep_id = upstream_dependency->id();
+        auto [it, inserted] = _upstream_dependency.try_emplace(dep_id, upstream_dependency);
+        if (!inserted) { // first dependency registered for dep_id wins; hand it back to caller
+            upstream_dependency = it->second;
+        }
     }
 
     Dependency* get_upstream_dependency(int id) {
diff --git a/regression-test/data/pipelineX/test_union_operator.out 
b/regression-test/data/pipelineX/test_union_operator.out
new file mode 100644
index 0000000000..439b88e790
--- /dev/null
+++ b/regression-test/data/pipelineX/test_union_operator.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !pipeline --
+20
+
+-- !pipeline --
+21
+
+-- !pipelineX --
+20
+
+-- !pipelineX --
+21
+
diff --git a/regression-test/suites/pipelineX/test_union_operator.groovy 
b/regression-test/suites/pipelineX/test_union_operator.groovy
new file mode 100644
index 0000000000..59448c35ec
--- /dev/null
+++ b/regression-test/suites/pipelineX/test_union_operator.groovy
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_union_operator") {
+    sql """ DROP TABLE IF EXISTS UNIONNODE """
+    sql """
+       CREATE TABLE IF NOT EXISTS UNIONNODE (
+              `k1` INT(11) NULL COMMENT "",
+              `k2` INT(11) NULL COMMENT "",
+              `k3` INT(11) NULL COMMENT ""
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "storage_format" = "V2"
+    );
+    """
+    sql """ set forbid_unknown_col_stats = false """
+    sql """
+    INSERT INTO UNIONNODE (k1, k2, k3) VALUES
+    (5, 10, 15),
+    (8, 12, 6),
+    (3, 7, 11),
+    (9, 4, 14),
+    (2, 13, 1),
+    (6, 20, 16),
+    (11, 17, 19),
+    (7, 18, 8),
+    (12, 9, 2),
+    (4, 15, 10),
+    (16, 3, 13),
+    (10, 1, 7),
+    (14, 5, 12),
+    (19, 6, 4),
+    (1, 2, 18),
+    (13, 11, 3),
+    (18, 8, 5),
+    (15, 19, 9),
+    (17, 14, 17),
+    (20, 16, 45);
+    """
+
+    sql"""set enable_pipeline_engine = true,parallel_pipeline_task_num = 8; """
+
+
+    // Reference results with the classic pipeline engine.
+    qt_pipeline """
+        SELECT count(*)
+        FROM (
+            SELECT k1 FROM UNIONNODE
+            UNION 
+            SELECT k2 FROM UNIONNODE
+        ) AS merged_result;
+    """
+    qt_pipeline """
+        SELECT count(*)
+        FROM (
+            SELECT k1 FROM UNIONNODE
+            UNION 
+            SELECT k2 FROM UNIONNODE
+            UNION 
+            SELECT k3 FROM UNIONNODE
+        ) AS merged_result;
+
+
+    """
+    // Same queries on pipelineX; the .out file expects identical counts.
+    sql"""set experimental_enable_pipeline_x_engine=true,parallel_pipeline_task_num = 8; """
+
+    qt_pipelineX """
+        SELECT count(*)
+        FROM (
+            SELECT k1 FROM UNIONNODE
+            UNION 
+            SELECT k2 FROM UNIONNODE
+        ) AS merged_result;
+    """
+    qt_pipelineX """
+        SELECT count(*)
+        FROM (
+            SELECT k1 FROM UNIONNODE
+            UNION 
+            SELECT k2 FROM UNIONNODE
+            UNION 
+            SELECT k3 FROM UNIONNODE
+        ) AS merged_result;
+    """
+
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to