This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/fe_local_shuffle by this push:
new 31321c3bf48 [fix](local shuffle) fix DistinctStreamingAgg hash
requirement and add RQG regression tests
31321c3bf48 is described below
commit 31321c3bf48b08a6ac1c3d28409fae5527a58ff3
Author: 924060929 <[email protected]>
AuthorDate: Wed Apr 1 13:46:53 2026 +0800
[fix](local shuffle) fix DistinctStreamingAgg hash requirement and add RQG
regression tests
1. AggregationNode: fix canUseDistinctStreamingAgg branch condition from
size() > 1 to !isEmpty(), matching BE's DistinctStreamingAggOperatorX
which checks !_probe_expr_ctxs.empty(). With 1 group key and
disable_streaming_preaggregations=true, FE was not requiring hash
exchange while BE did.
2. Add Bug 10-13 test cases (GLOBAL_HASH_SHUFFLE row mismatches, NLJ COREDUMP)
and their rqg_t3/rqg_t4 fixture tables to test_local_shuffle_rqg_bugs.
3. Add 3 RQG-derived cases to test_local_shuffle_fe_be_consistency:
serial NLJ RIGHT OUTER, GLOBAL_HASH self-join+NLJ, FULL OUTER JOIN.
---
.../org/apache/doris/planner/AggregationNode.java | 2 +-
.../test_local_shuffle_fe_be_consistency.groovy | 42 ++++
.../test_local_shuffle_rqg_bugs.groovy | 231 +++++++++++++++++++++
3 files changed, 274 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 3b1aa65230f..885d911bc71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -279,7 +279,7 @@ public class AggregationNode extends PlanNode {
LocalExchangeTypeRequire requireChild;
if (canUseDistinctStreamingAgg(sessionVariable)) {
// DistinctStreamingAggOperatorX in BE
- if (needsFinalize || (aggInfo.getGroupingExprs().size() > 1 &&
!useStreamingPreagg)) {
+ if (needsFinalize || (!aggInfo.getGroupingExprs().isEmpty() &&
!useStreamingPreagg)) {
if (AddLocalExchange.isColocated(this)) {
requireChild = LocalExchangeTypeRequire.requireHash();
} else {
diff --git
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
index 2ca0d09f880..c27be60927b 100644
---
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
+++
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
@@ -860,6 +860,48 @@ suite("test_local_shuffle_fe_be_consistency") {
GROUP BY GROUPING SETS ((k1), ())
ORDER BY k1, rn""")
+ // ============================================================
+ // 14. RQG bug cases — serial NLJ + pooling scan (Bug 13 from rqg_bugs)
+ // Serial NLJ (RIGHT_OUTER) with pooling scan. Previously crashed because
+ // FE inserted BROADCAST on build side inflating num_tasks while probe stayed
+ // serial. Fixed: serial NLJ sets buildSideRequire=noRequire().
+ // ============================================================
+ checkConsistencyWithSql("rqg_serial_nlj_right_outer_pooling",
+ """SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=0,
+ ignore_storage_data_distribution=true,
+ enable_share_hash_table_for_broadcast_join=false,
+ disable_streaming_preaggregations=true,
+ disable_join_reorder=true)*/
+ b.k1 AS field1
+ FROM ls_serial a
+ RIGHT OUTER JOIN ls_serial b ON a.v1 > b.v1
+ GROUP BY field1
+ ORDER BY field1 ASC""")
+
+ // GLOBAL_HASH_SHUFFLE fix (Bug 10 from rqg_bugs) — self-join + NLJ with serial exchange
+ checkConsistencyWithSql("rqg_global_hash_shuffle_self_join_nlj",
+ """SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
+ ignore_storage_data_distribution=true,
+ disable_join_reorder=true,
disable_colocate_plan=true)*/
+ a.k1 AS field1, a.v1 AS field2
+ FROM ls_t1 a
+ LEFT JOIN ls_t1 b ON a.k1 = b.k2
+ LEFT JOIN ls_t1 c ON a.k1 > b.k2
+ WHERE a.v1 > 5
+ GROUP BY field1, field2
+ ORDER BY field1, field2""")
+
+ // FULL OUTER JOIN + GROUP BY with serial exchange (Bug 11 from rqg_bugs)
+ checkConsistencyWithSql("rqg_global_hash_full_outer_join",
+ """SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
+ ignore_storage_data_distribution=true)*/
+ a.k1, b.k1, count(1)
+ FROM ls_t1 a
+ FULL OUTER JOIN ls_t2 b ON a.k1 = b.k1
+ WHERE b.k1 = 2
+ GROUP BY a.k1, b.k1
+ ORDER BY 1, 2, 3""")
+
// ================================================================
// Phase 2: Fetch all profiles and compare results
// By now, all queries have been executed and profiles are being
diff --git
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
index 7950f54ee50..402fb4edeb2 100644
---
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
+++
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
@@ -37,6 +37,8 @@ suite("test_local_shuffle_rqg_bugs") {
// ============================================================
sql "DROP TABLE IF EXISTS rqg_t1"
sql "DROP TABLE IF EXISTS rqg_t2"
+ sql "DROP TABLE IF EXISTS rqg_t3"
+ sql "DROP TABLE IF EXISTS rqg_t4"
sql """
CREATE TABLE rqg_t1 (
@@ -63,6 +65,52 @@ suite("test_local_shuffle_rqg_bugs") {
PROPERTIES ("replication_num" = "1")
"""
+ // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar + bigint columns
+ sql """
+ CREATE TABLE rqg_t3 (
+ pk INT NOT NULL,
+ col_bigint_undef_signed BIGINT,
+ col_varchar_10__undef_signed VARCHAR(10),
+ col_varchar_64__undef_signed VARCHAR(64)
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ // Second table for FULL OUTER JOIN case (col_bigint_undef_signed_not_null)
+ sql """
+ CREATE TABLE rqg_t4 (
+ pk INT NOT NULL,
+ col_bigint_undef_signed_not_null BIGINT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ sql """
+ INSERT INTO rqg_t3 VALUES
+ (0, -94, 'Abc', 'hello world'),
+ (1, 672609, 'Xyz', null),
+ (2, -3766684, 'Pqr', 'test string'),
+ (3, 5070261, 'abc', 'another row'),
+ (4, null, 'def', 'value four'),
+ (5, -86, 'XgpxlHBLEM', null),
+ (6, 21910, 'abc', 'they'),
+ (7, -63, 'zzzz', 'some text'),
+ (8, -8276281, 'AHlvNtoGLO', 'longer string here'),
+ (9, -101, 'mid', 'final row')
+ """
+
+ sql """
+ INSERT INTO rqg_t4 VALUES
+ (0, 0), (1, 1), (2, 2), (3, 3), (4, 4),
+ (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),
+ (10, 2), (11, 2), (12, 2), (13, 3), (14, 4),
+ (15, 5), (16, 2), (17, 2), (18, 2), (19, 9)
+ """
+
// Insert enough rows to exercise multiple pipeline tasks
sql """
INSERT INTO rqg_t1 VALUES
@@ -452,5 +500,188 @@ suite("test_local_shuffle_rqg_bugs") {
assertEquals(bug9_be, bug9_fe, "Bug 9: FE/BE result mismatch for agg after
NLJ + pooling scan")
logger.info("Bug 9: PASSED (FE/BE results match)")
+ // ============================================================
+ // Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched — self-join + NLJ
+ // RQG case: 906784672 (build 184181)
+ // Root cause: HashJoinNode used requireGlobalExecutionHash() → GLOBAL local exchange
+ // inserted when use_serial_exchange=true; shuffle_idx_to_instance_idx map has only
+ // 4 entries (1/BE) but GLOBAL hash needs N*dop entries → most rows unrouted (0 actual rows).
+ // Fixed: changed to requireHash() so resolveExchangeType() downgrades to LOCAL hash.
+ // SQL: self-join (table1 LEFT JOIN table1 table2 ON pk=col_bigint_undef_signed)
+ // then NLJ (LEFT JOIN table1 table3 ON pk > col_bigint_undef_signed)
+ // ============================================================
+
+ logger.info("=== Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched - self-join +
NLJ (build 184181 case 906784672) ===")
+ def bug10_fe = sql """
+ SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
+ enable_local_shuffle_planner=true,
+ ignore_storage_data_distribution=true,
+ enable_sql_cache=false,
+ disable_join_reorder=true,
disable_colocate_plan=true)*/
+ table1.pk AS field1, table1.col_bigint_undef_signed AS field2
+ FROM rqg_t3 AS table1
+ LEFT JOIN rqg_t3 AS table2 ON table1.pk =
table2.col_bigint_undef_signed
+ LEFT JOIN rqg_t3 AS table3 ON table1.pk >
table2.col_bigint_undef_signed
+ WHERE (table1.col_varchar_10__undef_signed > 'AHlvNtoGLO'
+ AND table1.col_varchar_10__undef_signed < 'zzzz')
+ OR (table1.col_bigint_undef_signed = table1.pk AND
table1.col_varchar_64__undef_signed IS NULL)
+ OR (table1.pk != table1.pk AND table1.pk <> 2)
+ GROUP BY field1, field2
+ ORDER BY field1, field2
+ """
+ def bug10_be = sql """
+ SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
+ enable_local_shuffle_planner=false,
+ ignore_storage_data_distribution=true,
+ enable_sql_cache=false,
+ disable_join_reorder=true,
disable_colocate_plan=true)*/
+ table1.pk AS field1, table1.col_bigint_undef_signed AS field2
+ FROM rqg_t3 AS table1
+ LEFT JOIN rqg_t3 AS table2 ON table1.pk =
table2.col_bigint_undef_signed
+ LEFT JOIN rqg_t3 AS table3 ON table1.pk >
table2.col_bigint_undef_signed
+ WHERE (table1.col_varchar_10__undef_signed > 'AHlvNtoGLO'
+ AND table1.col_varchar_10__undef_signed < 'zzzz')
+ OR (table1.col_bigint_undef_signed = table1.pk AND
table1.col_varchar_64__undef_signed IS NULL)
+ OR (table1.pk != table1.pk AND table1.pk <> 2)
+ GROUP BY field1, field2
+ ORDER BY field1, field2
+ """
+ logger.info("Bug 10 FE rows: ${bug10_fe.size()}, BE rows:
${bug10_be.size()}")
+ assertEquals(bug10_be.size(), bug10_fe.size(), "Bug 10: FE/BE row count
mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)")
+ assertEquals(bug10_be, bug10_fe, "Bug 10: FE/BE result mismatch for
self-join + NLJ")
+ logger.info("Bug 10: PASSED")
+
+ // ============================================================
+ // Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched — FULL OUTER JOIN + GROUP BY
+ // RQG case: 11007681241 (build 184181)
+ // Same root cause as Bug 10.
+ // SQL: FULL OUTER JOIN on col_bigint_undef_signed_not_null with WHERE + GROUP BY
+ // ============================================================
+
+ logger.info("=== Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched - FULL OUTER
JOIN + GROUP BY (build 184181 case 11007681241) ===")
+ def bug11_fe = sql """
+ SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
+ enable_local_shuffle_planner=true,
+ ignore_storage_data_distribution=true,
+ enable_sql_cache=false)*/
+ t1.col_bigint_undef_signed_not_null,
t2.col_bigint_undef_signed_not_null, count(1)
+ FROM rqg_t4 t1
+ FULL OUTER JOIN rqg_t2 t2
+ ON t1.col_bigint_undef_signed_not_null =
t2.col_bigint_undef_signed_not_null
+ WHERE t2.col_bigint_undef_signed_not_null = 2
+ GROUP BY t1.col_bigint_undef_signed_not_null,
t2.col_bigint_undef_signed_not_null
+ ORDER BY 1, 2, 3
+ """
+ def bug11_be = sql """
+ SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
+ enable_local_shuffle_planner=false,
+ ignore_storage_data_distribution=true,
+ enable_sql_cache=false)*/
+ t1.col_bigint_undef_signed_not_null,
t2.col_bigint_undef_signed_not_null, count(1)
+ FROM rqg_t4 t1
+ FULL OUTER JOIN rqg_t2 t2
+ ON t1.col_bigint_undef_signed_not_null =
t2.col_bigint_undef_signed_not_null
+ WHERE t2.col_bigint_undef_signed_not_null = 2
+ GROUP BY t1.col_bigint_undef_signed_not_null,
t2.col_bigint_undef_signed_not_null
+ ORDER BY 1, 2, 3
+ """
+ logger.info("Bug 11 FE rows: ${bug11_fe.size()}, BE rows:
${bug11_be.size()}")
+ assertEquals(bug11_be.size(), bug11_fe.size(), "Bug 11: FE/BE row count
mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)")
+ assertEquals(bug11_be, bug11_fe, "Bug 11: FE/BE result mismatch for FULL
OUTER JOIN + GROUP BY")
+ logger.info("Bug 11: PASSED")
+
+ // ============================================================
+ // Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched — LEFT JOIN + VARCHAR predicates + MIN()
+ // RQG case: 906784662 (build 184181)
+ // Same root cause as Bug 10/11.
+ // SQL: LEFT JOIN on pk with VARCHAR NOT IN / BETWEEN / IN predicates, MIN() aggregate
+ // ============================================================
+
+ logger.info("=== Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched - LEFT JOIN +
VARCHAR predicates (build 184181 case 906784662) ===")
+ def bug12_fe = sql """
+ SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
+ enable_local_shuffle_planner=true,
+ ignore_storage_data_distribution=true,
+ enable_sql_cache=false,
+ disable_join_reorder=true,
disable_colocate_plan=true)*/
+ table1.pk AS field1, MIN(table1.pk) AS field2
+ FROM rqg_t3 AS table1
+ LEFT JOIN rqg_t1 AS table2 ON table2.pk = table1.pk
+ WHERE table1.col_varchar_64__undef_signed NOT IN ('they')
+ AND table1.col_varchar_10__undef_signed BETWEEN 'AHlvNtoGLO' AND 'z'
+ AND table1.pk IN (3, 6, 8, 9, 2)
+ GROUP BY field1
+ ORDER BY field1, field2 ASC
+ """
+ def bug12_be = sql """
+ SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
+ enable_local_shuffle_planner=false,
+ ignore_storage_data_distribution=true,
+ enable_sql_cache=false,
+ disable_join_reorder=true,
disable_colocate_plan=true)*/
+ table1.pk AS field1, MIN(table1.pk) AS field2
+ FROM rqg_t3 AS table1
+ LEFT JOIN rqg_t1 AS table2 ON table2.pk = table1.pk
+ WHERE table1.col_varchar_64__undef_signed NOT IN ('they')
+ AND table1.col_varchar_10__undef_signed BETWEEN 'AHlvNtoGLO' AND 'z'
+ AND table1.pk IN (3, 6, 8, 9, 2)
+ GROUP BY field1
+ ORDER BY field1, field2 ASC
+ """
+ logger.info("Bug 12 FE rows: ${bug12_fe.size()}, BE rows:
${bug12_be.size()}")
+ assertEquals(bug12_be.size(), bug12_fe.size(), "Bug 12: FE/BE row count
mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)")
+ assertEquals(bug12_be, bug12_fe, "Bug 12: FE/BE result mismatch for LEFT
JOIN + VARCHAR predicates")
+ logger.info("Bug 12: PASSED")
+
+ // ============================================================
+ // Bug 13: NLJ COREDUMP — serial NLJ + pooling scan + BROADCAST build side
+ // RQG build 184430, query c0dafc1bed0f4910
+ // Root cause: serial NLJ (RIGHT_OUTER) with pooling scan inserted BROADCAST
+ // local exchange on build side, inflating build pipeline num_tasks to _num_instances
+ // while probe pipeline stayed at 1 task. Instance 1+ created build tasks without
+ // corresponding probe tasks → source_deps empty → set_ready_to_read() crash.
+ // Fixed: serial NLJ sets buildSideRequire=noRequire() to match BE-native
+ // num_tasks_of_parent()<=1 skip logic.
+ // ============================================================
+
+ logger.info("=== Bug 13: NLJ COREDUMP - serial NLJ + pooling scan (FE
planner) ===")
+ try {
+ def bug13_fe = sql """
+ SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=0,
+ enable_local_shuffle_planner=true,
+ ignore_storage_data_distribution=true,
+ enable_sql_cache=false,
+ enable_share_hash_table_for_broadcast_join=false,
+ disable_streaming_preaggregations=true,
+ disable_join_reorder=true)*/
+ t2.col_bigint_undef_signed_not_null AS field1
+ FROM rqg_t4 AS t1
+ RIGHT OUTER JOIN rqg_t2 AS t2 ON
t1.col_bigint_undef_signed_not_null > t2.col_bigint_undef_signed_not_null
+ GROUP BY field1
+ ORDER BY field1 ASC
+ """
+ def bug13_be = sql """
+ SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=0,
+ enable_local_shuffle_planner=false,
+ ignore_storage_data_distribution=true,
+ enable_sql_cache=false,
+ enable_share_hash_table_for_broadcast_join=false,
+ disable_streaming_preaggregations=true,
+ disable_join_reorder=true)*/
+ t2.col_bigint_undef_signed_not_null AS field1
+ FROM rqg_t4 AS t1
+ RIGHT OUTER JOIN rqg_t2 AS t2 ON
t1.col_bigint_undef_signed_not_null > t2.col_bigint_undef_signed_not_null
+ GROUP BY field1
+ ORDER BY field1 ASC
+ """
+ logger.info("Bug 13 FE rows: ${bug13_fe.size()}, BE rows:
${bug13_be.size()}")
+ assertEquals(bug13_be.size(), bug13_fe.size(), "Bug 13: FE/BE row
count mismatch (NLJ COREDUMP)")
+ assertEquals(bug13_be, bug13_fe, "Bug 13: FE/BE result mismatch for
serial NLJ + pooling scan")
+ logger.info("Bug 13: PASSED (no crash, results match)")
+ } catch (Throwable t) {
+ logger.error("Bug 13 FAILED: ${t.message}")
+ assertTrue(false, "Bug 13: NLJ COREDUMP (serial NLJ + pooling scan):
${t.message}")
+ }
+
logger.info("=== All RQG bug reproduction tests completed ===")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]