This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/fe_local_shuffle by this push:
     new 31321c3bf48 [fix](local shuffle) fix DistinctStreamingAgg hash requirement and add RQG regression tests
31321c3bf48 is described below

commit 31321c3bf48b08a6ac1c3d28409fae5527a58ff3
Author: 924060929 <[email protected]>
AuthorDate: Wed Apr 1 13:46:53 2026 +0800

    [fix](local shuffle) fix DistinctStreamingAgg hash requirement and add RQG regression tests
    
    1. AggregationNode: change the canUseDistinctStreamingAgg branch condition
       from `size() > 1` to `!isEmpty()`, matching BE's
       DistinctStreamingAggOperatorX, which checks `!_probe_expr_ctxs.empty()`.
       With exactly one group key and disable_streaming_preaggregations=true,
       FE did not require a hash exchange while BE did.
    
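    2. Add Bug 10-12 (GLOBAL_HASH_SHUFFLE row mismatch) and Bug 13 (NLJ
       COREDUMP) test cases to test_local_shuffle_rqg_bugs, together with the
       new tables rqg_t3 and rqg_t4 that they use.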
    
    3. Add three RQG-derived cases to test_local_shuffle_fe_be_consistency:
       serial NLJ RIGHT OUTER JOIN with pooling scan, GLOBAL_HASH self-join +
       NLJ, and FULL OUTER JOIN + GROUP BY.
---
 .../org/apache/doris/planner/AggregationNode.java  |   2 +-
 .../test_local_shuffle_fe_be_consistency.groovy    |  42 ++++
 .../test_local_shuffle_rqg_bugs.groovy             | 231 +++++++++++++++++++++
 3 files changed, 274 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 3b1aa65230f..885d911bc71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -279,7 +279,7 @@ public class AggregationNode extends PlanNode {
         LocalExchangeTypeRequire requireChild;
         if (canUseDistinctStreamingAgg(sessionVariable)) {
             // DistinctStreamingAggOperatorX in BE
-            if (needsFinalize || (aggInfo.getGroupingExprs().size() > 1 && 
!useStreamingPreagg)) {
+            if (needsFinalize || (!aggInfo.getGroupingExprs().isEmpty() && 
!useStreamingPreagg)) {
                 if (AddLocalExchange.isColocated(this)) {
                     requireChild = LocalExchangeTypeRequire.requireHash();
                 } else {
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
index 2ca0d09f880..c27be60927b 100644
--- 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
@@ -860,6 +860,48 @@ suite("test_local_shuffle_fe_be_consistency") {
            GROUP BY GROUPING SETS ((k1), ())
            ORDER BY k1, rn""")
 
+    // ============================================================
+    //  14. RQG bug cases — serial NLJ + pooling scan (Bug 13 from rqg_bugs)
+    //  Serial NLJ (RIGHT_OUTER) with pooling scan. Previously crashed because
+    //  FE inserted BROADCAST on build side inflating num_tasks while probe 
stayed
+    //  serial. Fixed: serial NLJ sets buildSideRequire=noRequire().
+    // ============================================================
+    checkConsistencyWithSql("rqg_serial_nlj_right_outer_pooling",
+        """SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=0,
+                             ignore_storage_data_distribution=true,
+                             enable_share_hash_table_for_broadcast_join=false,
+                             disable_streaming_preaggregations=true,
+                             disable_join_reorder=true)*/
+              b.k1 AS field1
+          FROM ls_serial a
+          RIGHT OUTER JOIN ls_serial b ON a.v1 > b.v1
+          GROUP BY field1
+          ORDER BY field1 ASC""")
+
+    // GLOBAL_HASH_SHUFFLE fix (Bug 10 from rqg_bugs) — self-join + NLJ with 
serial exchange
+    checkConsistencyWithSql("rqg_global_hash_shuffle_self_join_nlj",
+        """SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=4,
+                             ignore_storage_data_distribution=true,
+                             disable_join_reorder=true, 
disable_colocate_plan=true)*/
+              a.k1 AS field1, a.v1 AS field2
+          FROM ls_t1 a
+          LEFT JOIN ls_t1 b ON a.k1 = b.k2
+          LEFT JOIN ls_t1 c ON a.k1 > b.k2
+          WHERE a.v1 > 5
+          GROUP BY field1, field2
+          ORDER BY field1, field2""")
+
+    // FULL OUTER JOIN + GROUP BY with serial exchange (Bug 11 from rqg_bugs)
+    checkConsistencyWithSql("rqg_global_hash_full_outer_join",
+        """SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=4,
+                             ignore_storage_data_distribution=true)*/
+              a.k1, b.k1, count(1)
+          FROM ls_t1 a
+          FULL OUTER JOIN ls_t2 b ON a.k1 = b.k1
+          WHERE b.k1 = 2
+          GROUP BY a.k1, b.k1
+          ORDER BY 1, 2, 3""")
+
     // ================================================================
     //  Phase 2: Fetch all profiles and compare results
     //  By now, all queries have been executed and profiles are being
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
index 7950f54ee50..402fb4edeb2 100644
--- 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
@@ -37,6 +37,8 @@ suite("test_local_shuffle_rqg_bugs") {
     // ============================================================
     sql "DROP TABLE IF EXISTS rqg_t1"
     sql "DROP TABLE IF EXISTS rqg_t2"
+    sql "DROP TABLE IF EXISTS rqg_t3"
+    sql "DROP TABLE IF EXISTS rqg_t4"
 
     sql """
         CREATE TABLE rqg_t1 (
@@ -63,6 +65,52 @@ suite("test_local_shuffle_rqg_bugs") {
         PROPERTIES ("replication_num" = "1")
     """
 
+    // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar + 
bigint columns
+    sql """
+        CREATE TABLE rqg_t3 (
+            pk INT NOT NULL,
+            col_bigint_undef_signed BIGINT,
+            col_varchar_10__undef_signed VARCHAR(10),
+            col_varchar_64__undef_signed VARCHAR(64)
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // Second table for FULL OUTER JOIN case (col_bigint_undef_signed_not_null)
+    sql """
+        CREATE TABLE rqg_t4 (
+            pk INT NOT NULL,
+            col_bigint_undef_signed_not_null BIGINT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    sql """
+        INSERT INTO rqg_t3 VALUES
+            (0, -94, 'Abc', 'hello world'),
+            (1, 672609, 'Xyz', null),
+            (2, -3766684, 'Pqr', 'test string'),
+            (3, 5070261, 'abc', 'another row'),
+            (4, null, 'def', 'value four'),
+            (5, -86, 'XgpxlHBLEM', null),
+            (6, 21910, 'abc', 'they'),
+            (7, -63, 'zzzz', 'some text'),
+            (8, -8276281, 'AHlvNtoGLO', 'longer string here'),
+            (9, -101, 'mid', 'final row')
+    """
+
+    sql """
+        INSERT INTO rqg_t4 VALUES
+            (0, 0), (1, 1), (2, 2), (3, 3), (4, 4),
+            (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),
+            (10, 2), (11, 2), (12, 2), (13, 3), (14, 4),
+            (15, 5), (16, 2), (17, 2), (18, 2), (19, 9)
+    """
+
     // Insert enough rows to exercise multiple pipeline tasks
     sql """
         INSERT INTO rqg_t1 VALUES
@@ -452,5 +500,188 @@ suite("test_local_shuffle_rqg_bugs") {
     assertEquals(bug9_be, bug9_fe, "Bug 9: FE/BE result mismatch for agg after 
NLJ + pooling scan")
     logger.info("Bug 9: PASSED (FE/BE results match)")
 
+    // ============================================================
+    //  Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched — self-join + NLJ
+    //  RQG case: 906784672 (build 184181)
+    //  Root cause: HashJoinNode used requireGlobalExecutionHash() → GLOBAL 
local exchange
+    //  inserted when use_serial_exchange=true; shuffle_idx_to_instance_idx 
map has only
+    //  4 entries (1/BE) but GLOBAL hash needs N*dop entries → most rows 
unrouted (0 actual rows).
+    //  Fixed: changed to requireHash() so resolveExchangeType() downgrades to 
LOCAL hash.
+    //  SQL: self-join (table1 LEFT JOIN table1 table2 ON 
pk=col_bigint_undef_signed)
+    //       then NLJ (LEFT JOIN table1 table3 ON pk > col_bigint_undef_signed)
+    // ============================================================
+
+    logger.info("=== Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched - self-join + 
NLJ (build 184181 case 906784672) ===")
+    def bug10_fe = sql """
+        SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=4,
+                          enable_local_shuffle_planner=true,
+                          ignore_storage_data_distribution=true,
+                          enable_sql_cache=false,
+                          disable_join_reorder=true, 
disable_colocate_plan=true)*/
+            table1.pk AS field1, table1.col_bigint_undef_signed AS field2
+        FROM rqg_t3 AS table1
+        LEFT JOIN rqg_t3 AS table2 ON table1.pk = 
table2.col_bigint_undef_signed
+        LEFT JOIN rqg_t3 AS table3 ON table1.pk > 
table2.col_bigint_undef_signed
+        WHERE (table1.col_varchar_10__undef_signed > 'AHlvNtoGLO'
+               AND table1.col_varchar_10__undef_signed < 'zzzz')
+           OR (table1.col_bigint_undef_signed = table1.pk AND 
table1.col_varchar_64__undef_signed IS NULL)
+           OR (table1.pk != table1.pk AND table1.pk <> 2)
+        GROUP BY field1, field2
+        ORDER BY field1, field2
+    """
+    def bug10_be = sql """
+        SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=4,
+                          enable_local_shuffle_planner=false,
+                          ignore_storage_data_distribution=true,
+                          enable_sql_cache=false,
+                          disable_join_reorder=true, 
disable_colocate_plan=true)*/
+            table1.pk AS field1, table1.col_bigint_undef_signed AS field2
+        FROM rqg_t3 AS table1
+        LEFT JOIN rqg_t3 AS table2 ON table1.pk = 
table2.col_bigint_undef_signed
+        LEFT JOIN rqg_t3 AS table3 ON table1.pk > 
table2.col_bigint_undef_signed
+        WHERE (table1.col_varchar_10__undef_signed > 'AHlvNtoGLO'
+               AND table1.col_varchar_10__undef_signed < 'zzzz')
+           OR (table1.col_bigint_undef_signed = table1.pk AND 
table1.col_varchar_64__undef_signed IS NULL)
+           OR (table1.pk != table1.pk AND table1.pk <> 2)
+        GROUP BY field1, field2
+        ORDER BY field1, field2
+    """
+    logger.info("Bug 10 FE rows: ${bug10_fe.size()}, BE rows: 
${bug10_be.size()}")
+    assertEquals(bug10_be.size(), bug10_fe.size(), "Bug 10: FE/BE row count 
mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)")
+    assertEquals(bug10_be, bug10_fe, "Bug 10: FE/BE result mismatch for 
self-join + NLJ")
+    logger.info("Bug 10: PASSED")
+
+    // ============================================================
+    //  Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched — FULL OUTER JOIN + GROUP 
BY
+    //  RQG case: 11007681241 (build 184181)
+    //  Same root cause as Bug 10.
+    //  SQL: FULL OUTER JOIN on col_bigint_undef_signed_not_null with WHERE + 
GROUP BY
+    // ============================================================
+
+    logger.info("=== Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched - FULL OUTER 
JOIN + GROUP BY (build 184181 case 11007681241) ===")
+    def bug11_fe = sql """
+        SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=4,
+                          enable_local_shuffle_planner=true,
+                          ignore_storage_data_distribution=true,
+                          enable_sql_cache=false)*/
+            t1.col_bigint_undef_signed_not_null, 
t2.col_bigint_undef_signed_not_null, count(1)
+        FROM rqg_t4 t1
+        FULL OUTER JOIN rqg_t2 t2
+          ON t1.col_bigint_undef_signed_not_null = 
t2.col_bigint_undef_signed_not_null
+        WHERE t2.col_bigint_undef_signed_not_null = 2
+        GROUP BY t1.col_bigint_undef_signed_not_null, 
t2.col_bigint_undef_signed_not_null
+        ORDER BY 1, 2, 3
+    """
+    def bug11_be = sql """
+        SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=4,
+                          enable_local_shuffle_planner=false,
+                          ignore_storage_data_distribution=true,
+                          enable_sql_cache=false)*/
+            t1.col_bigint_undef_signed_not_null, 
t2.col_bigint_undef_signed_not_null, count(1)
+        FROM rqg_t4 t1
+        FULL OUTER JOIN rqg_t2 t2
+          ON t1.col_bigint_undef_signed_not_null = 
t2.col_bigint_undef_signed_not_null
+        WHERE t2.col_bigint_undef_signed_not_null = 2
+        GROUP BY t1.col_bigint_undef_signed_not_null, 
t2.col_bigint_undef_signed_not_null
+        ORDER BY 1, 2, 3
+    """
+    logger.info("Bug 11 FE rows: ${bug11_fe.size()}, BE rows: 
${bug11_be.size()}")
+    assertEquals(bug11_be.size(), bug11_fe.size(), "Bug 11: FE/BE row count 
mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)")
+    assertEquals(bug11_be, bug11_fe, "Bug 11: FE/BE result mismatch for FULL 
OUTER JOIN + GROUP BY")
+    logger.info("Bug 11: PASSED")
+
+    // ============================================================
+    //  Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched — LEFT JOIN + VARCHAR 
predicates + MIN()
+    //  RQG case: 906784662 (build 184181)
+    //  Same root cause as Bug 10/11.
+    //  SQL: LEFT JOIN on pk with VARCHAR NOT IN / BETWEEN / IN predicates, 
MIN() aggregate
+    // ============================================================
+
+    logger.info("=== Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched - LEFT JOIN + 
VARCHAR predicates (build 184181 case 906784662) ===")
+    def bug12_fe = sql """
+        SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=4,
+                          enable_local_shuffle_planner=true,
+                          ignore_storage_data_distribution=true,
+                          enable_sql_cache=false,
+                          disable_join_reorder=true, 
disable_colocate_plan=true)*/
+            table1.pk AS field1, MIN(table1.pk) AS field2
+        FROM rqg_t3 AS table1
+        LEFT JOIN rqg_t1 AS table2 ON table2.pk = table1.pk
+        WHERE table1.col_varchar_64__undef_signed NOT IN ('they')
+          AND table1.col_varchar_10__undef_signed BETWEEN 'AHlvNtoGLO' AND 'z'
+          AND table1.pk IN (3, 6, 8, 9, 2)
+        GROUP BY field1
+        ORDER BY field1, field2 ASC
+    """
+    def bug12_be = sql """
+        SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=4,
+                          enable_local_shuffle_planner=false,
+                          ignore_storage_data_distribution=true,
+                          enable_sql_cache=false,
+                          disable_join_reorder=true, 
disable_colocate_plan=true)*/
+            table1.pk AS field1, MIN(table1.pk) AS field2
+        FROM rqg_t3 AS table1
+        LEFT JOIN rqg_t1 AS table2 ON table2.pk = table1.pk
+        WHERE table1.col_varchar_64__undef_signed NOT IN ('they')
+          AND table1.col_varchar_10__undef_signed BETWEEN 'AHlvNtoGLO' AND 'z'
+          AND table1.pk IN (3, 6, 8, 9, 2)
+        GROUP BY field1
+        ORDER BY field1, field2 ASC
+    """
+    logger.info("Bug 12 FE rows: ${bug12_fe.size()}, BE rows: 
${bug12_be.size()}")
+    assertEquals(bug12_be.size(), bug12_fe.size(), "Bug 12: FE/BE row count 
mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)")
+    assertEquals(bug12_be, bug12_fe, "Bug 12: FE/BE result mismatch for LEFT 
JOIN + VARCHAR predicates")
+    logger.info("Bug 12: PASSED")
+
+    // ============================================================
+    //  Bug 13: NLJ COREDUMP — serial NLJ + pooling scan + BROADCAST build side
+    //  RQG build 184430, query c0dafc1bed0f4910
+    //  Root cause: serial NLJ (RIGHT_OUTER) with pooling scan inserted 
BROADCAST
+    //  local exchange on build side, inflating build pipeline num_tasks to 
_num_instances
+    //  while probe pipeline stayed at 1 task. Instance 1+ created build tasks 
without
+    //  corresponding probe tasks → source_deps empty → set_ready_to_read() 
crash.
+    //  Fixed: serial NLJ sets buildSideRequire=noRequire() to match BE-native
+    //  num_tasks_of_parent()<=1 skip logic.
+    // ============================================================
+
+    logger.info("=== Bug 13: NLJ COREDUMP - serial NLJ + pooling scan (FE 
planner) ===")
+    try {
+        def bug13_fe = sql """
+            SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=0,
+                              enable_local_shuffle_planner=true,
+                              ignore_storage_data_distribution=true,
+                              enable_sql_cache=false,
+                              enable_share_hash_table_for_broadcast_join=false,
+                              disable_streaming_preaggregations=true,
+                              disable_join_reorder=true)*/
+                t2.col_bigint_undef_signed_not_null AS field1
+            FROM rqg_t4 AS t1
+            RIGHT OUTER JOIN rqg_t2 AS t2 ON 
t1.col_bigint_undef_signed_not_null > t2.col_bigint_undef_signed_not_null
+            GROUP BY field1
+            ORDER BY field1 ASC
+        """
+        def bug13_be = sql """
+            SELECT /*+SET_VAR(use_serial_exchange=true, 
parallel_pipeline_task_num=0,
+                              enable_local_shuffle_planner=false,
+                              ignore_storage_data_distribution=true,
+                              enable_sql_cache=false,
+                              enable_share_hash_table_for_broadcast_join=false,
+                              disable_streaming_preaggregations=true,
+                              disable_join_reorder=true)*/
+                t2.col_bigint_undef_signed_not_null AS field1
+            FROM rqg_t4 AS t1
+            RIGHT OUTER JOIN rqg_t2 AS t2 ON 
t1.col_bigint_undef_signed_not_null > t2.col_bigint_undef_signed_not_null
+            GROUP BY field1
+            ORDER BY field1 ASC
+        """
+        logger.info("Bug 13 FE rows: ${bug13_fe.size()}, BE rows: 
${bug13_be.size()}")
+        assertEquals(bug13_be.size(), bug13_fe.size(), "Bug 13: FE/BE row 
count mismatch (NLJ COREDUMP)")
+        assertEquals(bug13_be, bug13_fe, "Bug 13: FE/BE result mismatch for 
serial NLJ + pooling scan")
+        logger.info("Bug 13: PASSED (no crash, results match)")
+    } catch (Throwable t) {
+        logger.error("Bug 13 FAILED: ${t.message}")
+        assertTrue(false, "Bug 13: NLJ COREDUMP (serial NLJ + pooling scan): 
${t.message}")
+    }
+
     logger.info("=== All RQG bug reproduction tests completed ===")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to