This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 07dd6830e8 [pipelineX](refactor) add union node in pipelineX (#24286) 07dd6830e8 is described below commit 07dd6830e82a0c9b39d6efc06d1aeb25900a843b Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Wed Sep 13 20:39:58 2023 +0800 [pipelineX](refactor) add union node in pipelineX (#24286) --- be/src/pipeline/exec/union_sink_operator.cpp | 90 ++++++++++++++++++ be/src/pipeline/exec/union_sink_operator.h | 103 ++++++++++++++++++++ be/src/pipeline/exec/union_source_operator.cpp | 35 +++++++ be/src/pipeline/exec/union_source_operator.h | 41 ++++++++ be/src/pipeline/pipeline_x/dependency.h | 15 +++ be/src/pipeline/pipeline_x/operator.cpp | 8 +- be/src/pipeline/pipeline_x/operator.h | 17 +++- .../pipeline_x/pipeline_x_fragment_context.cpp | 33 ++++++- .../pipeline_x/pipeline_x_fragment_context.h | 1 + be/src/pipeline/pipeline_x/pipeline_x_task.h | 6 +- .../data/pipelineX/test_union_operator.out | 13 +++ .../suites/pipelineX/test_union_operator.groovy | 105 +++++++++++++++++++++ 12 files changed, 459 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 9296df78d9..c1fd75d820 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -94,4 +94,94 @@ Status UnionSinkOperator::close(RuntimeState* state) { return StreamingOperator::close(state); } +Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + return Status::OK(); +}; + +UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, + const TPlanNode& tnode, const DescriptorTbl& descs) + : Base(sink_id, tnode.node_id), + _first_materialized_child_idx(tnode.union_node.first_materialized_child_idx), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), + _cur_child_id(child_id), + _child_size(tnode.num_children) {} + +Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); + DCHECK(tnode.__isset.union_node); + { + // Create result_expr_ctx_lists_ from thrift exprs. + auto& result_texpr_lists = tnode.union_node.result_expr_lists; + auto& texprs = result_texpr_lists[_cur_child_id]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + return Status::OK(); +} + +Status UnionSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child_x->row_desc())); + return Status::OK(); +} + +Status UnionSinkOperatorX::open(RuntimeState* state) { + // open const expr lists. + RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state)); + + // open result expr lists. + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + + return Status::OK(); +} + +Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) { + auto& local_state = state->get_sink_local_state(id())->cast<UnionSinkLocalState>(); + if (local_state._output_block == nullptr) { + local_state._output_block = + local_state._shared_state->_data_queue->get_free_block(_cur_child_id); + } + if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through + if (in_block->rows() > 0) { + local_state._output_block->swap(*in_block); + local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block), + _cur_child_id); + } + } else if (_get_first_materialized_child_idx() != children_count() && + _cur_child_id < children_count()) { //need materialized + RETURN_IF_ERROR(materialize_child_block(state, _cur_child_id, in_block, + local_state._output_block.get())); + } else { + return Status::InternalError("maybe can't reach here, execute const expr: {}, {}, {}", + _cur_child_id, _get_first_materialized_child_idx(), + children_count()); + } + if (UNLIKELY(source_state == SourceState::FINISHED)) { + //if _cur_child_id eos, need check to push block + //Now here can't check _output_block rows, even it's row==0, also need push block + //because maybe sink is eos and queue have none data, if not push block + //the source can't can_read again and can't set source finished + if (local_state._output_block) { + local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block), + _cur_child_id); + } + + local_state._shared_state->_data_queue->set_finish(_cur_child_id); + return Status::OK(); + } + // not eos and block rows is enough to output,so push block + if (local_state._output_block && (local_state._output_block->rows() >= state->batch_size())) { + local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block), + _cur_child_id); + } + return Status::OK(); +} + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 0da75147cc..2bbfab5cfc 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/core/block.h" #include "vec/exec/vunion_node.h" @@ -64,5 +65,107 @@ private: std::shared_ptr<DataQueue> _data_queue; std::unique_ptr<vectorized::Block> _output_block; }; + +class UnionSinkOperatorX; +class UnionSinkLocalState final : public PipelineXSinkLocalState<UnionDependency> { +public: + ENABLE_FACTORY_CREATOR(UnionSinkLocalState); + UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state), _child_row_idx(0) {} + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + friend class UnionSinkOperatorX; + using Base = PipelineXSinkLocalState<UnionDependency>; + using Parent = UnionSinkOperatorX; + +private: + std::unique_ptr<vectorized::Block> _output_block; + + /// Const exprs materialized by this node. These exprs don't refer to any children. + /// Only materialized by the first fragment instance to avoid duplication. + vectorized::VExprContextSPtrs _const_expr; + + /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. + vectorized::VExprContextSPtrs _child_expr; + + /// Index of current row in child_row_block_. + int _child_row_idx; +}; + +class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> { +public: + using Base = DataSinkOperatorX<UnionSinkLocalState>; + + friend class UnionSinkLocalState; + UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + ~UnionSinkOperatorX() override = default; + Status init(const TDataSink& tsink) override { + return Status::InternalError("{} should not init with TDataSink"); + } + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + + bool can_write(RuntimeState* state) override { return true; } + +private: + int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } + + /// Const exprs materialized by this node. These exprs don't refer to any children. + /// Only materialized by the first fragment instance to avoid duplication. + vectorized::VExprContextSPtrs _const_expr; + + /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. + vectorized::VExprContextSPtrs _child_expr; + /// Index of the first non-passthrough child; i.e. a child that needs materialization. + /// 0 when all children are materialized, '_children.size()' when no children are + /// materialized. + const int _first_materialized_child_idx; + + const RowDescriptor _row_descriptor; + const int _cur_child_id; + const int _child_size; + int children_count() const { return _child_size; } + bool is_child_passthrough(int child_idx) const { + DCHECK_LT(child_idx, _child_size); + return child_idx < _first_materialized_child_idx; + } + Status materialize_child_block(RuntimeState* state, int child_id, + vectorized::Block* input_block, + vectorized::Block* output_block) { + DCHECK_LT(child_id, _child_size); + DCHECK(!is_child_passthrough(child_id)); + if (input_block->rows() > 0) { + vectorized::MutableBlock mblock = + vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, + _row_descriptor); + vectorized::Block res; + RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res)); + RETURN_IF_ERROR(mblock.merge(res)); + } + return Status::OK(); + } + + Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx, + vectorized::Block* res_block) { + auto& local_state = state->get_sink_local_state(id())->cast<UnionSinkLocalState>(); + const auto& child_exprs = local_state._child_expr; + vectorized::ColumnsWithTypeAndName colunms; + for (size_t i = 0; i < child_exprs.size(); ++i) { + int result_column_id = -1; + RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id)); + colunms.emplace_back(src_block->get_by_position(result_column_id)); + } + local_state._child_row_idx += src_block->rows(); + *res_block = {colunms}; + return Status::OK(); + } +}; + } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index f39e0a582b..c1fdae4851 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -97,5 +97,40 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl return Status::OK(); } + +Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + auto& p = _parent->cast<Parent>(); + std::shared_ptr<DataQueue> data_queue = std::make_shared<DataQueue>(p._child_size); + _shared_state->_data_queue.swap(data_queue); + return Status::OK(); +} + +Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>(); + bool eos = false; + std::unique_ptr<vectorized::Block> output_block = vectorized::Block::create_unique(); + int child_idx = 0; + local_state._shared_state->_data_queue->get_block_from_queue(&output_block, &child_idx); + if (!output_block) { + return Status::OK(); + } + block->swap(*output_block); + output_block->clear_column_data(row_desc().num_materialized_slots()); + local_state._shared_state->_data_queue->push_free_block(std::move(output_block), child_idx); + + local_state.reached_limit(block, &eos); + //have exectue const expr, queue have no data any more, and child could be colsed + if ((!_has_data(state) && local_state._shared_state->_data_queue->is_all_finish())) { + source_state = SourceState::FINISHED; + } else if (_has_data(state)) { + source_state = SourceState::MORE_DATA; + } else { + source_state = SourceState::DEPEND_ON_SOURCE; + } + return Status::OK(); +} + } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 8bd2f484f3..580a7d4a15 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/exec/vunion_node.h" namespace doris { @@ -67,6 +68,46 @@ private: std::shared_ptr<DataQueue> _data_queue; bool _need_read_for_const_expr; }; +class UnionSourceOperatorX; +class UnionSourceLocalState final : public PipelineXLocalState<UnionDependency> { +public: + ENABLE_FACTORY_CREATOR(UnionSourceLocalState); + using Base = PipelineXLocalState<UnionDependency>; + using Parent = UnionSourceOperatorX; + UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + friend class UnionSourceOperatorX; + bool _need_read_for_const_expr {false}; +}; + +class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> { +public: + using Base = OperatorX<UnionSourceLocalState>; + UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : Base(pool, tnode, descs), _child_size(tnode.num_children) {}; + ~UnionSourceOperatorX() override = default; + bool can_read(RuntimeState* state) override { + auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>(); + return local_state._shared_state->_data_queue->is_all_finish(); + } + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + bool is_source() const override { return true; } + +private: + bool _has_data(RuntimeState* state) { + auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>(); + return local_state._shared_state->_data_queue->remaining_has_data(); + } + bool has_more_const(const RuntimeState* state) const { + return state->per_fragment_instance_idx() == 0; + } + friend class UnionSourceLocalState; + const int _child_size; +}; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 597337ddfa..41c9299f01 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -157,6 +157,21 @@ private: SortSharedState _sort_state; }; +struct UnionSharedState { +public: + std::shared_ptr<DataQueue> _data_queue; +}; + +class UnionDependency final : public Dependency { +public: + using SharedState = UnionSharedState; + UnionDependency(int id) : Dependency(id, "UnionDependency") {} + ~UnionDependency() override = default; + void* shared_state() override { return (void*)&_union_state; }; + +private: + UnionSharedState _union_state; +}; struct AnalyticSharedState { public: AnalyticSharedState() = default; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 14b7fbfd21..328dd43cb0 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -39,6 +39,8 @@ #include "pipeline/exec/sort_source_operator.h" #include "pipeline/exec/streaming_aggregation_sink_operator.h" #include "pipeline/exec/streaming_aggregation_source_operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/exec/union_source_operator.h" #include "util/debug_util.h" namespace doris::pipeline { @@ -268,7 +270,7 @@ Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state, template <typename LocalStateType> void DataSinkOperatorX<LocalStateType>::get_dependency(DependencySPtr& dependency) { - dependency.reset(new typename LocalStateType::Dependency(id())); + dependency.reset(new typename LocalStateType::Dependency(dest_id())); } template <typename LocalStateType> @@ -327,6 +329,7 @@ DECLARE_OPERATOR_X(BlockingAggSinkLocalState) DECLARE_OPERATOR_X(StreamingAggSinkLocalState) DECLARE_OPERATOR_X(ExchangeSinkLocalState) DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState) +DECLARE_OPERATOR_X(UnionSinkLocalState) #undef DECLARE_OPERATOR_X @@ -341,6 +344,7 @@ DECLARE_OPERATOR_X(RepeatLocalState) DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState) DECLARE_OPERATOR_X(AssertNumRowsLocalState) DECLARE_OPERATOR_X(EmptySetLocalState) +DECLARE_OPERATOR_X(UnionSourceLocalState) #undef DECLARE_OPERATOR_X @@ -357,6 +361,7 @@ template class PipelineXSinkLocalState<NestedLoopJoinDependency>; template class PipelineXSinkLocalState<AnalyticDependency>; template class PipelineXSinkLocalState<AggDependency>; template class PipelineXSinkLocalState<FakeDependency>; +template class PipelineXSinkLocalState<UnionDependency>; template class PipelineXLocalState<HashJoinDependency>; template class PipelineXLocalState<SortDependency>; @@ -364,5 +369,6 @@ template class PipelineXLocalState<NestedLoopJoinDependency>; template class PipelineXLocalState<AnalyticDependency>; template class PipelineXLocalState<AggDependency>; template class PipelineXLocalState<FakeDependency>; +template class PipelineXLocalState<UnionDependency>; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 12b3d371a4..e8a129795b 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -398,7 +398,10 @@ protected: class DataSinkOperatorXBase : public OperatorBase { public: - DataSinkOperatorXBase(const int id) : OperatorBase(nullptr), _id(id) {} + DataSinkOperatorXBase(const int id) : OperatorBase(nullptr), _id(id), _dest_id(id) {} + + DataSinkOperatorXBase(const int id, const int dest_id) + : OperatorBase(nullptr), _id(id), _dest_id(dest_id) {} virtual ~DataSinkOperatorXBase() override = default; @@ -465,6 +468,8 @@ public: [[nodiscard]] int id() const override { return _id; } + [[nodiscard]] int dest_id() const { return _dest_id; } + [[nodiscard]] std::string get_name() const override { return _name; } Status finalize(RuntimeState* state) override { return Status::OK(); } @@ -473,6 +478,7 @@ public: protected: const int _id; + const int _dest_id; std::string _name; // Maybe this will be transferred to BufferControlBlock. @@ -488,7 +494,8 @@ class DataSinkOperatorX : public DataSinkOperatorXBase { public: DataSinkOperatorX(const int id) : DataSinkOperatorXBase(id) {} - virtual ~DataSinkOperatorX() override = default; + DataSinkOperatorX(const int id, const int source_id) : DataSinkOperatorXBase(id, source_id) {} + ~DataSinkOperatorX() override = default; Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; @@ -501,7 +508,7 @@ public: using Dependency = DependencyType; PipelineXSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : PipelineXSinkLocalStateBase(parent, state) {} - virtual ~PipelineXSinkLocalState() {} + ~PipelineXSinkLocalState() override = default; virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) override { _dependency = (DependencyType*)info.dependency; @@ -514,7 +521,7 @@ public: return Status::OK(); } - virtual Status close(RuntimeState* state) override { + Status close(RuntimeState* state) override { if (_closed) { return Status::OK(); } @@ -522,7 +529,7 @@ public: return Status::OK(); } - virtual std::string debug_string(int indentation_level) const override; + std::string debug_string(int indentation_level) const override; protected: DependencyType* _dependency; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 8faa2a76b8..03af3aabdd 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -64,6 +64,8 @@ #include "pipeline/exec/sort_source_operator.h" #include "pipeline/exec/streaming_aggregation_sink_operator.h" #include "pipeline/exec/streaming_aggregation_source_operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/exec/union_source_operator.h" #include "pipeline/task_scheduler.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -296,7 +298,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( } _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl); - std::map<PipelineId, PipelineXTask*> pipeline_id_to_task; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto task = std::make_unique<PipelineXTask>(_pipelines[pip_idx], _total_tasks++, @@ -360,6 +361,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( } } _build_side_pipelines.clear(); + _union_child_pipelines.clear(); _dag.clear(); // register the profile of child data stream sender // for (auto& sender : _multi_cast_stream_sink_senders) { @@ -504,6 +506,9 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_build_side_pipelines.find(parent_idx) != _build_side_pipelines.end() && child_idx > 0) { cur_pipe = _build_side_pipelines[parent_idx]; } + if (_union_child_pipelines.find(parent_idx) != _union_child_pipelines.end()) { + cur_pipe = _union_child_pipelines[parent_idx][child_idx]; + } std::stringstream error_msg; switch (tnode.node_type) { case TPlanNodeType::OLAP_SCAN_NODE: { @@ -588,6 +593,32 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _build_side_pipelines.insert({sink->id(), build_side_pipe}); break; } + case TPlanNodeType::UNION_NODE: { + int child_count = tnode.num_children; + op.reset(new UnionSourceOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (_dag.find(downstream_pipeline_id) == _dag.end()) { + _dag.insert({downstream_pipeline_id, {}}); + } + int father_id = tnode.node_id; + for (int i = 0; i < child_count; i++) { + PipelinePtr build_side_pipe = add_pipeline(); + _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); + DataSinkOperatorXPtr sink; + sink.reset(new UnionSinkOperatorX(i, father_id + 1000 * (i + 1), pool, tnode, descs)); + RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); + RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); + if (_union_child_pipelines.find(father_id) == _union_child_pipelines.end()) { + _union_child_pipelines.insert({father_id, {build_side_pipe}}); + } else { + _union_child_pipelines[father_id].push_back(build_side_pipe); + } + } + + break; + } case TPlanNodeType::SORT_NODE: { op.reset(new SortSourceOperatorX(pool, tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index a0970b166e..86796fe8e0 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -156,6 +156,7 @@ private: std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state; std::mutex _state_map_lock; + std::map<int, std::vector<PipelinePtr>> _union_child_pipelines; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index c9e1103a58..7e2458b12a 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -104,7 +104,11 @@ public: DependencySPtr& get_downstream_dependency() { return _downstream_dependency; } void set_upstream_dependency(DependencySPtr& upstream_dependency) { - _upstream_dependency.insert({upstream_dependency->id(), upstream_dependency}); + if (_upstream_dependency.contains(upstream_dependency->id())) { + upstream_dependency = _upstream_dependency[upstream_dependency->id()]; + } else { + _upstream_dependency.insert({upstream_dependency->id(), upstream_dependency}); + } } Dependency* get_upstream_dependency(int id) { diff --git a/regression-test/data/pipelineX/test_union_operator.out b/regression-test/data/pipelineX/test_union_operator.out new file mode 100644 index 0000000000..439b88e790 --- /dev/null +++ b/regression-test/data/pipelineX/test_union_operator.out @@ -0,0 +1,13 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !pipeline -- +20 + +-- !pipeline -- +21 + +-- !pipelineX -- +20 + +-- !pipelineX -- +21 + diff --git a/regression-test/suites/pipelineX/test_union_operator.groovy b/regression-test/suites/pipelineX/test_union_operator.groovy new file mode 100644 index 0000000000..59448c35ec --- /dev/null +++ b/regression-test/suites/pipelineX/test_union_operator.groovy @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_union_operator") { + sql """ DROP TABLE IF EXISTS UNIONNODE """ + sql """ + CREATE TABLE IF NOT EXISTS UNIONNODE ( + `k1` INT(11) NULL COMMENT "", + `k2` INT(11) NULL COMMENT "", + `k3` INT(11) NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2" + ); + """ + sql """ set forbid_unknown_col_stats = false """ + sql """ + INSERT INTO UNIONNODE (k1, k2, k3) VALUES + (5, 10, 15), + (8, 12, 6), + (3, 7, 11), + (9, 4, 14), + (2, 13, 1), + (6, 20, 16), + (11, 17, 19), + (7, 18, 8), + (12, 9, 2), + (4, 15, 10), + (16, 3, 13), + (10, 1, 7), + (14, 5, 12), + (19, 6, 4), + (1, 2, 18), + (13, 11, 3), + (18, 8, 5), + (15, 19, 9), + (17, 14, 17), + (20, 16, 45); + """ + + sql"""set enable_pipeline_engine = true,parallel_pipeline_task_num = 8; """ + + + + qt_pipeline """ + SELECT count(*) + FROM ( + SELECT k1 FROM UNIONNODE + UNION + SELECT k2 FROM UNIONNODE + ) AS merged_result; + """ + qt_pipeline """ + SELECT count(*) + FROM ( + SELECT k1 FROM UNIONNODE + UNION + SELECT k2 FROM UNIONNODE + UNION + SELECT k3 FROM UNIONNODE + ) AS merged_result; + + + """ + + sql"""set experimental_enable_pipeline_x_engine=true,parallel_pipeline_task_num = 8;; """ + + qt_pipelineX """ + SELECT count(*) + FROM ( + SELECT k1 FROM UNIONNODE + UNION + SELECT k2 FROM UNIONNODE + ) AS merged_result; + """ + qt_pipelineX """ + SELECT count(*) + FROM ( + SELECT k1 FROM UNIONNODE + UNION + SELECT k2 FROM UNIONNODE + UNION + SELECT k3 FROM UNIONNODE + ) AS merged_result; + """ + + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org