This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 637725440bb [Chore](shuffle) adjust some local shuffle rules (#59366)
637725440bb is described below

commit 637725440bbecdbed671bf69bb7ab27227ba2e46
Author: Pxl <[email protected]>
AuthorDate: Mon Dec 29 14:01:54 2025 +0800

    [Chore](shuffle) adjust some local shuffle rules (#59366)
    
    1. Make distinct streaming agg always shuffle (improves parallelism)
    2. Make broadcast join probe not shuffle
    
    This pull request updates the logic for determining required data
    distributions in the aggregation and join pipeline operators. The main
    focus is on improving the handling of exchange types, especially for
    passthrough and broadcast scenarios.
    
    Key changes include:
    
    **Data distribution logic updates:**
    
    * In `DistinctStreamingAggOperatorX`, the fallback path of
    `required_data_distribution` now returns `ExchangeType::PASSTHROUGH`
    instead of delegating to the base class, simplifying the distribution
    requirement when colocation is not needed.
    * In `HashJoinProbeOperatorX`, the logic for broadcast joins is refined:
    `required_data_distribution` returns `ExchangeType::PASSTHROUGH` if the
    child is a serial operator and `ExchangeType::NOOP` otherwise.
    Non-broadcast joins are now handled in a separate return statement:
    bucket shuffle and colocate joins use
    `ExchangeType::BUCKET_HASH_SHUFFLE`, and all other joins use
    `ExchangeType::HASH_SHUFFLE`.
---
 .../exec/distinct_streaming_aggregation_operator.h       |  2 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h           | 16 +++++++++-------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 0dea1413444..edb4ecbe063 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -120,7 +120,7 @@ public:
                            ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
         }
-        return 
StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(state);
+        return {ExchangeType::PASSTHROUGH};
     }
 
     bool require_data_distribution() const override { return _is_colocate; }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index e9cb74e5863..a0ad3e56c8b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -133,14 +133,16 @@ public:
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
+        } else if (_is_broadcast_join) {
+            return _child && _child->is_serial_operator()
+                           ? DataDistribution(ExchangeType::PASSTHROUGH)
+                           : DataDistribution(ExchangeType::NOOP);
         }
-        return _is_broadcast_join
-                       ? DataDistribution(ExchangeType::PASSTHROUGH)
-                       : (_join_distribution == 
TJoinDistributionType::BUCKET_SHUFFLE ||
-                                          _join_distribution == 
TJoinDistributionType::COLOCATE
-                                  ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
-                                                     _partition_exprs)
-                                  : 
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
+
+        return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+                                _join_distribution == 
TJoinDistributionType::COLOCATE
+                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, 
_partition_exprs)
+                        : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs));
     }
     bool is_broadcast_join() const { return _is_broadcast_join; }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to