This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bf737b12530 [Improvement](local shuffle) Improve local shuffle strategy (#41789) bf737b12530 is described below commit bf737b12530301cd3e2dd6d50918cc3051e76a7c Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Oct 25 10:20:23 2024 +0800 [Improvement](local shuffle) Improve local shuffle strategy (#41789) Add local shuffle to unpartitioned fragment to add parallel for perfomance ```sql SELECT h1.UserID, h2.URL, COUNT(*) AS visit_count FROM ( SELECT * FROM hits_10m LIMIT 5000 ) AS h1 CROSS JOIN ( SELECT * FROM hits_10m LIMIT 5000 ) AS h2 GROUP BY h1.UserID, h2.URL ORDER BY visit_count DESC LIMIT 1000 ``` Add a rule to apply local exchanger: ``` ┌───────────────────────┐ ┌───────────────────────┐ │ │ │ │ │Exchange(UNPARTITIONED)│ │Exchange(UNPARTITIONED)│ │ │ │ │ └───────────────────────┴──────┬────────┴───────────────────────┘ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ CROSS JOIN │ │ │ └──────┬──────┘ │ │ │ ┌──────────────────▼─────────────────────┐ │ │ │ LOCAL EXCHANGE (HASH PARTITION) 1 -> n │ │ │ └──────────────────┬─────────────────────┘ │ │ │ │ ▼ ┌──▼────┐ │ │ │ AGG │ │ │ └───────┘ ``` before: 1 min 17.79 sec after: 16.73 sec --- be/src/pipeline/dependency.h | 8 +- be/src/pipeline/exec/aggregation_sink_operator.cpp | 5 +- be/src/pipeline/exec/aggregation_sink_operator.h | 7 +- .../pipeline/exec/aggregation_source_operator.cpp | 4 +- be/src/pipeline/exec/analytic_sink_operator.cpp | 4 +- be/src/pipeline/exec/analytic_source_operator.cpp | 1 + be/src/pipeline/exec/assert_num_rows_operator.cpp | 1 + .../distinct_streaming_aggregation_operator.cpp | 4 +- .../exec/distinct_streaming_aggregation_operator.h | 4 + be/src/pipeline/exec/exchange_sink_operator.h | 1 + be/src/pipeline/exec/join_build_sink_operator.cpp | 2 + be/src/pipeline/exec/join_probe_operator.cpp | 1 + .../exec/nested_loop_join_probe_operator.h | 4 +- be/src/pipeline/exec/operator.cpp | 9 +- be/src/pipeline/exec/operator.h | 6 +- .../partitioned_aggregation_source_operator.cpp | 4 + .../exec/partitioned_aggregation_source_operator.h | 2 + be/src/pipeline/exec/sort_sink_operator.cpp | 4 +- be/src/pipeline/exec/sort_sink_operator.h | 3 +- be/src/pipeline/exec/sort_source_operator.cpp | 4 +- be/src/pipeline/exec/union_source_operator.h | 4 +- .../local_exchange_sink_operator.cpp | 6 +- .../local_exchange/local_exchange_sink_operator.h | 2 +- be/src/pipeline/pipeline.cpp | 43 +++++++- be/src/pipeline/pipeline.h | 44 ++++---- be/src/pipeline/pipeline_fragment_context.cpp | 120 +++++++++++++-------- be/src/pipeline/pipeline_fragment_context.h | 6 +- .../org/apache/doris/planner/AggregationNode.java | 5 + .../org/apache/doris/planner/AnalyticEvalNode.java | 5 + .../apache/doris/planner/AssertNumRowsNode.java | 5 + .../org/apache/doris/planner/DataPartition.java | 4 + .../org/apache/doris/planner/EmptySetNode.java | 4 + .../org/apache/doris/planner/ExchangeNode.java | 10 ++ .../org/apache/doris/planner/JoinNodeBase.java | 1 - .../apache/doris/planner/NestedLoopJoinNode.java | 17 +++ .../org/apache/doris/planner/PlanFragment.java | 35 ++++++ .../java/org/apache/doris/planner/PlanNode.java | 10 ++ .../java/org/apache/doris/planner/RepeatNode.java | 5 + .../java/org/apache/doris/planner/ScanNode.java | 5 + .../java/org/apache/doris/planner/SelectNode.java | 5 + .../java/org/apache/doris/planner/SortNode.java | 5 + .../java/org/apache/doris/planner/UnionNode.java | 5 + .../main/java/org/apache/doris/qe/Coordinator.java | 22 ++++ gensrc/thrift/PlanNodes.thrift | 1 + gensrc/thrift/Planner.thrift | 4 + .../insert_into_table/complex_insert.groovy | 6 +- .../distribute/local_shuffle.groovy | 12 +-- 47 files changed, 361 insertions(+), 108 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 0885dbf380f..8060ee8362d 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -111,19 +111,19 @@ public: // Notify downstream pipeline tasks this dependency is ready. void set_ready(); void set_ready_to_read() { - DCHECK(_shared_state->source_deps.size() == 1) << debug_string(); + DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string(); _shared_state->source_deps.front()->set_ready(); } void set_block_to_read() { - DCHECK(_shared_state->source_deps.size() == 1) << debug_string(); + DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string(); _shared_state->source_deps.front()->block(); } void set_ready_to_write() { - DCHECK(_shared_state->sink_deps.size() == 1) << debug_string(); + DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string(); _shared_state->sink_deps.front()->set_ready(); } void set_block_to_write() { - DCHECK(_shared_state->sink_deps.size() == 1) << debug_string(); + DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string(); _shared_state->sink_deps.front()->block(); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 4007f50f58a..5fb14c02585 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -717,7 +717,10 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla : tnode.agg_node.grouping_exprs), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), _require_bucket_distribution(require_bucket_distribution), - _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), + _without_key(tnode.agg_node.grouping_exprs.empty()) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state)); diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 1f846ec88ff..8271f1451b4 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -143,9 +143,8 @@ public: DataDistribution required_data_distribution() const override { if (_probe_expr_ctxs.empty()) { - return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child - ->ignore_data_distribution() - ? DataDistribution(ExchangeType::PASSTHROUGH) + return _needs_finalize + ? DataDistribution(ExchangeType::NOOP) : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(); } return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator @@ -204,8 +203,8 @@ protected: const std::vector<TExpr> _partition_exprs; const bool _is_colocate; const bool _require_bucket_distribution; - RowDescriptor _agg_fn_output_row_descriptor; + const bool _without_key; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index c68601fcdca..6d4cd291079 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -441,7 +441,9 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : Base(pool, tnode, operator_id, descs), _needs_finalize(tnode.agg_node.need_finalize), - _without_key(tnode.agg_node.grouping_exprs.empty()) {} + _without_key(tnode.agg_node.grouping_exprs.empty()) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 1da5c1f7c35..afe9aeab8fd 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -201,7 +201,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, _require_bucket_distribution(require_bucket_distribution), _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution ? tnode.distribute_expr_lists[0] - : tnode.analytic_node.partition_exprs) {} + : tnode.analytic_node.partition_exprs) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 134a0ad82d7..019f95042c2 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -475,6 +475,7 @@ AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNo _has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE), _has_window_start(tnode.analytic_node.window.__isset.window_start), _has_window_end(tnode.analytic_node.window.__isset.window_end) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; _fn_scope = AnalyticFnScope::PARTITION; if (tnode.analytic_node.__isset.window && tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) { diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index c1a02b6f838..345e42b7d96 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -27,6 +27,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode : StreamingOperatorX<AssertNumRowsLocalState>(pool, tnode, operator_id, descs), _desired_num_rows(tnode.assert_num_rows_node.desired_num_rows), _subquery_string(tnode.assert_num_rows_node.subquery_string) { + _is_serial_operator = true; if (tnode.assert_num_rows_node.__isset.assertion) { _assertion = tnode.assert_num_rows_node.assertion; } else { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index ddec533a9ff..a59af8ce7b4 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -326,7 +326,9 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i ? tnode.distribute_expr_lists[0] : tnode.agg_node.grouping_exprs), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), - _require_bucket_distribution(require_bucket_distribution) { + _require_bucket_distribution(require_bucket_distribution), + _without_key(tnode.agg_node.grouping_exprs.empty()) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 71d289402ec..1f7a21190ad 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -104,6 +104,9 @@ public: bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution() const override { + if (_needs_finalize && _probe_expr_ctxs.empty()) { + return {ExchangeType::NOOP}; + } if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) @@ -136,6 +139,7 @@ private: /// The total size of the row from the aggregate functions. size_t _total_size_of_aggregate_states = 0; bool _is_streaming_preagg = false; + const bool _without_key; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 8af944728a2..689172dfc6b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -224,6 +224,7 @@ public: Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest, int num_receivers = 1); DataDistribution required_data_distribution() const override; + bool is_serial_operator() const override { return true; } private: friend class ExchangeSinkLocalState; diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp index a1f3262d6ed..fc0d3b87460 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.cpp +++ b/be/src/pipeline/exec/join_build_sink_operator.cpp @@ -82,6 +82,8 @@ JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool, _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_mark_join), _runtime_filter_descs(tnode.runtime_filters) { + DataSinkOperatorX<LocalStateType>::_is_serial_operator = + tnode.__isset.is_serial_operator && tnode.is_serial_operator; _init_join_op(); if (_is_mark_join) { DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN || diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 8e5010d7513..76dc75a90d8 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -220,6 +220,7 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T : true) ) { + Base::_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; if (tnode.__isset.hash_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 4121de64210..5b0fec159e2 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -203,7 +203,9 @@ public: } DataDistribution required_data_distribution() const override { - if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN || + _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { return {ExchangeType::NOOP}; } return {ExchangeType::ADAPTIVE_PASSTHROUGH}; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 5a13fdcbd84..6e3099db748 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -141,8 +141,9 @@ std::string PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentatio std::string OperatorXBase::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}", - std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks); + fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}, _is_serial_operator={}", + std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks, + _is_serial_operator); return fmt::to_string(debug_string_buffer); } @@ -363,8 +364,8 @@ void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) std::string DataSinkOperatorXBase::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}{}: id={}", std::string(indentation_level * 2, ' '), - _name, node_id()); + fmt::format_to(debug_string_buffer, "{}{}: id={}, _is_serial_operator={}", + std::string(indentation_level * 2, ' '), _name, node_id(), _is_serial_operator); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index b5bd0fe4713..5df0a19498f 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -101,6 +101,9 @@ public: return Status::OK(); } + // Operators need to be executed serially. (e.g. finalized agg without key) + [[nodiscard]] virtual bool is_serial_operator() const { return _is_serial_operator; } + [[nodiscard]] bool is_closed() const { return _is_closed; } virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; } @@ -122,6 +125,7 @@ protected: bool _is_closed; bool _followed_by_shuffled_operator = false; + bool _is_serial_operator = false; }; class PipelineXLocalStateBase { @@ -444,7 +448,7 @@ public: Status init(const TDataSink& tsink) override; [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets, - const bool is_shuffled_hash_join, + const bool use_global_hash_shuffle, const std::map<int, int>& shuffle_idx_to_instance_idx) { return Status::InternalError("init() is only implemented in local exchange!"); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 48df5587198..655a6e19725 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -118,6 +118,10 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) { return _agg_source_operator->close(state); } +bool PartitionedAggSourceOperatorX::is_serial_operator() const { + return _agg_source_operator->is_serial_operator(); +} + Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index edae99c716a..7e73241745e 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -91,6 +91,8 @@ public: bool is_source() const override { return true; } + bool is_serial_operator() const override; + private: friend class PartitionedAggLocalState; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index ee8689a8084..6d6684437b8 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -90,7 +90,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP : std::vector<TExpr> {}), _algorithm(tnode.sort_node.__isset.algorithm ? tnode.sort_node.algorithm : TSortAlgorithm::FULL_SORT), - _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {} + _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 8462472dd02..0829c38b40f 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -69,8 +69,9 @@ public: } else if (_merge_by_exchange) { // The current sort node is used for the ORDER BY return {ExchangeType::PASSTHROUGH}; + } else { + return {ExchangeType::NOOP}; } - return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution(); } bool require_shuffled_data_distribution() const override { return _is_analytic_sort; } bool require_data_distribution() const override { return _is_colocate; } diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 02a99e183c8..7f801b79c0b 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -30,7 +30,9 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnod const DescriptorTbl& descs) : OperatorX<SortLocalState>(pool, tnode, operator_id, descs), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) {} + _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; +} Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(Base::init(tnode, state)); diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 2d112ebf2df..200e7de8597 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -63,7 +63,9 @@ public: using Base = OperatorX<UnionSourceLocalState>; UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {}; + : Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) { + _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; + } ~UnionSourceOperatorX() override = default; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index d87113ca80a..ff243186c47 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -36,17 +36,17 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const { } Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets, - const bool should_disable_bucket_shuffle, + const bool use_global_hash_shuffle, const std::map<int, int>& shuffle_idx_to_instance_idx) { _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; if (_type == ExchangeType::HASH_SHUFFLE) { - _use_global_shuffle = should_disable_bucket_shuffle; + _use_global_shuffle = use_global_hash_shuffle; // For shuffle join, if data distribution has been broken by previous operator, we // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, // we should use map shuffle idx to instance idx because all instances will be // distributed to all BEs. Otherwise, we should use shuffle idx directly. - if (should_disable_bucket_shuffle) { + if (use_global_hash_shuffle) { std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(), [&](const auto& item) { DCHECK(item.first != -1); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 1cd9736d429..09b1f2cc310 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -100,7 +100,7 @@ public: return Status::InternalError("{} should not init with TPlanNode", Base::_name); } - Status init(ExchangeType type, const int num_buckets, const bool should_disable_bucket_shuffle, + Status init(ExchangeType type, const int num_buckets, const bool use_global_hash_shuffle, const std::map<int, int>& shuffle_idx_to_instance_idx) override; Status open(RuntimeState* state) override; diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 6e83c7805e4..5b93fbdf1f8 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -22,6 +22,7 @@ #include <utility> #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_task.h" namespace doris::pipeline { @@ -31,7 +32,47 @@ void Pipeline::_init_profile() { _pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s)); } -Status Pipeline::add_operator(OperatorPtr& op) { +bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distribution, + const int idx) const { + // If serial operator exists after `idx`-th operator, we should not improve parallelism. + if (std::any_of(_operators.begin() + idx, _operators.end(), + [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) { + return false; + } + if (std::all_of(_operators.begin(), _operators.end(), + [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) { + if (!_sink->is_serial_operator()) { + return true; + } + } else if (std::any_of(_operators.begin(), _operators.end(), + [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) { + return true; + } + + if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && + target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { + return true; + } else if (_operators.front()->ignore_data_hash_distribution()) { + if (_data_distribution.distribution_type == target_data_distribution.distribution_type && + (_data_distribution.partition_exprs.empty() || + target_data_distribution.partition_exprs.empty())) { + return true; + } + return _data_distribution.distribution_type != target_data_distribution.distribution_type && + !(is_hash_exchange(_data_distribution.distribution_type) && + is_hash_exchange(target_data_distribution.distribution_type)); + } else { + return _data_distribution.distribution_type != target_data_distribution.distribution_type && + !(is_hash_exchange(_data_distribution.distribution_type) && + is_hash_exchange(target_data_distribution.distribution_type)); + } +} + +Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) { + if (parallelism > 0 && op->is_serial_operator()) { + set_num_tasks(parallelism); + op->set_ignore_data_distribution(); + } op->set_parallel_tasks(num_tasks()); _operators.emplace_back(op); if (op->is_source()) { diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 8a20ccb631c..ef0ae9e9a75 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -44,14 +44,16 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> { public: explicit Pipeline(PipelineId pipeline_id, int num_tasks, - std::weak_ptr<PipelineFragmentContext> context) - : _pipeline_id(pipeline_id), _num_tasks(num_tasks) { + std::weak_ptr<PipelineFragmentContext> context, int num_tasks_of_parent) + : _pipeline_id(pipeline_id), + _num_tasks(num_tasks), + _num_tasks_of_parent(num_tasks_of_parent) { _init_profile(); _tasks.resize(_num_tasks, nullptr); } // Add operators for pipelineX - Status add_operator(OperatorPtr& op); + Status add_operator(OperatorPtr& op, const int parallelism); // prepare operators for pipelineX Status prepare(RuntimeState* state); @@ -71,28 +73,8 @@ public: return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; } - bool need_to_local_exchange(const DataDistribution target_data_distribution) const { - if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && - target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { - return true; - } else if (_operators.front()->ignore_data_hash_distribution()) { - if (_data_distribution.distribution_type == - target_data_distribution.distribution_type && - (_data_distribution.partition_exprs.empty() || - target_data_distribution.partition_exprs.empty())) { - return true; - } - return _data_distribution.distribution_type != - target_data_distribution.distribution_type && - !(is_hash_exchange(_data_distribution.distribution_type) && - is_hash_exchange(target_data_distribution.distribution_type)); - } else { - return _data_distribution.distribution_type != - target_data_distribution.distribution_type && - !(is_hash_exchange(_data_distribution.distribution_type) && - is_hash_exchange(target_data_distribution.distribution_type)); - } - } + bool need_to_local_exchange(const DataDistribution target_data_distribution, + const int idx) const; void init_data_distribution() { set_data_distribution(_operators.front()->required_data_distribution()); } @@ -120,6 +102,14 @@ public: for (auto& op : _operators) { op->set_parallel_tasks(_num_tasks); } + +#ifndef NDEBUG + if (num_tasks > 1 && + std::any_of(_operators.begin(), _operators.end(), + [&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) { + DCHECK(false) << debug_string(); + } +#endif } int num_tasks() const { return _num_tasks; } bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; } @@ -136,6 +126,8 @@ public: return fmt::to_string(debug_string_buffer); } + int num_tasks_of_parent() const { return _num_tasks_of_parent; } + private: void _init_profile(); @@ -173,6 +165,8 @@ private: std::atomic<int> _num_tasks_running = 0; // Tasks in this pipeline. std::vector<PipelineTask*> _tasks; + // Parallelism of parent pipeline. + const int _num_tasks_of_parent; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 28cfefbf6c1..fd3baefa76f 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -214,8 +214,9 @@ void PipelineFragmentContext::cancel(const Status reason) { PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { PipelineId id = _next_pipeline_id++; auto pipeline = std::make_shared<Pipeline>( - id, _num_instances, - std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this())); + id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances, + std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()), + parent ? parent->num_tasks() : _num_instances); if (idx >= 0) { _pipelines.insert(_pipelines.begin() + idx, pipeline); } else { @@ -235,6 +236,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re if (request.__isset.query_options && request.query_options.__isset.execution_timeout) { _timeout = request.query_options.execution_timeout; } + _use_serial_source = + request.fragment.__isset.use_serial_source && request.fragment.use_serial_source; _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext"); _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime"); @@ -749,13 +752,12 @@ Status PipelineFragmentContext::_add_local_exchange_impl( const bool followed_by_shuffled_operator = operators.size() > idx ? operators[idx]->followed_by_shuffled_operator() : cur_pipe->sink()->followed_by_shuffled_operator(); - const bool should_disable_bucket_shuffle = + const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_operator; + followed_by_shuffled_operator && !_use_serial_source; sink.reset(new LocalExchangeSinkOperatorX( - sink_id, local_exchange_id, - should_disable_bucket_shuffle ? _total_instances : _num_instances, + sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances, data_distribution.partition_exprs, bucket_seq_to_instance_idx)); if (bucket_seq_to_instance_idx.empty() && data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) { @@ -763,8 +765,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink()->init(data_distribution.distribution_type, num_buckets, - should_disable_bucket_shuffle, - shuffle_idx_to_instance_idx)); + use_global_hash_shuffle, shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. std::shared_ptr<LocalExchangeSharedState> shared_state = @@ -775,7 +776,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), - should_disable_bucket_shuffle ? _total_instances : _num_instances, + use_global_hash_shuffle ? _total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? _runtime_state->query_options().local_exchange_free_blocks_limit : 0); @@ -915,11 +916,11 @@ Status PipelineFragmentContext::_add_local_exchange( const std::map<int, int>& bucket_seq_to_instance_idx, const std::map<int, int>& shuffle_idx_to_instance_idx, const bool ignore_data_distribution) { - if (_num_instances <= 1) { + if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) { return Status::OK(); } - if (!cur_pipe->need_to_local_exchange(data_distribution)) { + if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) { return Status::OK(); } *do_local_exchange = true; @@ -1154,7 +1155,8 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline source_op.reset(new MultiCastDataStreamerSourceOperatorX( i, pool, thrift_sink.multi_cast_stream_sink.sinks[i], row_desc, source_id)); - RETURN_IF_ERROR(new_pipeline->add_operator(source_op)); + RETURN_IF_ERROR(new_pipeline->add_operator( + source_op, params.__isset.parallel_instances ? params.parallel_instances : 0)); // 2. create and set sink operator of data stream sender for new pipeline DataSinkOperatorPtr sink_op; @@ -1203,7 +1205,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new OlapScanOperatorX( pool, tnode, next_operator_id(), descs, _num_instances, enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {})); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1216,7 +1219,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _query_ctx->query_mem_tracker->is_group_commit_load = true; #endif op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1226,7 +1230,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case doris::TPlanNodeType::JDBC_SCAN_NODE: { if (config::enable_java_support) { op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); } else { return Status::InternalError( "Jdbc scan node is disabled, you can change be config enable_java_support " @@ -1240,7 +1245,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case doris::TPlanNodeType::FILE_SCAN_NODE: { op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1250,7 +1256,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::ES_SCAN_NODE: case TPlanNodeType::ES_HTTP_SCAN_NODE: { op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1261,7 +1268,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo int num_senders = find_with_default(request.per_exch_num_senders, tnode.node_id, 0); DCHECK_GT(num_senders, 0); op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { op->set_ignore_data_distribution(); cur_pipe->set_num_tasks(request.parallel_instances); @@ -1280,7 +1288,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo auto cache_source_id = next_operator_id(); op.reset(new CacheSourceOperatorX(pool, cache_node_id, cache_source_id, request.fragment.query_cache_param)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1315,7 +1324,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _require_bucket_distribution)); op->set_followed_by_shuffled_operator(false); _require_bucket_distribution = true; - RETURN_IF_ERROR(new_pipe->add_operator(op)); + RETURN_IF_ERROR(new_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); cur_pipe = new_pipe; } else { @@ -1324,7 +1334,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); } } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && @@ -1335,11 +1346,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); - RETURN_IF_ERROR(new_pipe->add_operator(op)); + RETURN_IF_ERROR(new_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); cur_pipe = new_pipe; } else { op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); } } else { // create new pipeline to add query cache operator @@ -1355,10 +1368,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } if (enable_query_cache) { RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); - RETURN_IF_ERROR(new_pipe->add_operator(op)); + RETURN_IF_ERROR(new_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); cur_pipe = new_pipe; } else { - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); } const auto downstream_pipeline_id = cur_pipe->id(); @@ -1406,7 +1421,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo pool, tnode_, next_operator_id(), descs, partition_count); probe_operator->set_inner_operators(inner_sink_operator, inner_probe_operator); op = std::move(probe_operator); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1430,7 +1446,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1457,7 +1474,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::CROSS_JOIN_NODE: { op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1480,7 +1498,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); op->set_followed_by_shuffled_operator(_require_bucket_distribution); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1508,7 +1527,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else { op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), descs)); } - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1535,7 +1555,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case doris::TPlanNodeType::PARTITION_SORT_NODE: { op.reset(new PartitionSortSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1553,7 +1574,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::ANALYTIC_EVAL_NODE: { op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -1575,39 +1597,44 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>( - pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request)); op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>( - pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request)); op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: { op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::TABLE_FUNCTION_NODE: { op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::ASSERT_NUM_ROWS_NODE: { op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::EMPTY_SET_NODE: { op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::DATA_GEN_SCAN_NODE: { op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); if (request.__isset.parallel_instances) { cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); @@ -1616,17 +1643,20 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::SCHEMA_SCAN_NODE: { op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::META_SCAN_NODE: { op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } case TPlanNodeType::SELECT_NODE: { op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); break; } default: @@ -1642,9 +1672,11 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo template <bool is_intersect> Status PipelineFragmentContext::_build_operators_for_set_operation_node( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op, - PipelinePtr& cur_pipe, int parent_idx, int child_idx) { + PipelinePtr& cur_pipe, int parent_idx, int child_idx, + const doris::TPipelineFragmentParams& request) { op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs)); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? request.parallel_instances : 0)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 0749729789e..6caa0e5c106 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -121,7 +121,7 @@ public: _tasks[j][i]->stop_if_finished(); } } - }; + } private: Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, @@ -140,7 +140,8 @@ private: Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, - int child_idx); + int child_idx, + const doris::TPipelineFragmentParams& request); Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink, const std::vector<TExpr>& output_exprs, @@ -224,6 +225,7 @@ private: int _num_instances = 1; int _timeout = -1; + bool _use_serial_source = false; OperatorPtr _root_op = nullptr; // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index 4dca9384d65..55d1b4b50c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -488,6 +488,11 @@ public class AggregationNode extends PlanNode { } } + @Override + public boolean isSerialOperator() { + return aggInfo.getGroupingExprs().isEmpty() && needsFinalize; + } + public void setColocate(boolean colocate) { isColocate = colocate; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java index cdbf827aed9..dce6c3d1b04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java @@ -296,4 +296,9 @@ public class AnalyticEvalNode extends PlanNode { return output.toString(); } + + @Override + public boolean isSerialOperator() { + return partitionExprs.isEmpty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java index 57d9ce8742f..a4c4aa42c65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java @@ -116,4 +116,9 @@ public class AssertNumRowsNode extends PlanNode { public int getNumInstances() { return 1; } + + @Override + public boolean isSerialOperator() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 9c6ba83408a..ce57a57c377 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -90,6 +90,10 @@ public class DataPartition { return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; } + public boolean isTabletSinkShufflePartition() { + return type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED; + } + public TPartitionType getType() { return type; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java index 867c220d9fe..f6ddf23429e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java @@ -81,4 +81,8 @@ public class EmptySetNode extends PlanNode { return 1; } + @Override + public boolean isSerialOperator() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 4ada9a82f7c..7af09287191 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -195,6 +195,11 @@ public class ExchangeNode extends PlanNode { return prefix + "offset: " + offset + "\n"; } + @Override + public boolean isMerging() { + return mergeInfo != null; + } + public boolean isRightChildOfBroadcastHashJoin() { return isRightChildOfBroadcastHashJoin; } @@ -202,4 +207,9 @@ public class ExchangeNode extends PlanNode { public void setRightChildOfBroadcastHashJoin(boolean value) { isRightChildOfBroadcastHashJoin = value; } + + @Override + public boolean isSerialOperator() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java index 91a3c26e770..5dc81e29d85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java @@ -597,7 +597,6 @@ public abstract class JoinNodeBase extends PlanNode { this.useSpecificProjections = useSpecificProjections; } - public boolean isUseSpecificProjections() { return useSpecificProjections; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 30c0a2d0394..c7b3525e4cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -281,4 +281,21 @@ public class NestedLoopJoinNode extends JoinNodeBase { } return output.toString(); } + + /** + * If joinOp is one of type below: + * 1. NULL_AWARE_LEFT_ANTI_JOIN + * 2. RIGHT_OUTER_JOIN + * 3. RIGHT_ANTI_JOIN + * 4. RIGHT_SEMI_JOIN + * + * We will + * @return + */ + @Override + public boolean isSerialOperator() { + return joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_OUTER_JOIN + || joinOp == JoinOperator.RIGHT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_SEMI_JOIN + || joinOp == JoinOperator.FULL_OUTER_JOIN; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index ae1d34308a3..3e3c49bf675 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -341,6 +341,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { // TODO chenhao , calculated by cost result.setMinReservationBytes(0); result.setInitialReservationTotalClaims(0); + result.setUseSerialSource(useSerialSource(ConnectContext.get())); return result; } @@ -502,4 +503,38 @@ public class PlanFragment extends TreeNode<PlanFragment> { public boolean hasNullAwareLeftAntiJoin() { return planRoot.isNullAwareLeftAntiJoin(); } + + private boolean isMergingFragment() { + return planRoot.isMerging(); + } + + public boolean useSerialSource(ConnectContext context) { + return context != null + && context.getSessionVariable().isIgnoreStorageDataDistribution() + && !hasNullAwareLeftAntiJoin() + // If input data partition is UNPARTITIONED and sink is DataStreamSink and root node is not a serial + // operator, we use local exchange to improve parallelism + && getDataPartition() == DataPartition.UNPARTITIONED && !children.isEmpty() + && sink instanceof DataStreamSink && !planRoot.isSerialOperator() + /** + * If table `t1` has unique key `k1` and value column `v1`. + * Now use plan below to load data into `t1`: + * ``` + * FRAGMENT 0: + * Merging Exchange (id = 1) + * NL Join (id = 2) + * DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED) + * + * FRAGMENT 1: + * Exchange (id = 3) + * OlapTableSink (id = 4) ``` + * + * In this plan, `Exchange (id = 1)` needs to do merge sort using column `k1` and `v1` so parallelism + * of FRAGMENT 0 must be 1 and data will be shuffled to FRAGMENT 1 which also has only 1 instance + * because this loading job relies on the global ordering of column `k1` and `v1`. + * + * So FRAGMENT 0 should not use serial source. + */ + && !isMergingFragment(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 1e9d5646939..d1ba493682b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -279,6 +279,10 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { return children.stream().anyMatch(PlanNode::isNullAwareLeftAntiJoin); } + public boolean isMerging() { + return children.stream().anyMatch(PlanNode::isMerging); + } + public PlanFragment getFragment() { return fragment; } @@ -639,6 +643,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { TPlanNode msg = new TPlanNode(); msg.node_id = id.asInt(); msg.setNereidsId(nereidsId); + msg.setIsSerialOperator(isSerialOperator()); msg.num_children = children.size(); msg.limit = limit; for (TupleId tid : tupleIds) { @@ -1374,4 +1379,9 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { return true; }); } + + // Operators need to be executed serially. (e.g. finalized agg without key) + public boolean isSerialOperator() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java index 3c6a88cea08..407d8a6444c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java @@ -200,4 +200,9 @@ public class RepeatNode extends PlanNode { } return output.toString(); } + + @Override + public boolean isSerialOperator() { + return children.get(0).isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index a92cac7b510..1681699d651 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -848,4 +848,9 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { public long getSelectedSplitNum() { return selectedSplitNum; } + + @Override + public boolean isSerialOperator() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java index 6c6b665b00a..b3b088837a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java @@ -109,4 +109,9 @@ public class SelectNode extends PlanNode { } return output.toString(); } + + @Override + public boolean isSerialOperator() { + return children.get(0).isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index e3c405bcbab..fc1c50c0bba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -389,6 +389,11 @@ public class SortNode extends PlanNode { return new HashSet<>(result); } + @Override + public boolean isSerialOperator() { + return !isAnalyticSort && !mergeByexchange; + } + public void setColocate(boolean colocate) { isColocate = colocate; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java index 40982d07e77..bf48a770f1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java @@ -42,4 +42,9 @@ public class UnionNode extends SetOperationNode { protected void toThrift(TPlanNode msg) { toThrift(msg, TPlanNodeType.UNION_NODE); } + + @Override + public boolean isSerialOperator() { + return children.isEmpty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8e580c549df..4eda6775b5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1779,6 +1779,20 @@ public class Coordinator implements CoordInterface { FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 0, params); params.instanceExecParams.add(instanceParam); + + // Using serial source means a serial source operator will be used in this fragment (e.g. data will be + // shuffled to only 1 exchange operator) and then splitted by followed local exchanger + int expectedInstanceNum = fragment.getParallelExecNum(); + boolean useSerialSource = fragment.useSerialSource(context) && useNereids + && fragment.queryCacheParam == null; + if (useSerialSource) { + for (int j = 1; j < expectedInstanceNum; j++) { + params.instanceExecParams.add(new FInstanceExecParam( + null, execHostport, 0, params)); + } + params.ignoreDataDistribution = true; + params.parallelTasksNum = 1; + } continue; } @@ -1808,6 +1822,10 @@ public class Coordinator implements CoordInterface { if (leftMostNode.getNumInstances() == 1) { exchangeInstances = 1; } + // Using serial source means a serial source operator will be used in this fragment (e.g. data will be + // shuffled to only 1 exchange operator) and then splitted by followed local exchanger + boolean useSerialSource = fragment.useSerialSource(context) && useNereids + && fragment.queryCacheParam == null; if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentId) .instanceExecParams.size() > exchangeInstances) { // random select some instance @@ -1825,12 +1843,16 @@ public class Coordinator implements CoordInterface { hosts.get(index % hosts.size()), 0, params); params.instanceExecParams.add(instanceParam); } + params.ignoreDataDistribution = useSerialSource; + params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size(); } else { for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params); params.instanceExecParams.add(instanceParam); } + params.ignoreDataDistribution = useSerialSource; + params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size(); } // When group by cardinality is smaller than number of backend, only some backends always diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 5c0273da791..eb5266942c0 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1366,6 +1366,7 @@ struct TPlanNode { 49: optional i64 push_down_count 50: optional list<list<Exprs.TExpr>> distribute_expr_lists + 51: optional bool is_serial_operator // projections is final projections, which means projecting into results and materializing them into the output block. 101: optional list<Exprs.TExpr> projections 102: optional Types.TTupleId output_tuple_id diff --git a/gensrc/thrift/Planner.thrift b/gensrc/thrift/Planner.thrift index 866d8d45320..ffcc33638db 100644 --- a/gensrc/thrift/Planner.thrift +++ b/gensrc/thrift/Planner.thrift @@ -64,6 +64,10 @@ struct TPlanFragment { 8: optional i64 initial_reservation_total_claims 9: optional QueryCache.TQueryCacheParam query_cache_param + + // Using serial source means a serial source operator will be used in this fragment (e.g. data will be shuffled to + // only 1 exchange operator) and then splitted by followed local exchanger + 10: optional bool use_serial_source } // location information for a single scan range diff --git a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy index 2493a7df5de..049cbe0b4d7 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy @@ -177,15 +177,15 @@ suite('complex_insert') { sql 'insert into t1(id, c1, c2, c3) select id, c1 * 2, c2, c3 from t1' sql 'sync' - qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id' + qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id' sql 'insert into t2(id, c1, c2, c3) select id, c1, c2 * 2, c3 from t2' sql 'sync' - qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id' + qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id' sql 'insert into t2(c1, c3) select c1 + 1, c3 + 1 from (select id, c1, c3 from t1 order by id, c1 limit 10) t1, t3' sql 'sync' - qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id' + qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id' sql 'drop table if exists agg_have_dup_base' diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy index 997230b1a06..950b6171c7c 100644 --- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy +++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy @@ -52,7 +52,7 @@ suite("local_shuffle") { set force_to_local_shuffle=true; """ - order_qt_read_single_olap_table "select * from test_local_shuffle1" + order_qt_read_single_olap_table "select * from test_local_shuffle1 order by id, id2" order_qt_broadcast_join """ select * @@ -96,7 +96,7 @@ suite("local_shuffle") { ) a right outer join [shuffle] test_local_shuffle2 - on a.id=test_local_shuffle2.id2 + on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, test_local_shuffle2.id2 """ order_qt_bucket_shuffle_with_prune_tablets2 """ @@ -109,7 +109,7 @@ suite("local_shuffle") { from test_local_shuffle1 where id=1 ) a - on a.id=test_local_shuffle2.id2 + on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, test_local_shuffle2.id2 """ order_qt_bucket_shuffle_with_prune_tablets3 """ @@ -150,11 +150,11 @@ suite("local_shuffle") { """ order_qt_fillup_bucket """ - SELECT cast(a.c0 as int), cast(b.c0 as int) FROM + SELECT cast(a.c0 as int), cast(b.c0 as int) res FROM (select * from test_local_shuffle3 where c0 =1)a RIGHT OUTER JOIN (select * from test_local_shuffle4)b - ON a.c0 = b.c0 + ON a.c0 = b.c0 order by res """ multi_sql """ @@ -182,6 +182,6 @@ suite("local_shuffle") { ) a inner join [shuffle] test_shuffle_left_with_local_shuffle b - on a.id2=b.id; + on a.id2=b.id order by a.id2; """ } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org