This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/fe_local_shuffle by this push:
     new 298e0eb8c4f [fix](local shuffle) use requireHash() instead of requireGlobalExecutionHash() in HashJoinNode and PartitionSortNode
298e0eb8c4f is described below

commit 298e0eb8c4fc46cfc7803ad4669cd132ed542431
Author: 924060929 <[email protected]>
AuthorDate: Wed Apr 1 01:11:45 2026 +0800

    [fix](local shuffle) use requireHash() instead of requireGlobalExecutionHash() in HashJoinNode and PartitionSortNode
    
    HashJoinNode and PartitionSortNode were using requireGlobalExecutionHash(),
    which bypasses shouldUseLocalExecutionHash() and directly creates
    GLOBAL_EXECUTION_HASH_SHUFFLE local exchanges. With use_serial_exchange=true,
    the computeDestIdToInstanceId map has only one entry per BE (serial exchange
    reduces the number of destinations), but GLOBAL_EXECUTION_HASH_SHUFFLE needs
    entries for all instances. As a result, rows are lost and the query fails
    with a "Rows mismatched" error.
    
    Fix: use requireHash(), which goes through resolveExchangeType() and is
    downgraded to LOCAL_EXECUTION_HASH_SHUFFLE by shouldUseLocalExecutionHash().
    LOCAL hash partitions by the local instance count and does not need the
    external shuffle map.
---
 .../src/main/java/org/apache/doris/planner/HashJoinNode.java   | 10 ++++++++--
 .../main/java/org/apache/doris/planner/PartitionSortNode.java  | 10 +++++++---
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 1b90770f539..b65d8355888 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -327,8 +327,14 @@ public class HashJoinNode extends JoinNodeBase {
                     LocalExchangeTypeRequire.requireBucketHash(), 
translatorContext, this,
                     children.get(0));
         } else {
-            buildSideRequire = probeSideRequire = 
LocalExchangeTypeRequire.requireGlobalExecutionHash();
-            outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+            // Use requireHash() (not requireGlobalExecutionHash()) so that 
resolveExchangeType()
+            // can downgrade to LOCAL_EXECUTION_HASH_SHUFFLE via 
shouldUseLocalExecutionHash().
+            // This matches BE-native behavior where use_serial_exchange=true 
sets _use_serial_source=true,
+            // causing _add_local_exchange_impl to use LOCAL (not GLOBAL) hash 
shuffle.
+            // With use_serial_exchange=false, the upstream ExchangeNode 
already outputs
+            // GLOBAL_EXECUTION_HASH_SHUFFLE which satisfies requireHash() — 
no new exchange inserted.
+            buildSideRequire = probeSideRequire = 
LocalExchangeTypeRequire.requireHash();
+            outputType = null; // derived from probeResult.second below
         }
 
         Pair<PlanNode, LocalExchangeType> probeResult = enforceChildExchange(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
index e6bc8dee60b..8eaaaf60384 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
@@ -180,8 +180,12 @@ public class PartitionSortNode extends PlanNode {
         LocalExchangeTypeRequire requireChild;
         LocalExchangeType outputType;
         if (phase == PartitionTopnPhase.TWO_PHASE_GLOBAL_PTOPN) {
-            requireChild = 
LocalExchangeTypeRequire.requireGlobalExecutionHash();
-            outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+            // Use requireHash() so resolveExchangeType() can downgrade to 
LOCAL_EXECUTION_HASH_SHUFFLE,
+            // matching BE-native behavior where _use_serial_source=true 
causes LOCAL (not GLOBAL) hash.
+            // Output type is derived from the child's actual output (may be 
LOCAL or GLOBAL depending
+            // on whether a new exchange was inserted or the existing upstream 
exchange already satisfied).
+            requireChild = LocalExchangeTypeRequire.requireHash();
+            outputType = null;
         } else {
             requireChild = LocalExchangeTypeRequire.requirePassthrough();
             outputType = LocalExchangeType.PASSTHROUGH;
@@ -189,7 +193,7 @@ public class PartitionSortNode extends PlanNode {
         Pair<PlanNode, LocalExchangeType> enforceResult
                 = enforceChild(translatorContext, requireChild, 
children.get(0));
         this.children = Lists.newArrayList(enforceResult.first);
-        return Pair.of(this, outputType);
+        return Pair.of(this, outputType != null ? outputType : 
enforceResult.second);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to