This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7155711431e [cherry-pick](branch-2.1) Improve local shuffle strategy (#40030)
7155711431e is described below

commit 7155711431e9fba015f6e459b75290c0926d6636
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 29 14:16:16 2024 +0800

    [cherry-pick](branch-2.1) Improve local shuffle strategy (#40030)

    pick #34122 #35454 #35716 #37195
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  8 +++--
 be/src/pipeline/exec/aggregation_sink_operator.h   |  9 ++++--
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  9 ++++--
 be/src/pipeline/exec/analytic_sink_operator.h      |  7 +++--
 be/src/pipeline/exec/datagen_operator.cpp          |  4 +--
 .../distinct_streaming_aggregation_operator.cpp    | 11 ++++---
 .../exec/distinct_streaming_aggregation_operator.h |  7 +++--
 be/src/pipeline/exec/hashjoin_build_sink.h         |  4 +++
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  4 +++
 be/src/pipeline/exec/operator.h                    |  3 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  6 ++--
 .../exec/partitioned_aggregation_sink_operator.h   |  6 +++-
 .../exec/partitioned_hash_join_probe_operator.h    |  3 ++
 .../exec/partitioned_hash_join_sink_operator.h     |  4 +++
 be/src/pipeline/exec/sort_sink_operator.cpp        |  5 ++--
 be/src/pipeline/exec/sort_sink_operator.h          |  6 ++--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  6 ++--
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  5 +++-
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 34 ++++++++++++++++++----
 .../pipeline_x/pipeline_x_fragment_context.h       |  2 ++
 .../org/apache/doris/planner/DataGenScanNode.java  |  9 ++++++
 .../pipeline/p1/conf/regression-conf.groovy        |  1 +
 .../correctness_p0/test_assert_row_num.groovy      |  2 +-
 .../external_table_p0/tvf/test_numbers.groovy      |  6 ++--
 24 files changed, 121 insertions(+), 40 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 730337561e8..704c256737a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -615,7 +615,7 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p
 }
 
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                   const DescriptorTbl& descs)
+                                   const DescriptorTbl& descs, bool require_bucket_distribution)
         : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
@@ -628,9 +628,11 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
           _limit(tnode.limit),
           _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
                           (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
 
 Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index b3ffa19d6db..3124a3981b4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,7 +143,7 @@ protected:
 class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
 public:
     AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                     const DescriptorTbl& descs);
+                     const DescriptorTbl& descs, bool require_bucket_distribution);
     ~AggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
@@ -164,9 +164,11 @@ public:
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
                        : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
         }
-        return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
+        return _is_colocate && _require_bucket_distribution
+                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
+                       : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
+    bool require_data_distribution() const override { return _is_colocate; }
 
     size_t get_revocable_mem_size(RuntimeState* state) const;
     vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -213,6 +215,7 @@ protected:
     const std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
 
     RowDescriptor _agg_fn_output_row_descriptor;
 };
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index a1d3384edc6..5b4f5cee5cb 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -193,14 +193,17 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
 }
 
 AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
-                                             const TPlanNode& tnode, const DescriptorTbl& descs)
+                                             const TPlanNode& tnode, const DescriptorTbl& descs,
+                                             bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id),
           _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
                                      ? tnode.analytic_node.buffered_tuple_id
                                      : 0),
           _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}) {}
+          _require_bucket_distribution(require_bucket_distribution),
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.analytic_node.partition_exprs) {}
 
 Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 3ae4a7b5cff..d974f68cefa 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -86,7 +86,7 @@ private:
 class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
 public:
     AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                          const DescriptorTbl& descs);
+                          const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<AnalyticSinkLocalState>::_name);
@@ -102,13 +102,15 @@ public:
         if (_partition_by_eq_expr_ctxs.empty()) {
             return {ExchangeType::PASSTHROUGH};
         } else if (_order_by_eq_expr_ctxs.empty()) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
         return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
     }
 
+    bool require_data_distribution() const override { return true; }
+
 private:
     Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
                                 vectorized::IColumn* dst_column, size_t length);
@@ -125,6 +127,7 @@ private:
     std::vector<size_t> _num_agg_input;
 
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
     const std::vector<TExpr> _partition_exprs;
 };
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 4fbe21f71d5..1f84bbf145a 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -97,8 +97,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node.
     for (const auto& filter_desc : p._runtime_filter_descs) {
         IRuntimeFilter* runtime_filter = nullptr;
-        RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(),
-                                                                &runtime_filter));
+        RETURN_IF_ERROR(state->register_consumer_runtime_filter(
+                filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter));
         runtime_filter->init_profile(_runtime_profile.get());
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index c33b436ba03..16c0df07b49 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -374,7 +374,8 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
 
 DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id,
                                                              const TPlanNode& tnode,
-                                                             const DescriptorTbl& descs)
+                                                             const DescriptorTbl& descs,
+                                                             bool require_bucket_distribution)
         : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
@@ -382,9 +383,11 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
           _output_tuple_desc(nullptr),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs),
+          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
         if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index ca091f743bd..d0b0d963ead 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -97,7 +97,7 @@ class DistinctStreamingAggOperatorX final
         : public StatefulOperatorX<DistinctStreamingAggLocalState> {
 public:
     DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                  const DescriptorTbl& descs);
+                                  const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
@@ -107,13 +107,15 @@ public:
 
     DataDistribution required_data_distribution() const override {
         if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
         return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution();
     }
 
+    bool require_data_distribution() const override { return _is_colocate; }
+
 private:
     friend class DistinctStreamingAggLocalState;
     TupleId _intermediate_tuple_id;
@@ -125,6 +127,7 @@ private:
     const bool _is_first_phase;
     const std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 2dab03d5a19..d445e2f309c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -167,6 +167,10 @@ public:
     bool is_shuffled_hash_join() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
+    bool require_data_distribution() const override {
+        return _join_distribution == TJoinDistributionType::COLOCATE ||
+               _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
+    }
 
 private:
     friend class HashJoinBuildSinkLocalState;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 5cdfe9feeb7..264f177bcc9 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -178,6 +178,10 @@ public:
     bool is_shuffled_hash_join() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
+    bool require_data_distribution() const override {
+        return _join_distribution == TJoinDistributionType::COLOCATE ||
+               _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
+    }
 
 private:
     Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs,
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index c93cc8f592e..7c3fb945a2d 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -252,7 +252,8 @@ public:
 
     virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
 
-    virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); };
+    virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
+    [[nodiscard]] virtual bool require_data_distribution() const { return false; }
 
 protected:
     OperatorBuilderBase* _operator_builder = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 053e6dee0cb..9c5c1d6a81c 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -124,9 +124,11 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
 
 PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
                                                          const TPlanNode& tnode,
-                                                         const DescriptorTbl& descs)
+                                                         const DescriptorTbl& descs,
+                                                         bool require_bucket_distribution)
         : DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id) {
-    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs);
+    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs,
+                                                            require_bucket_distribution);
 }
 
 Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 016869374bd..d79ba6fd3d4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -289,7 +289,7 @@ public:
 class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkLocalState> {
 public:
     PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                const DescriptorTbl& descs);
+                                const DescriptorTbl& descs, bool require_bucket_distribution);
     ~PartitionedAggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
@@ -308,6 +308,10 @@ public:
         return _agg_sink_operator->required_data_distribution();
     }
 
+    bool require_data_distribution() const override {
+        return _agg_sink_operator->require_data_distribution();
+    }
+
     Status set_child(OperatorXPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
         return _agg_sink_operator->set_child(child);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index db20efda67e..b10c514b2f4 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -182,6 +182,9 @@ public:
         _inner_sink_operator = sink_operator;
         _inner_probe_operator = probe_operator;
     }
+    bool require_data_distribution() const override {
+        return _inner_probe_operator->require_data_distribution();
+    }
 
 private:
     Status _revoke_memory(RuntimeState* state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 82fe5eacd94..2fae1f15bfa 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -129,6 +129,10 @@ public:
         _inner_probe_operator = probe_operator;
     }
 
+    bool require_data_distribution() const override {
+        return _inner_probe_operator->require_data_distribution();
+    }
+
 private:
     friend class PartitionedHashJoinSinkLocalState;
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index d89e54614d1..61c35427e57 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -75,7 +75,7 @@ Status SortSinkLocalState::open(RuntimeState* state) {
 }
 
 SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                     const DescriptorTbl& descs)
+                                     const DescriptorTbl& descs, bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id),
           _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
           _pool(pool),
@@ -85,7 +85,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
           _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
           _merge_by_exchange(tnode.sort_node.merge_by_exchange),
-          _is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false),
+          _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution),
           _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
                                     ? tnode.sort_node.is_analytic_sort
                                     : false),
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index ad9c23401b4..f29d9bbde09 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -74,7 +74,7 @@ private:
 class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
 public:
     SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                      const DescriptorTbl& descs);
+                      const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<SortSinkLocalState>::_name);
@@ -87,7 +87,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
     DataDistribution required_data_distribution() const override {
         if (_is_analytic_sort) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         } else if (_merge_by_exchange) {
@@ -96,6 +96,7 @@ public:
         }
         return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
     }
+    bool require_data_distribution() const override { return _is_colocate; }
 
     bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; }
@@ -128,6 +129,7 @@ private:
     const bool _use_two_phase_read;
     const bool _merge_by_exchange;
     const bool _is_colocate = false;
+    const bool _require_bucket_distribution = false;
     const bool _is_analytic_sort = false;
     const std::vector<TExpr> _partition_exprs;
 };
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index dfda2ff61e1..92cd1f542d8 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
 }
 
 SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id,
-                                               const TPlanNode& tnode, const DescriptorTbl& descs)
+                                               const TPlanNode& tnode, const DescriptorTbl& descs,
+                                               bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id) {
-    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs);
+    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs,
+                                                              require_bucket_distribution);
 }
 
 Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 9382edd6933..fae5fe3270f 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -64,7 +64,7 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocal
 public:
     using LocalStateType = SpillSortSinkLocalState;
     SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                           const DescriptorTbl& descs);
+                           const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<SpillSortSinkLocalState>::_name);
@@ -78,6 +78,9 @@ public:
     DataDistribution required_data_distribution() const override {
         return _sort_sink_operator->required_data_distribution();
     }
+    bool require_data_distribution() const override {
+        return _sort_sink_operator->require_data_distribution();
+    }
     Status set_child(OperatorXPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX<SpillSortSinkLocalState>::set_child(child));
         return _sort_sink_operator->set_child(child);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7d90cebc8d2..18eb9582a4b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1042,8 +1042,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             request.query_options.__isset.enable_distinct_streaming_aggregation &&
             request.query_options.enable_distinct_streaming_aggregation &&
             !tnode.agg_node.grouping_exprs.empty()) {
-            op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
+            op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
+                                                       _require_bucket_distribution));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            _require_bucket_distribution =
+                    _require_bucket_distribution || op->require_data_distribution();
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation &&
                    !tnode.agg_node.grouping_exprs.empty()) {
@@ -1067,14 +1070,18 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             DataSinkOperatorXPtr sink;
             if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) {
                 sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode,
-                                                           descs));
+                                                           descs, _require_bucket_distribution));
             } else {
-                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                                _require_bucket_distribution));
             }
+            _require_bucket_distribution =
+                    _require_bucket_distribution || sink->require_data_distribution();
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
         }
+        _require_bucket_distribution = true;
         break;
     }
     case TPlanNodeType::HASH_JOIN_NODE: {
@@ -1139,6 +1146,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             _pipeline_parent_map.push(op->node_id(), cur_pipe);
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         }
+        _require_bucket_distribution =
+                _require_bucket_distribution || op->require_data_distribution();
         break;
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1201,10 +1210,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
 
         DataSinkOperatorXPtr sink;
         if (_runtime_state->enable_sort_spill()) {
-            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                                  _require_bucket_distribution));
         } else {
-            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                             _require_bucket_distribution));
         }
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1240,7 +1253,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                             _require_bucket_distribution));
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1279,6 +1295,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     case TPlanNodeType::DATA_GEN_SCAN_NODE: {
         op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        if (request.__isset.parallel_instances) {
+            cur_pipe->set_num_tasks(request.parallel_instances);
+            op->set_ignore_data_distribution();
+        }
         break;
     }
     case TPlanNodeType::SCHEMA_SCAN_NODE: {
@@ -1301,6 +1321,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                      print_plan_node_type(tnode.node_type));
     }
 
+    _require_bucket_distribution = true;
+
     return Status::OK();
 }
 // NOLINTEND(readability-function-cognitive-complexity)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 55866400374..14e4b05d7e8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -235,6 +235,8 @@ private:
 
     // Total instance num running on all BEs
     int _total_instances = -1;
+
+    bool _require_bucket_distribution = false;
 };
 
 } // namespace pipeline
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index 14a50160d63..f4e6dc93130 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.NereidsException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalScanNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.tablefunction.DataGenTableValuedFunction;
 import org.apache.doris.tablefunction.TableValuedFunctionTask;
@@ -116,6 +117,14 @@ public class DataGenScanNode extends ExternalScanNode {
     // by multi-processes or multi-threads. So we assign instance number to 1.
     @Override
     public int getNumInstances() {
+        if (ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
+            return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        }
+        return 1;
+    }
+
+    @Override
+    public int getScanRangeNum() {
         return 1;
     }
diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy
index 8f8458e47a6..2a0156e16b4 100644
--- a/regression-test/pipeline/p1/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p1/conf/regression-conf.groovy
@@ -60,6 +60,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th
     "test_profile," +
     "test_refresh_mtmv," +
     "test_spark_load," +
+    "test_iot_auto_detect_concurrent," +
     "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line
 
 // this dir will not be executed
diff --git a/regression-test/suites/correctness_p0/test_assert_row_num.groovy b/regression-test/suites/correctness_p0/test_assert_row_num.groovy
index 818213f56fe..68e9740a321 100644
--- a/regression-test/suites/correctness_p0/test_assert_row_num.groovy
+++ b/regression-test/suites/correctness_p0/test_assert_row_num.groovy
@@ -21,7 +21,7 @@ suite("test_assert_num_rows") {
     """
 
     qt_sql_2 """
-    SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL
+    SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL ORDER BY number
     """
     sql """
     DROP TABLE IF EXISTS table_9_undef_undef;
diff --git a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
index 6f0f74f6433..c0f2cafa403 100644
--- a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
@@ -39,17 +39,17 @@
     order_qt_inner_join1 """
         select a.number as num1, b.number as num2
         from numbers("number" = "10") a inner join numbers("number" = "10") b
-        on a.number=b.number;
+        on a.number=b.number ORDER BY a.number,b.number;
     """
     order_qt_inner_join2 """
         select a.number as num1, b.number as num2
         from numbers("number" = "6") a inner join numbers("number" = "6") b
-        on a.number>b.number;
+        on a.number>b.number ORDER BY a.number,b.number;
     """
     order_qt_inner_join3 """
         select a.number as num1, b.number as num2
         from numbers("number" = "10") a inner join numbers("number" = "10") b
-        on a.number=b.number and b.number%2 = 0;
+        on a.number=b.number and b.number%2 = 0 ORDER BY a.number,b.number;
     """
     order_qt_left_join """
         select a.number as num1, b.number as num2
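A note for readers skimming the patch: the mechanism tying these files together is a single flag, _require_bucket_distribution, threaded through operator construction in PipelineXFragmentContext::_create_operator(). Each sink is built with the flag's current value, and the flag is then OR-ed with the result of the new require_data_distribution() virtual. So a colocate-capable operator only demands BUCKET_HASH_SHUFFLE when an operator created earlier in the walk actually required bucketed data; otherwise it settles for plain HASH_SHUFFLE, which the engine can satisfy with a cheaper local shuffle. The sketch below condenses that pattern into a standalone program; it is a hypothetical reduction, not Doris code, and the OperatorX struct, the two-operator loop, and main() are invented scaffolding.

#include <iostream>
#include <vector>

enum class ExchangeType { HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };

struct OperatorX {
    bool is_colocate;
    bool require_bucket_distribution; // snapshot of the flag at construction time

    OperatorX(bool colocate, bool require_bucket)
            : is_colocate(colocate), require_bucket_distribution(require_bucket) {}

    // Analogue of AggSinkOperatorX::required_data_distribution() after the patch:
    // bucket shuffle only when colocate AND a bucket distribution is required.
    ExchangeType required_data_distribution() const {
        return is_colocate && require_bucket_distribution
                       ? ExchangeType::BUCKET_HASH_SHUFFLE
                       : ExchangeType::HASH_SHUFFLE;
    }

    // Analogue of the new OperatorBase::require_data_distribution() virtual.
    bool require_data_distribution() const { return is_colocate; }
};

int main() {
    // Plays the role of PipelineXFragmentContext::_require_bucket_distribution.
    bool require_bucket_distribution = false;

    std::vector<OperatorX> ops;
    // Build two colocate operators in walk order, threading the flag through
    // the same way _create_operator() does after this patch.
    for (int i = 0; i < 2; ++i) {
        ops.emplace_back(/*colocate=*/true, require_bucket_distribution);
        require_bucket_distribution =
                require_bucket_distribution || ops.back().require_data_distribution();
    }

    // The first operator sees the flag unset and chooses HASH_SHUFFLE (local
    // shuffle friendly); the second sees it set and keeps the bucket shuffle.
    for (const auto& op : ops) {
        std::cout << (op.required_data_distribution() == ExchangeType::BUCKET_HASH_SHUFFLE
                              ? "BUCKET_HASH_SHUFFLE"
                              : "HASH_SHUFFLE")
                  << "\n";
    }
    return 0;
}

Compiled with g++ -std=c++17, the sketch prints HASH_SHUFFLE then BUCKET_HASH_SHUFFLE, mirroring how the flag upgrades the distribution requirement as more operators are created for the fragment.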