This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_rebase in repository https://gitbox.apache.org/repos/asf/doris.git
commit 261abe7dc475f0a838f0c1083ec856b8e5ed47c0 Author: 924060929 <[email protected]> AuthorDate: Mon Jun 1 12:53:15 2026 +0800 [fix](local shuffle) PARTITIONED hash join: requireGlobalExecutionHash instead of requireHash PARTITIONED (shuffle) joins have both sides entering via global hash exchange. The else branch used generic requireHash() which resolved to LOCAL_EXECUTION_HASH_SHUFFLE when a new exchange was needed. LOCAL hash uses per-BE instance count as modulus, incompatible with the global exchange's total-instance modulus on the other side, causing join mismatches and missing rows (DORIS-26101). Fix: use requireGlobalExecutionHash() so any inserted exchange matches the cross-fragment global exchange instance mapping. --- .../org/apache/doris/planner/HashJoinNode.java | 14 +- .../planner/LocalShuffleNodeCoverageTest.java | 25 ++- .../test_local_shuffle_cross_join_hash_join.groovy | 230 +++++++++++++++++++++ 3 files changed, 258 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index ae01b515cab..a737e6106b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -344,13 +344,13 @@ public class HashJoinNode extends JoinNodeBase { LocalExchangeTypeRequire.requireBucketHash(), translatorContext, this, children.get(0)); } else { - // Use requireHash() (not requireGlobalExecutionHash()) so that resolveExchangeType() - // can downgrade to LOCAL_EXECUTION_HASH_SHUFFLE via shouldUseLocalExecutionHash(). - // This matches BE-native behavior where use_serial_exchange=true sets _use_serial_source=true, - // causing _add_local_exchange_impl to use LOCAL (not GLOBAL) hash shuffle. 
- // With use_serial_exchange=false, the upstream ExchangeNode already outputs - // GLOBAL_EXECUTION_HASH_SHUFFLE which satisfies requireHash() — no new exchange inserted. - buildSideRequire = probeSideRequire = LocalExchangeTypeRequire.requireHash(); + // PARTITIONED (shuffle) join: both sides enter via global hash exchange. + // Require GLOBAL specifically so that any inserted exchange uses the same + // instance mapping as the cross-fragment exchange. LOCAL hash has a different + // modulus (per-BE instance count vs total instance count) and would cause + // join mismatches (DORIS-26101). + buildSideRequire = probeSideRequire + = LocalExchangeTypeRequire.requireGlobalExecutionHash(); outputType = null; // derived from probeResult.second below } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java index 4abd9286384..dbf9d1cda81 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java @@ -229,10 +229,27 @@ public class LocalShuffleNodeCoverageTest { hashJoin.setDistributionMode(DistributionMode.PARTITIONED); Pair<PlanNode, LocalExchangeType> hashOutput = hashJoin.enforceAndDeriveLocalExchange( ctx, null, LocalExchangeTypeRequire.requireHash()); - // enforceRequire resolves RequireHash to LOCAL_EXECUTION_HASH_SHUFFLE (FE-planned always uses LOCAL) - Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, hashOutput.second); - assertChildLocalExchangeType(hashJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); - assertChildLocalExchangeType(hashJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + // PARTITIONED join requires GLOBAL hash to match cross-fragment exchange (DORIS-26101) + Assertions.assertEquals(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE, hashOutput.second); + 
assertChildLocalExchangeType(hashJoin, 0, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE); + assertChildLocalExchangeType(hashJoin, 1, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE); + + // DORIS-26101: PARTITIONED join with probe child already providing GLOBAL hash + // (e.g. upstream ExchangeNode) should satisfy requireGlobalExecutionHash without + // inserting a new exchange. + TrackingPlanNode probeGlobal = new TrackingPlanNode(nextPlanNodeId(), + LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE); + TrackingPlanNode buildGlobal = new TrackingPlanNode(nextPlanNodeId(), + LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE); + HashJoinNode partitionedSatisfied = new HashJoinNode(nextPlanNodeId(), probeGlobal, buildGlobal, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + partitionedSatisfied.setDistributionMode(DistributionMode.PARTITIONED); + Pair<PlanNode, LocalExchangeType> satisfiedOutput = partitionedSatisfied.enforceAndDeriveLocalExchange( + ctx, null, LocalExchangeTypeRequire.requireHash()); + Assertions.assertEquals(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE, satisfiedOutput.second); + Assertions.assertSame(probeGlobal, partitionedSatisfied.getChild(0), + "no exchange should be inserted when child already provides GLOBAL hash"); + Assertions.assertSame(buildGlobal, partitionedSatisfied.getChild(1)); TrackingPlanNode probe3 = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); TrackingPlanNode build3 = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_cross_join_hash_join.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_cross_join_hash_join.groovy new file mode 100644 index 00000000000..93e91355ef4 --- /dev/null +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_cross_join_hash_join.groovy @@ -0,0 +1,230 @@ +// Licensed to the Apache Software Foundation (ASF) under 
one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/** + * DORIS-26101: FE local shuffle planner returns wrong result for + * aggregate → CROSS/NLJ → downstream partitioned hash join. + * + * Root cause: PARTITIONED hash join used generic requireHash() which + * resolved to LOCAL_EXECUTION_HASH_SHUFFLE. LOCAL hash has a different + * modulus (per-BE instance count) than the GLOBAL hash exchange on + * the other side of the join, causing instance mapping mismatch and + * missing rows. + * + * Fix: PARTITIONED join uses requireGlobalExecutionHash() so any + * inserted exchange matches the cross-fragment global exchange. 
+ */
+suite("test_local_shuffle_cross_join_hash_join") {
+
+    sql "DROP TABLE IF EXISTS ls_cross_a"
+    sql "DROP TABLE IF EXISTS ls_cross_dim"
+
+    // Aggregated side of the join; bucketed on id, so grouping by g needs a reshuffle.
+    sql """
+        CREATE TABLE ls_cross_a (
+            id INT,
+            g INT,
+            v INT
+        ) ENGINE=OLAP DUPLICATE KEY(id, g)
+        DISTRIBUTED BY HASH(id) BUCKETS 13
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // Dimension side of the shuffle join; bucketed on the join key g.
+    sql """
+        CREATE TABLE ls_cross_dim (
+            id INT,
+            g INT,
+            w INT
+        ) ENGINE=OLAP DUPLICATE KEY(id, g)
+        DISTRIBUTED BY HASH(g) BUCKETS 17
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // 23 rows with g = number, i.e. 23 distinct groups.
+    sql """
+        INSERT INTO ls_cross_a
+        SELECT CAST(number AS INT) id, CAST(number AS INT) g, CAST(number * 10 + 1 AS INT) v
+        FROM numbers("number" = "23")
+    """
+
+    // 713 rows spread over g = number % 23, so every group of ls_cross_a has matches.
+    sql """
+        INSERT INTO ls_cross_dim
+        SELECT CAST(number AS INT) id, CAST(number % 23 AS INT) g, CAST(1000 + number AS INT) w
+        FROM numbers("number" = "713")
+    """
+
+    // Every query below carries its complete SET_VAR hint inline, directly after SELECT.
+    // The only difference between a baseline and its FE counterpart is the pair
+    // enable_local_shuffle / enable_local_shuffle_planner.
+
+    // Baseline: local shuffle OFF
+    def baseline = sql """
+        SELECT /*+SET_VAR(
+            enable_sql_cache=false,
+            disable_join_reorder=true,
+            enable_local_exchange_before_agg=false,
+            experimental_force_to_local_shuffle=true,
+            experimental_enable_parallel_scan=false,
+            enable_runtime_filter_prune=false,
+            enable_runtime_filter_partition_prune=false,
+            runtime_filter_type='IN,MIN_MAX',
+            parallel_pipeline_task_num=8,
+            parallel_exchange_instance_num=8,
+            query_timeout=600,
+            prefer_join_method=shuffle,
+            enable_local_shuffle=false,
+            enable_local_shuffle_planner=false
+        )*/ x.g, COUNT(*) c, SUM(d.w) sw
+        FROM (
+            SELECT a.g, one.v
+            FROM (
+                SELECT g, SUM(v) sv FROM ls_cross_a GROUP BY g
+            ) a
+            CROSS JOIN (SELECT 1 v) one
+        ) x
+        JOIN [shuffle] ls_cross_dim d ON x.g = d.g
+        GROUP BY x.g
+        ORDER BY x.g
+    """
+
+    // FE local shuffle planner
+    def feResult = sql """
+        SELECT /*+SET_VAR(
+            enable_sql_cache=false,
+            disable_join_reorder=true,
+            enable_local_exchange_before_agg=false,
+            experimental_force_to_local_shuffle=true,
+            experimental_enable_parallel_scan=false,
+            enable_runtime_filter_prune=false,
+            enable_runtime_filter_partition_prune=false,
+            runtime_filter_type='IN,MIN_MAX',
+            parallel_pipeline_task_num=8,
+            parallel_exchange_instance_num=8,
+            query_timeout=600,
+            prefer_join_method=shuffle,
+            enable_local_shuffle=true,
+            enable_local_shuffle_planner=true
+        )*/ x.g, COUNT(*) c, SUM(d.w) sw
+        FROM (
+            SELECT a.g, one.v
+            FROM (
+                SELECT g, SUM(v) sv FROM ls_cross_a GROUP BY g
+            ) a
+            CROSS JOIN (SELECT 1 v) one
+        ) x
+        JOIN [shuffle] ls_cross_dim d ON x.g = d.g
+        GROUP BY x.g
+        ORDER BY x.g
+    """
+
+    // Guard against both runs agreeing on an empty or truncated result.
+    assertEquals(23, baseline.size(), "baseline should return 23 rows")
+    assertEquals(baseline, feResult,
+        "DORIS-26101: FE local shuffle planner should match baseline for aggregate->CROSS JOIN->shuffle join")
+
+    // NLJ variant: same check with the implicit comma-join form (FROM a, one)
+    // in place of the explicit CROSS JOIN keyword.
+    def nljBaseline = sql """
+        SELECT /*+SET_VAR(
+            enable_sql_cache=false,
+            disable_join_reorder=true,
+            enable_local_exchange_before_agg=false,
+            experimental_force_to_local_shuffle=true,
+            experimental_enable_parallel_scan=false,
+            enable_runtime_filter_prune=false,
+            enable_runtime_filter_partition_prune=false,
+            runtime_filter_type='IN,MIN_MAX',
+            parallel_pipeline_task_num=8,
+            parallel_exchange_instance_num=8,
+            query_timeout=600,
+            prefer_join_method=shuffle,
+            enable_local_shuffle=false,
+            enable_local_shuffle_planner=false
+        )*/ x.g, COUNT(*) c, SUM(d.w) sw
+        FROM (
+            SELECT a.g
+            FROM (
+                SELECT g, SUM(v) sv FROM ls_cross_a GROUP BY g
+            ) a, (SELECT 1 v) one
+        ) x
+        JOIN [shuffle] ls_cross_dim d ON x.g = d.g
+        GROUP BY x.g
+        ORDER BY x.g
+    """
+
+    def nljFeResult = sql """
+        SELECT /*+SET_VAR(
+            enable_sql_cache=false,
+            disable_join_reorder=true,
+            enable_local_exchange_before_agg=false,
+            experimental_force_to_local_shuffle=true,
+            experimental_enable_parallel_scan=false,
+            enable_runtime_filter_prune=false,
+            enable_runtime_filter_partition_prune=false,
+            runtime_filter_type='IN,MIN_MAX',
+            parallel_pipeline_task_num=8,
+            parallel_exchange_instance_num=8,
+            query_timeout=600,
+            prefer_join_method=shuffle,
+            enable_local_shuffle=true,
+            enable_local_shuffle_planner=true
+        )*/ x.g, COUNT(*) c, SUM(d.w) sw
+        FROM (
+            SELECT a.g
+            FROM (
+                SELECT g, SUM(v) sv FROM ls_cross_a GROUP BY g
+            ) a, (SELECT 1 v) one
+        ) x
+        JOIN [shuffle] ls_cross_dim d ON x.g = d.g
+        GROUP BY x.g
+        ORDER BY x.g
+    """
+
+    // Same non-empty guard as the CROSS JOIN pair above.
+    assertEquals(23, nljBaseline.size(), "NLJ baseline should return 23 rows")
+    assertEquals(nljBaseline, nljFeResult,
+        "DORIS-26101: NLJ variant should also match baseline")
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
