This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/fe_local_shuffle by this push:
new 298e0eb8c4f [fix](local shuffle) use requireHash() instead of
requireGlobalExecutionHash() in HashJoinNode and PartitionSortNode
298e0eb8c4f is described below
commit 298e0eb8c4fc46cfc7803ad4669cd132ed542431
Author: 924060929 <[email protected]>
AuthorDate: Wed Apr 1 01:11:45 2026 +0800
[fix](local shuffle) use requireHash() instead of
requireGlobalExecutionHash() in HashJoinNode and PartitionSortNode
HashJoinNode and PartitionSortNode were using requireGlobalExecutionHash(),
which bypasses shouldUseLocalExecutionHash() and directly creates
GLOBAL_EXECUTION_HASH_SHUFFLE local exchanges. With use_serial_exchange=true,
the computeDestIdToInstanceId map has only 1 entry per BE (serial exchange
reduces destinations), but GLOBAL_EXECUTION_HASH_SHUFFLE needs entries for
all instances. This causes rows to be lost, failing with a "Rows mismatched"
error.
Fix: use requireHash(), which goes through resolveExchangeType() and gets
downgraded to LOCAL_EXECUTION_HASH_SHUFFLE by shouldUseLocalExecutionHash().
LOCAL_EXECUTION_HASH_SHUFFLE partitions by the local instance count and does
not need the external shuffle map.
---
.../src/main/java/org/apache/doris/planner/HashJoinNode.java | 10 ++++++++--
.../main/java/org/apache/doris/planner/PartitionSortNode.java | 10 +++++++---
2 files changed, 15 insertions(+), 5 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 1b90770f539..b65d8355888 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -327,8 +327,14 @@ public class HashJoinNode extends JoinNodeBase {
LocalExchangeTypeRequire.requireBucketHash(),
translatorContext, this,
children.get(0));
} else {
- buildSideRequire = probeSideRequire =
LocalExchangeTypeRequire.requireGlobalExecutionHash();
- outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+ // Use requireHash() (not requireGlobalExecutionHash()) so that
resolveExchangeType()
+ // can downgrade to LOCAL_EXECUTION_HASH_SHUFFLE via
shouldUseLocalExecutionHash().
+ // This matches BE-native behavior where use_serial_exchange=true
sets _use_serial_source=true,
+ // causing _add_local_exchange_impl to use LOCAL (not GLOBAL) hash
shuffle.
+ // With use_serial_exchange=false, the upstream ExchangeNode
already outputs
+ // GLOBAL_EXECUTION_HASH_SHUFFLE which satisfies requireHash() —
no new exchange inserted.
+ buildSideRequire = probeSideRequire =
LocalExchangeTypeRequire.requireHash();
+ outputType = null; // derived from probeResult.second below
}
Pair<PlanNode, LocalExchangeType> probeResult = enforceChildExchange(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
index e6bc8dee60b..8eaaaf60384 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
@@ -180,8 +180,12 @@ public class PartitionSortNode extends PlanNode {
LocalExchangeTypeRequire requireChild;
LocalExchangeType outputType;
if (phase == PartitionTopnPhase.TWO_PHASE_GLOBAL_PTOPN) {
- requireChild =
LocalExchangeTypeRequire.requireGlobalExecutionHash();
- outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+ // Use requireHash() so resolveExchangeType() can downgrade to
LOCAL_EXECUTION_HASH_SHUFFLE,
+ // matching BE-native behavior where _use_serial_source=true
causes LOCAL (not GLOBAL) hash.
+ // Output type is derived from the child's actual output (may be
LOCAL or GLOBAL depending
+ // on whether a new exchange was inserted or the existing upstream
exchange already satisfied).
+ requireChild = LocalExchangeTypeRequire.requireHash();
+ outputType = null;
} else {
requireChild = LocalExchangeTypeRequire.requirePassthrough();
outputType = LocalExchangeType.PASSTHROUGH;
@@ -189,7 +193,7 @@ public class PartitionSortNode extends PlanNode {
Pair<PlanNode, LocalExchangeType> enforceResult
= enforceChild(translatorContext, requireChild,
children.get(0));
this.children = Lists.newArrayList(enforceResult.first);
- return Pair.of(this, outputType);
+ return Pair.of(this, outputType != null ? outputType :
enforceResult.second);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]