This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7155711431e [cherry-pick](branch-2.1) Improve local shuffle strategy (#40030)
7155711431e is described below

commit 7155711431e9fba015f6e459b75290c0926d6636
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 29 14:16:16 2024 +0800

    [cherry-pick](branch-2.1) Improve local shuffle strategy (#40030)
    
    pick #34122 #35454 #35716 #37195
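    
    Summary (condensed from the diff below): each sink operator that can
    demand a bucketed shuffle (aggregation, analytic, sort, and their spill
    variants) now takes a require_bucket_distribution flag, and
    required_data_distribution() keeps BUCKET_HASH_SHUFFLE only when the
    fragment actually requires the bucket distribution; otherwise it falls
    back to a plain HASH_SHUFFLE, which a cheaper local shuffle can serve.
    A minimal sketch of the gating pattern (class scaffolding elided; see
    the aggregation sink hunks below for the real code):
    
        DataDistribution AggSinkOperatorX::required_data_distribution() const {
            // Keep the bucket-hash shuffle only when this node is colocate AND
            // some operator in the fragment requires the bucket distribution;
            // otherwise use a plain hash shuffle, which local shuffle can satisfy.
            return _is_colocate && _require_bucket_distribution
                           ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                           : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
        }
    
    PipelineXFragmentContext accumulates the flag while creating operators
    (_require_bucket_distribution || op->require_data_distribution()), so
    each sink sees whether any operator built before it needs the bucketed
    layout.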
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  8 +++--
 be/src/pipeline/exec/aggregation_sink_operator.h   |  9 ++++--
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  9 ++++--
 be/src/pipeline/exec/analytic_sink_operator.h      |  7 +++--
 be/src/pipeline/exec/datagen_operator.cpp          |  4 +--
 .../distinct_streaming_aggregation_operator.cpp    | 11 ++++---
 .../exec/distinct_streaming_aggregation_operator.h |  7 +++--
 be/src/pipeline/exec/hashjoin_build_sink.h         |  4 +++
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  4 +++
 be/src/pipeline/exec/operator.h                    |  3 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  6 ++--
 .../exec/partitioned_aggregation_sink_operator.h   |  6 +++-
 .../exec/partitioned_hash_join_probe_operator.h    |  3 ++
 .../exec/partitioned_hash_join_sink_operator.h     |  4 +++
 be/src/pipeline/exec/sort_sink_operator.cpp        |  5 ++--
 be/src/pipeline/exec/sort_sink_operator.h          |  6 ++--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  6 ++--
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  5 +++-
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 34 ++++++++++++++++++----
 .../pipeline_x/pipeline_x_fragment_context.h       |  2 ++
 .../org/apache/doris/planner/DataGenScanNode.java  |  9 ++++++
 .../pipeline/p1/conf/regression-conf.groovy        |  1 +
 .../correctness_p0/test_assert_row_num.groovy      |  2 +-
 .../external_table_p0/tvf/test_numbers.groovy      |  6 ++--
 24 files changed, 121 insertions(+), 40 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 730337561e8..704c256737a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -615,7 +615,7 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p
 }
 
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                   const DescriptorTbl& descs)
+                                   const DescriptorTbl& descs, bool require_bucket_distribution)
         : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
@@ -628,9 +628,11 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
           _limit(tnode.limit),
           _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
                           (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
 
 Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index b3ffa19d6db..3124a3981b4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,7 +143,7 @@ protected:
 class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
 public:
     AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                     const DescriptorTbl& descs);
+                     const DescriptorTbl& descs, bool require_bucket_distribution);
     ~AggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
@@ -164,9 +164,11 @@ public:
                            ? DataDistribution(ExchangeType::PASSTHROUGH)
                            : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
         }
-        return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
+        return _is_colocate && _require_bucket_distribution
+                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
+                       : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
+    bool require_data_distribution() const override { return _is_colocate; }
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
     vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -213,6 +215,7 @@ protected:
 
     const std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
 
     RowDescriptor _agg_fn_output_row_descriptor;
 };
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index a1d3384edc6..5b4f5cee5cb 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -193,14 +193,17 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
 }
 
 AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
-                                             const TPlanNode& tnode, const DescriptorTbl& descs)
+                                             const TPlanNode& tnode, const DescriptorTbl& descs,
+                                             bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id),
           _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
                                      ? tnode.analytic_node.buffered_tuple_id
                                      : 0),
           _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}) {}
+          _require_bucket_distribution(require_bucket_distribution),
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.analytic_node.partition_exprs) {}
 
 Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 3ae4a7b5cff..d974f68cefa 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -86,7 +86,7 @@ private:
 class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
 public:
     AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                          const DescriptorTbl& descs);
+                          const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<AnalyticSinkLocalState>::_name);
@@ -102,13 +102,15 @@ public:
         if (_partition_by_eq_expr_ctxs.empty()) {
             return {ExchangeType::PASSTHROUGH};
         } else if (_order_by_eq_expr_ctxs.empty()) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
         return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
     }
 
+    bool require_data_distribution() const override { return true; }
+
 private:
     Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
                                 vectorized::IColumn* dst_column, size_t length);
@@ -125,6 +127,7 @@ private:
 
     std::vector<size_t> _num_agg_input;
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
     const std::vector<TExpr> _partition_exprs;
 };
 
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 4fbe21f71d5..1f84bbf145a 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -97,8 +97,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node.
     for (const auto& filter_desc : p._runtime_filter_descs) {
         IRuntimeFilter* runtime_filter = nullptr;
-        RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(),
-                                                                &runtime_filter));
+        RETURN_IF_ERROR(state->register_consumer_runtime_filter(
+                filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter));
         runtime_filter->init_profile(_runtime_profile.get());
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index c33b436ba03..16c0df07b49 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -374,7 +374,8 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
 
 DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id,
                                                              const TPlanNode& tnode,
-                                                             const DescriptorTbl& descs)
+                                                             const DescriptorTbl& descs,
+                                                             bool require_bucket_distribution)
         : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
@@ -382,9 +383,11 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
           _output_tuple_desc(nullptr),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs),
+          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
         if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index ca091f743bd..d0b0d963ead 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -97,7 +97,7 @@ class DistinctStreamingAggOperatorX final
         : public StatefulOperatorX<DistinctStreamingAggLocalState> {
 public:
     DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                  const DescriptorTbl& descs);
+                                  const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
@@ -107,13 +107,15 @@ public:
 
     DataDistribution required_data_distribution() const override {
         if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
         return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution();
     }
 
+    bool require_data_distribution() const override { return _is_colocate; }
+
 private:
     friend class DistinctStreamingAggLocalState;
     TupleId _intermediate_tuple_id;
@@ -125,6 +127,7 @@ private:
     const bool _is_first_phase;
     const std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 2dab03d5a19..d445e2f309c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -167,6 +167,10 @@ public:
     bool is_shuffled_hash_join() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
+    bool require_data_distribution() const override {
+        return _join_distribution == TJoinDistributionType::COLOCATE ||
+               _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
+    }
 
 private:
     friend class HashJoinBuildSinkLocalState;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 5cdfe9feeb7..264f177bcc9 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -178,6 +178,10 @@ public:
     bool is_shuffled_hash_join() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
+    bool require_data_distribution() const override {
+        return _join_distribution == TJoinDistributionType::COLOCATE ||
+               _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
+    }
 
 private:
     Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs,
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index c93cc8f592e..7c3fb945a2d 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -252,7 +252,8 @@ public:
 
     virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
 
-    virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); };
+    virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
+    [[nodiscard]] virtual bool require_data_distribution() const { return false; }
 
 protected:
     OperatorBuilderBase* _operator_builder = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 053e6dee0cb..9c5c1d6a81c 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -124,9 +124,11 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
 
 PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
                                                          const TPlanNode& tnode,
-                                                         const DescriptorTbl& descs)
+                                                         const DescriptorTbl& descs,
+                                                         bool require_bucket_distribution)
         : DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id) {
-    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs);
+    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs,
+                                                            require_bucket_distribution);
 }
 
 Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 016869374bd..d79ba6fd3d4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -289,7 +289,7 @@ public:
 class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkLocalState> {
 public:
     PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                const DescriptorTbl& descs);
+                                const DescriptorTbl& descs, bool require_bucket_distribution);
     ~PartitionedAggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
@@ -308,6 +308,10 @@ public:
         return _agg_sink_operator->required_data_distribution();
     }
 
+    bool require_data_distribution() const override {
+        return _agg_sink_operator->require_data_distribution();
+    }
+
     Status set_child(OperatorXPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
         return _agg_sink_operator->set_child(child);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index db20efda67e..b10c514b2f4 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -182,6 +182,9 @@ public:
         _inner_sink_operator = sink_operator;
         _inner_probe_operator = probe_operator;
     }
+    bool require_data_distribution() const override {
+        return _inner_probe_operator->require_data_distribution();
+    }
 
 private:
     Status _revoke_memory(RuntimeState* state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 82fe5eacd94..2fae1f15bfa 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -129,6 +129,10 @@ public:
         _inner_probe_operator = probe_operator;
     }
 
+    bool require_data_distribution() const override {
+        return _inner_probe_operator->require_data_distribution();
+    }
+
 private:
     friend class PartitionedHashJoinSinkLocalState;
 
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index d89e54614d1..61c35427e57 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -75,7 +75,7 @@ Status SortSinkLocalState::open(RuntimeState* state) {
 }
 
 SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                     const DescriptorTbl& descs)
+                                     const DescriptorTbl& descs, bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id),
           _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
           _pool(pool),
@@ -85,7 +85,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
           _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
           _merge_by_exchange(tnode.sort_node.merge_by_exchange),
-          _is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false),
+          _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution),
+          _require_bucket_distribution(require_bucket_distribution),
           _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
                                     ? tnode.sort_node.is_analytic_sort
                                     : false),
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index ad9c23401b4..f29d9bbde09 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -74,7 +74,7 @@ private:
 class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
 public:
     SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                      const DescriptorTbl& descs);
+                      const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<SortSinkLocalState>::_name);
@@ -87,7 +87,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
     DataDistribution required_data_distribution() const override {
         if (_is_analytic_sort) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         } else if (_merge_by_exchange) {
@@ -96,6 +96,7 @@ public:
         }
         return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
     }
+    bool require_data_distribution() const override { return _is_colocate; }
 
     bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; }
 
@@ -128,6 +129,7 @@ private:
     const bool _use_two_phase_read;
     const bool _merge_by_exchange;
     const bool _is_colocate = false;
+    const bool _require_bucket_distribution = false;
     const bool _is_analytic_sort = false;
     const std::vector<TExpr> _partition_exprs;
 };
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index dfda2ff61e1..92cd1f542d8 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
 }
 
 SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id,
-                                               const TPlanNode& tnode, const DescriptorTbl& descs)
+                                               const TPlanNode& tnode, const DescriptorTbl& descs,
+                                               bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id) {
-    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs);
+    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs,
+                                                              require_bucket_distribution);
 }
 
 Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 9382edd6933..fae5fe3270f 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -64,7 +64,7 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocal
 public:
     using LocalStateType = SpillSortSinkLocalState;
     SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                           const DescriptorTbl& descs);
+                           const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<SpillSortSinkLocalState>::_name);
@@ -78,6 +78,9 @@ public:
     DataDistribution required_data_distribution() const override {
         return _sort_sink_operator->required_data_distribution();
     }
+    bool require_data_distribution() const override {
+        return _sort_sink_operator->require_data_distribution();
+    }
     Status set_child(OperatorXPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX<SpillSortSinkLocalState>::set_child(child));
         return _sort_sink_operator->set_child(child);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7d90cebc8d2..18eb9582a4b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1042,8 +1042,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             request.query_options.__isset.enable_distinct_streaming_aggregation &&
             request.query_options.enable_distinct_streaming_aggregation &&
             !tnode.agg_node.grouping_exprs.empty()) {
-            op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
+            op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
+                                                       _require_bucket_distribution));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            _require_bucket_distribution =
+                    _require_bucket_distribution || op->require_data_distribution();
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation &&
                    !tnode.agg_node.grouping_exprs.empty()) {
@@ -1067,14 +1070,18 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             DataSinkOperatorXPtr sink;
             if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) {
                 sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode,
-                                                           descs));
+                                                           descs, _require_bucket_distribution));
             } else {
-                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+                sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                                _require_bucket_distribution));
             }
+            _require_bucket_distribution =
+                    _require_bucket_distribution || sink->require_data_distribution();
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
         }
+        _require_bucket_distribution = true;
         break;
     }
     case TPlanNodeType::HASH_JOIN_NODE: {
@@ -1139,6 +1146,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             _pipeline_parent_map.push(op->node_id(), cur_pipe);
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         }
+        _require_bucket_distribution =
+                _require_bucket_distribution || op->require_data_distribution();
         break;
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1201,10 +1210,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
 
         DataSinkOperatorXPtr sink;
         if (_runtime_state->enable_sort_spill()) {
-            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                                  _require_bucket_distribution));
         } else {
-            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                             _require_bucket_distribution));
         }
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1240,7 +1253,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                             _require_bucket_distribution));
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1279,6 +1295,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     case TPlanNodeType::DATA_GEN_SCAN_NODE: {
         op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        if (request.__isset.parallel_instances) {
+            cur_pipe->set_num_tasks(request.parallel_instances);
+            op->set_ignore_data_distribution();
+        }
         break;
     }
     case TPlanNodeType::SCHEMA_SCAN_NODE: {
@@ -1301,6 +1321,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                      print_plan_node_type(tnode.node_type));
     }
 
+    _require_bucket_distribution = true;
+
     return Status::OK();
 }
 // NOLINTEND(readability-function-cognitive-complexity)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 55866400374..14e4b05d7e8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -235,6 +235,8 @@ private:
 
     // Total instance num running on all BEs
     int _total_instances = -1;
+
+    bool _require_bucket_distribution = false;
 };
 
 } // namespace pipeline
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index 14a50160d63..f4e6dc93130 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.NereidsException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalScanNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.tablefunction.DataGenTableValuedFunction;
 import org.apache.doris.tablefunction.TableValuedFunctionTask;
@@ -116,6 +117,14 @@ public class DataGenScanNode extends ExternalScanNode {
     // by multi-processes or multi-threads. So we assign instance number to 1.
     @Override
     public int getNumInstances() {
+        if (ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
+            return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        }
+        return 1;
+    }
+
+    @Override
+    public int getScanRangeNum() {
         return 1;
     }
 
diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy
index 8f8458e47a6..2a0156e16b4 100644
--- a/regression-test/pipeline/p1/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p1/conf/regression-conf.groovy
@@ -60,6 +60,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th
     "test_profile," +
     "test_refresh_mtmv," +
     "test_spark_load," +
+    "test_iot_auto_detect_concurrent," +
     "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line
 
 // this dir will not be executed
diff --git a/regression-test/suites/correctness_p0/test_assert_row_num.groovy b/regression-test/suites/correctness_p0/test_assert_row_num.groovy
index 818213f56fe..68e9740a321 100644
--- a/regression-test/suites/correctness_p0/test_assert_row_num.groovy
+++ b/regression-test/suites/correctness_p0/test_assert_row_num.groovy
@@ -21,7 +21,7 @@ suite("test_assert_num_rows") {
     """
 
     qt_sql_2 """
-        SELECT * from numbers("number"="10") WHERE  ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL
+        SELECT * from numbers("number"="10") WHERE  ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL ORDER BY number
     """
     sql """
         DROP TABLE IF EXISTS table_9_undef_undef;
diff --git a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
index 6f0f74f6433..c0f2cafa403 100644
--- a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
@@ -39,17 +39,17 @@
     order_qt_inner_join1 """
                     select a.number as num1, b.number as num2
                     from numbers("number" = "10") a inner join 
numbers("number" = "10") b 
-                    on a.number=b.number;
+                    on a.number=b.number ORDER BY a.number,b.number;
                   """
     order_qt_inner_join2 """
                     select a.number as num1, b.number as num2
                     from numbers("number" = "6") a inner join numbers("number" 
= "6") b
-                    on a.number>b.number;
+                    on a.number>b.number ORDER BY a.number,b.number;
                   """
     order_qt_inner_join3 """
                     select a.number as num1, b.number as num2
                     from numbers("number" = "10") a inner join 
numbers("number" = "10") b
-                    on a.number=b.number and b.number%2 = 0;
+                    on a.number=b.number and b.number%2 = 0 ORDER BY a.number,b.number;
                   """
     order_qt_left_join """
                     select a.number as num1, b.number as num2

