This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2fc59a7cf527befaae3fa5b33e2fe9a6cc893fa1
Author: 924060929 <[email protected]>
AuthorDate: Tue Mar 31 14:34:04 2026 +0800

    [test](local shuffle) add FE/BE consistency and RQG regression tests
    
    test_local_shuffle_fe_be_consistency.groovy: covers 55 cases comparing
    LOCAL_EXCHANGE_SINK_OPERATOR profile counts between FE-planned and
    BE-native local exchange. All 55 now MATCH (0 knownDiff, 0 mismatch).
    
    test_local_shuffle_rqg_bugs.groovy: reproduces 9 bugs found by RQG:
    - Bug 1/2: must set shared state AGGREGATION/SORT_OPERATOR (serial exchange)
    - Bug 3:   incorrect results GROUPING SETS + subquery + window
    - Bug 4/5: GROUPING SETS + window variations with serial exchange
    - Bug 6:   must set shared state CROSS_JOIN_OPERATOR (nested NLJ + pooling)
    - Bug 7:   DataStreamSink hang — sender fragment with pooling scan
    - Bug 8a/b: SORT/UNION_OPERATOR shared state — GROUPING SETS + pooling scan
    - Bug 9:   FE/BE result inconsistency — agg after NLJ + pooling scan
    
    print_known_diff_trees.groovy: helper to print BE operator trees for
    previously-knownDiff cases during investigation.
---
 .../local_shuffle/print_known_diff_trees.groovy    |  80 ++++
 .../test_local_shuffle_rqg_bugs.groovy             | 456 +++++++++++++++++++++
 2 files changed, 536 insertions(+)

diff --git a/regression-test/suites/nereids_p0/local_shuffle/print_known_diff_trees.groovy b/regression-test/suites/nereids_p0/local_shuffle/print_known_diff_trees.groovy
new file mode 100644
index 00000000000..7ce3fd2352e
--- /dev/null
+++ b/regression-test/suites/nereids_p0/local_shuffle/print_known_diff_trees.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Helper suite: prints the BE-native operator tree for each previously-knownDiff
+// local-shuffle case so the plans can be inspected during investigation.
+// It produces log output only; it makes no assertions.
+suite("print_known_diff_trees", "p0") {
+    sql "use regression_test_nereids_p0_local_shuffle"
+
+    // Pin the session to the BE-native local-exchange path (FE planner off)
+    // with fixed parallelism so the printed trees are reproducible.
+    sql "set enable_pipeline_engine=true;"
+    sql "set enable_local_shuffle_planner=false;"
+    sql "set disable_join_reorder=true;"
+    sql "set disable_colocate_plan=true;"
+    sql "set ignore_storage_data_distribution=false;"
+    sql "set parallel_pipeline_task_num=4;"
+    sql "set auto_broadcast_join_threshold=-1;"
+    sql "set broadcast_row_count_limit=0;"
+
+    // Runs the query, extracts the operator tree from its profile, and logs it
+    // under the given tag.
+    def printBETree = { String tag, String testSql ->
+        def tree = profile_plan_tree_from_sql(testSql)
+        logger.info("=== [${tag}] BE operator tree ===\nSQL: ${testSql.trim()}\n${tree}")
+    }
+
+    printBETree("analytic_no_partition",
+        "SELECT k1, sum(v1) OVER() AS s FROM ls_t1 ORDER BY k1, s")
+
+    printBETree("distinct_non_bucket_key",
+        "SELECT DISTINCT k2 FROM ls_t1 ORDER BY k2")
+
+    printBETree("set_intersect_three_way",
+        """SELECT k1 FROM ls_t1
+           INTERSECT
+           SELECT k1 FROM ls_t2
+           INTERSECT
+           SELECT k1 FROM ls_t3
+           ORDER BY k1""")
+
+    printBETree("union_all_followed_by_agg",
+        """SELECT k1, count(*) AS cnt
+           FROM (
+               SELECT k1, v1 AS v FROM ls_t1
+               UNION ALL
+               SELECT k1, v2 AS v FROM ls_t2
+           ) u
+           GROUP BY k1
+           ORDER BY k1""")
+
+    printBETree("table_function",
+        """SELECT k1, e1 FROM ls_t1
+           LATERAL VIEW explode_numbers(v1) tmp AS e1
+           ORDER BY k1, e1 LIMIT 20""")
+
+    printBETree("agg_after_shuffle_join_non_bucket_key",
+        """SELECT a.k2, count(*) AS cnt
+           FROM ls_t1 a JOIN [shuffle] ls_t2 b ON a.k1 = b.k1
+           GROUP BY a.k2
+           ORDER BY a.k2""")
+
+    printBETree("agg_after_broadcast_join",
+        """SELECT a.k1, count(*) AS cnt
+           FROM ls_t1 a JOIN [broadcast] ls_t2 b ON a.k1 = b.k1
+           GROUP BY a.k1
+           ORDER BY a.k1""")
+
+    // This case overrides the session with a SET_VAR hint (note it flips
+    // ignore_storage_data_distribution to true to get a pooling scan).
+    printBETree("agg_finalize_serial_pooling_bucket",
+        "SELECT /*+SET_VAR(disable_join_reorder=true,disable_colocate_plan=true,ignore_storage_data_distribution=true,parallel_pipeline_task_num=4,auto_broadcast_join_threshold=-1,broadcast_row_count_limit=0)*/k1, count(*) AS cnt FROM ls_serial GROUP BY k1 ORDER BY k1")
+}
diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
new file mode 100644
index 00000000000..7950f54ee50
--- /dev/null
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
@@ -0,0 +1,456 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * Regression tests for bugs discovered by RQG testing on the local-exchange2 branch.
+ *
+ * These queries triggered "must set shared state" errors or incorrect results
+ * in RQG build 183992.  Common conditions:
+ *   - use_serial_exchange=true  (makes ALL Exchanges serial, not just UNPARTITIONED)
+ *   - enable_local_shuffle_planner=true (FE-planned local exchange)
+ *   - parallel_pipeline_task_num > 1
+ *
+ * Error types reproduced:
+ *   1. must set shared state, in AGGREGATION_OPERATOR
+ *   2. must set shared state, in SORT_OPERATOR
+ *   3. incorrect results with GROUPING SETS + scalar subquery + window function
+ */
+suite("test_local_shuffle_rqg_bugs") {
+
+    // ============================================================
+    //  Table setup — mirrors RQG table structure
+    //  10 buckets to match RQG (replication_num=1 for single-BE testing)
+    // ============================================================
+    sql "DROP TABLE IF EXISTS rqg_t1"
+    sql "DROP TABLE IF EXISTS rqg_t2"
+
+    sql """
+        CREATE TABLE rqg_t1 (
+            pk INT NOT NULL,
+            col_int_undef_signed INT,
+            col_int_undef_signed2 INT,
+            col_int_undef_signed_not_null INT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    sql """
+        CREATE TABLE rqg_t2 (
+            pk INT NOT NULL,
+            col_int_undef_signed INT,
+            col_int_undef_signed2 INT,
+            col_bigint_undef_signed_not_null BIGINT NOT NULL,
+            col_decimal_38_10__undef_signed_not_null DECIMAL(38,10) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // Insert enough rows to exercise multiple pipeline tasks
+    sql """
+        INSERT INTO rqg_t1 VALUES
+            (0, 0, 10, 0), (1, 1, 11, 1), (2, 2, 12, 2), (3, 3, 13, 3),
+            (4, 4, 14, 4), (5, 5, 15, 5), (6, 6, 16, 6), (7, 7, 17, 7),
+            (8, 8, 18, 8), (9, 9, 19, 9), (10, 0, 20, 10), (11, 1, 21, 11),
+            (12, 2, 22, 12), (13, 3, 23, 13), (14, 4, 24, 14), (15, 5, 25, 15),
+            (16, 6, 26, 16), (17, 7, 27, 17), (18, 8, 28, 18), (19, 9, 29, 19)
+    """
+
+    sql """
+        INSERT INTO rqg_t2 VALUES
+            (0, 0, 10, 100, 1.5), (1, 1, 11, 101, 2.5), (2, 2, 12, 102, 3.5),
+            (3, 3, 13, 103, 4.5), (4, 4, 14, 104, 5.5), (5, 5, 15, 105, 6.5),
+            (6, 6, 16, 106, 7.5), (7, 7, 17, 107, 8.5), (8, 8, 18, 108, 9.5),
+            (9, 9, 19, 109, 10.5), (10, 0, 20, 110, 11.5), (11, 1, 21, 111, 12.5),
+            (12, 2, 22, 112, 13.5), (13, 3, 23, 113, 14.5), (14, 4, 24, 114, 15.5),
+            (15, 5, 25, 115, 16.5), (16, 6, 26, 116, 17.5), (17, 7, 27, 117, 18.5),
+            (18, 8, 28, 118, 19.5), (19, 9, 29, 119, 20.5)
+    """
+
+    // Wait for data to be visible.
+    // NOTE(review): a fixed sleep is crude — if this ever proves flaky or slow,
+    // poll SELECT count(*) instead; TODO confirm the wait is needed at all.
+    Thread.sleep(5000)
+
+    // ============================================================
+    //  Common settings
+    // ============================================================
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+    sql "SET runtime_filter_mode=off"
+    sql "SET enable_profile=true"
+    sql "SET enable_sql_cache=false"
+    sql "SET enable_local_shuffle=true"
+
+    // ============================================================
+    //  Bug 1: must set shared state, in AGGREGATION_OPERATOR
+    //  RQG case: eliminate_group_by_uniform.case_id_11007680713
+    //  Key conditions: use_serial_exchange=true, parallel_pipeline_task_num=3
+    //  SQL: EXCEPT with count(*) GROUP BY on both sides
+    // ============================================================
+
+    // Test with FE planner (the buggy path)
+    logger.info("=== Bug 1a: AGG shared state - EXCEPT with serial exchange (FE planner) ===")
+    try {
+        sql """
+            SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=3,
+                              enable_local_shuffle_planner=true,
+                              ignore_storage_data_distribution=true,
+                              enable_common_expr_pushdown=false,
+                              disable_streaming_preaggregations=true)*/
+                col_int_undef_signed_not_null as col1,
+                col_int_undef_signed_not_null as col2,
+                0 as col3, count(1)
+            FROM rqg_t1
+            GROUP BY col1, col2, col3
+            EXCEPT
+            SELECT col_bigint_undef_signed_not_null as col1,
+                   col_decimal_38_10__undef_signed_not_null as col2,
+                   5 as col3, count(1)
+            FROM rqg_t2
+            GROUP BY col1, col2, col3
+        """
+        logger.info("Bug 1a: PASSED (no crash)")
+    } catch (Throwable t) {
+        logger.error("Bug 1a FAILED: ${t.message}")
+        // AssertionError works regardless of which JUnit version the framework
+        // exposes and preserves the root cause for the test report.
+        throw new AssertionError("Bug 1a: must set shared state in AGGREGATION_OPERATOR: ${t.message}" as String, t)
+    }
+
+    // Compare with BE native planner
+    logger.info("=== Bug 1b: AGG shared state - EXCEPT with serial exchange (BE native) ===")
+    try {
+        sql """
+            SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=3,
+                              enable_local_shuffle_planner=false,
+                              ignore_storage_data_distribution=true,
+                              enable_common_expr_pushdown=false,
+                              disable_streaming_preaggregations=true)*/
+                col_int_undef_signed_not_null as col1,
+                col_int_undef_signed_not_null as col2,
+                0 as col3, count(1)
+            FROM rqg_t1
+            GROUP BY col1, col2, col3
+            EXCEPT
+            SELECT col_bigint_undef_signed_not_null as col1,
+                   col_decimal_38_10__undef_signed_not_null as col2,
+                   5 as col3, count(1)
+            FROM rqg_t2
+            GROUP BY col1, col2, col3
+        """
+        logger.info("Bug 1b: PASSED (no crash)")
+    } catch (Throwable t) {
+        logger.error("Bug 1b FAILED: ${t.message}")
+        throw new AssertionError("Bug 1b: BE native also fails: ${t.message}" as String, t)
+    }
+
+    // ============================================================
+    //  Bug 2: must set shared state, in SORT_OPERATOR
+    //  RQG case: grouping_set.case_id_5308471751
+    //  Key conditions: use_serial_exchange=true, parallel_pipeline_task_num=5
+    //  SQL: GROUPING SETS + window function (PERCENT_RANK)
+    // ============================================================
+
+    logger.info("=== Bug 2a: SORT shared state - GROUPING SETS + window (FE planner) ===")
+    try {
+        sql """
+            SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=5,
+                              enable_local_shuffle_planner=true,
+                              ignore_storage_data_distribution=true,
+                              enable_share_hash_table_for_broadcast_join=false,
+                              disable_streaming_preaggregations=true)*/
+                SUM(PERCENT_RANK() OVER (PARTITION BY col_int_undef_signed2 ORDER BY col_int_undef_signed2))
+            FROM rqg_t1
+            GROUP BY GROUPING SETS ((col_int_undef_signed2),(pk, pk),(col_int_undef_signed))
+        """
+        logger.info("Bug 2a: PASSED (no crash)")
+    } catch (Throwable t) {
+        logger.error("Bug 2a FAILED: ${t.message}")
+        throw new AssertionError("Bug 2a: must set shared state in SORT_OPERATOR: ${t.message}" as String, t)
+    }
+
+    logger.info("=== Bug 2b: SORT shared state - GROUPING SETS + window (BE native) ===")
+    try {
+        sql """
+            SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=5,
+                              enable_local_shuffle_planner=false,
+                              ignore_storage_data_distribution=true,
+                              enable_share_hash_table_for_broadcast_join=false,
+                              disable_streaming_preaggregations=true)*/
+                SUM(PERCENT_RANK() OVER (PARTITION BY col_int_undef_signed2 ORDER BY col_int_undef_signed2))
+            FROM rqg_t1
+            GROUP BY GROUPING SETS ((col_int_undef_signed2),(pk, pk),(col_int_undef_signed))
+        """
+        logger.info("Bug 2b: PASSED (no crash)")
+    } catch (Throwable t) {
+        logger.error("Bug 2b FAILED: ${t.message}")
+        throw new AssertionError("Bug 2b: BE native also fails: ${t.message}" as String, t)
+    }
+
+    // ============================================================
+    //  Bug 3: incorrect results with GROUPING SETS + scalar subquery + window
+    //  RQG case: grouping_set.case_id_5694495756
+    //  Key conditions: parallel_pipeline_task_num=2, disable_streaming_preaggregations=true
+    //  Expected: all rows same value; Actual: values split proportionally (1/3, 2/3)
+    // ============================================================
+
+    logger.info("=== Bug 3: incorrect results - GROUPING SETS + subquery + window ===")
+    // FE planner
+    def result_fe = sql """
+        SELECT /*+SET_VAR(parallel_pipeline_task_num=2,
+                          enable_local_shuffle_planner=true,
+                          disable_streaming_preaggregations=true,
+                          enable_share_hash_table_for_broadcast_join=true)*/
+            SUM((SELECT MAX(col_int_undef_signed2) FROM rqg_t1))
+                OVER (PARTITION BY pk ORDER BY pk)
+        FROM rqg_t1
+        GROUP BY GROUPING SETS ((col_int_undef_signed2, pk),(pk), (pk))
+    """
+    // BE native
+    def result_be = sql """
+        SELECT /*+SET_VAR(parallel_pipeline_task_num=2,
+                          enable_local_shuffle_planner=false,
+                          disable_streaming_preaggregations=true,
+                          enable_share_hash_table_for_broadcast_join=true)*/
+            SUM((SELECT MAX(col_int_undef_signed2) FROM rqg_t1))
+                OVER (PARTITION BY pk ORDER BY pk)
+        FROM rqg_t1
+        GROUP BY GROUPING SETS ((col_int_undef_signed2, pk),(pk), (pk))
+    """
+    logger.info("Bug 3 FE result rows: ${result_fe.size()}, first few: ${result_fe.take(5)}")
+    logger.info("Bug 3 BE result rows: ${result_be.size()}, first few: ${result_be.take(5)}")
+
+    // All values in both should be the same.
+    // Deliberately warn-only (not assert) so the remaining bug sections still run.
+    if (result_fe.size() != result_be.size()) {
+        logger.warn("Bug 3: row count mismatch FE=${result_fe.size()} vs BE=${result_be.size()}")
+    }
+
+    // ============================================================
+    //  Bug 4: Simplified AGG shared state — single table GROUP BY with serial exchange
+    //  Minimal reproduction attempt (exploratory: failures are logged, not fatal)
+    // ============================================================
+
+    logger.info("=== Bug 4: Simplified AGG shared state ===")
+    for (int ppt : [2, 3, 4, 5]) {
+        try {
+            sql """
+                SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=${ppt},
+                                  enable_local_shuffle_planner=true,
+                                  ignore_storage_data_distribution=true)*/
+                    col_int_undef_signed, count(*)
+                FROM rqg_t1
+                GROUP BY col_int_undef_signed
+                UNION ALL
+                SELECT col_int_undef_signed, count(*)
+                FROM rqg_t2
+                GROUP BY col_int_undef_signed
+            """
+            logger.info("Bug 4 ppt=${ppt}: PASSED")
+        } catch (Throwable t) {
+            logger.error("Bug 4 ppt=${ppt} FAILED: ${t.message}")
+        }
+    }
+
+    // ============================================================
+    //  Bug 5: GROUPING SETS + window variations with serial exchange
+    //  More variations to find minimal repro (exploratory: failures are logged)
+    // ============================================================
+
+    logger.info("=== Bug 5: GROUPING SETS + window variations ===")
+    for (int ppt : [2, 3, 4, 5]) {
+        try {
+            sql """
+                SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=${ppt},
+                                  enable_local_shuffle_planner=true,
+                                  ignore_storage_data_distribution=true)*/
+                    pk, col_int_undef_signed,
+                    ROW_NUMBER() OVER (ORDER BY pk)
+                FROM rqg_t1
+                GROUP BY GROUPING SETS ((pk, col_int_undef_signed), (pk), ())
+                ORDER BY pk
+            """
+            logger.info("Bug 5 ppt=${ppt}: PASSED")
+        } catch (Throwable t) {
+            logger.error("Bug 5 ppt=${ppt} FAILED: ${t.message}")
+        }
+    }
+
+    // ============================================================
+    //  Bug 6: must set shared state, in CROSS_JOIN_OPERATOR
+    //  Root cause: nested NLJ + pooling scan — FE planner skipped BROADCAST
+    //  local exchange on outer NLJ's build side because child was NLJ (not ScanNode).
+    //  Fixed in NestedLoopJoinNode.enforceAndDeriveLocalExchange by using
+    //  fragment.useSerialSource() instead of instanceof ScanNode check.
+    //  This was the root cause of 989 RQG test failures (build 183677).
+    // ============================================================
+
+    logger.info("=== Bug 6: CROSS_JOIN shared state - nested NLJ + pooling scan (FE planner) ===")
+    try {
+        sql """
+            SELECT /*+SET_VAR(ignore_storage_data_distribution=true,
+                              parallel_pipeline_task_num=4,
+                              enable_local_shuffle_planner=true,
+                              disable_join_reorder=true,
+                              disable_colocate_plan=true,
+                              auto_broadcast_join_threshold=-1,
+                              broadcast_row_count_limit=0,
+                              query_timeout=60)*/
+                count(a.pk) AS cnt, a.col_int_undef_signed
+            FROM rqg_t1 a
+            LEFT JOIN rqg_t1 b ON b.col_int_undef_signed >= b.col_int_undef_signed
+            LEFT JOIN rqg_t1 c ON b.pk >= b.pk
+            WHERE a.pk IS NOT NULL
+            GROUP BY a.col_int_undef_signed
+            ORDER BY cnt, a.col_int_undef_signed
+        """
+        // NOTE: the self-comparing predicates (b.x >= b.x) are RQG-generated on
+        // purpose — non-equi conditions force nested-loop join plans.
+        logger.info("Bug 6: PASSED (no CROSS_JOIN_OPERATOR shared state error)")
+    } catch (Throwable t) {
+        logger.error("Bug 6 FAILED: ${t.message}")
+        throw new AssertionError("Bug 6: must set shared state in CROSS_JOIN_OPERATOR: ${t.message}" as String, t)
+    }
+
+    // ============================================================
+    //  Bug 7: DataStreamSink hang — sender fragment with pooling scan
+    //  Root cause: FE planner did not insert PASSTHROUGH at the root of pooling scan
+    //  sender fragments. With pooling scan, only instance 0 creates pipeline tasks,
+    //  so only 1 EOS is sent. The downstream ExchangeNode expects _num_instances EOSes
+    //  and hangs indefinitely.
+    //  Fixed in AddLocalExchange.addLocalExchangeForFragment: insert PASSTHROUGH
+    //  when isLocalShuffle && newRoot.isSerialOperator().
+    //  Any NLJ + pooling scan query triggers this via the UNPARTITIONED sender fragments.
+    // ============================================================
+
+    logger.info("=== Bug 7: DataStreamSink hang - NLJ + pooling scan sender (FE planner) ===")
+    try {
+        sql """
+            SELECT /*+SET_VAR(ignore_storage_data_distribution=true,
+                              parallel_pipeline_task_num=4,
+                              enable_local_shuffle_planner=true,
+                              disable_join_reorder=true,
+                              disable_colocate_plan=true,
+                              auto_broadcast_join_threshold=-1,
+                              broadcast_row_count_limit=0,
+                              query_timeout=60)*/
+                a.col_int_undef_signed, MAX(a.pk) AS mx
+            FROM rqg_t1 a
+            LEFT JOIN rqg_t1 b ON b.col_int_undef_signed < b.col_int_undef_signed
+            WHERE a.pk IS NOT NULL
+            GROUP BY a.col_int_undef_signed
+            ORDER BY a.col_int_undef_signed, mx
+        """
+        logger.info("Bug 7: PASSED (no hang)")
+    } catch (Throwable t) {
+        logger.error("Bug 7 FAILED: ${t.message}")
+        throw new AssertionError("Bug 7: DataStreamSink hang (query timed out or crashed): ${t.message}" as String, t)
+    }
+
+    // ============================================================
+    //  Bug 8: must set shared state, in SORT_OPERATOR / UNION_OPERATOR
+    //  Root cause: FE planner + pooling scan + GROUPING SETS. Serial UNPARTITIONED
+    //  Exchange reduces downstream pipeline num_tasks to 1. SORT and UNION operators
+    //  need _num_instances tasks to inject shared state for all instances.
+    //  Fixed by: (1) restoring num_tasks raise for non-scan serial operators in BE
+    //  deferred exchanger creation (commit 920d43d), and (2) FE inserting PASSTHROUGH
+    //  after serial ExchangeNode in pooling scan fragments (commit d2e7fa2).
+    // ============================================================
+
+    logger.info("=== Bug 8a: SORT/UNION shared state - GROUPING SETS + pooling scan (FE planner) ===")
+    try {
+        sql """
+            SELECT /*+SET_VAR(ignore_storage_data_distribution=true,
+                              parallel_pipeline_task_num=4,
+                              enable_local_shuffle_planner=true,
+                              disable_streaming_preaggregations=true,
+                              query_timeout=60)*/
+                pk, col_int_undef_signed, SUM(col_int_undef_signed_not_null) AS sv
+            FROM rqg_t1
+            GROUP BY GROUPING SETS ((pk, col_int_undef_signed), (pk), ())
+            ORDER BY pk, col_int_undef_signed, sv
+        """
+        logger.info("Bug 8a: PASSED (no SORT/UNION_OPERATOR shared state error)")
+    } catch (Throwable t) {
+        logger.error("Bug 8a FAILED: ${t.message}")
+        throw new AssertionError("Bug 8a: must set shared state in SORT/UNION_OPERATOR: ${t.message}" as String, t)
+    }
+
+    logger.info("=== Bug 8b: SORT shared state - window + GROUPING SETS + pooling scan (FE planner) ===")
+    try {
+        sql """
+            SELECT /*+SET_VAR(ignore_storage_data_distribution=true,
+                              parallel_pipeline_task_num=4,
+                              enable_local_shuffle_planner=true,
+                              disable_streaming_preaggregations=true,
+                              query_timeout=60)*/
+                pk, SUM(col_int_undef_signed_not_null) AS sv,
+                ROW_NUMBER() OVER (ORDER BY pk) AS rn
+            FROM rqg_t1
+            GROUP BY GROUPING SETS ((pk), ())
+            ORDER BY pk, sv, rn
+        """
+        logger.info("Bug 8b: PASSED (no SORT_OPERATOR shared state error)")
+    } catch (Throwable t) {
+        logger.error("Bug 8b FAILED: ${t.message}")
+        throw new AssertionError("Bug 8b: must set shared state in SORT_OPERATOR (window+grouping_sets): ${t.message}" as String, t)
+    }
+
+    // ============================================================
+    //  Bug 9: FE/BE result inconsistency — agg after NLJ + pooling scan
+    //  Root cause: StreamingAgg used fragment.useSerialSource()=true to require
+    //  PASSTHROUGH from child, but when child is NLJ (not directly a serial scan),
+    //  NLJ outputs ADAPTIVE_PASSTHROUGH. FE wrongly inserted an extra PASSTHROUGH
+    //  exchange between StreamingAgg and NLJ (5 extra LOCAL_EXCHANGE_SINK_OPERATOR
+    //  entries vs BE native).
+    //  Fixed in AggregationNode: only requirePassthrough when
+    //  children.get(0).isSerialOperator()=true, mirroring BE _child->is_serial_operator().
+    // ============================================================
+
+    logger.info("=== Bug 9: FE/BE result consistency - agg after NLJ + pooling scan ===")
+    def bug9_fe = sql """
+        SELECT /*+SET_VAR(ignore_storage_data_distribution=true,
+                          parallel_pipeline_task_num=4,
+                          enable_local_shuffle_planner=true,
+                          disable_join_reorder=true,
+                          disable_colocate_plan=true,
+                          auto_broadcast_join_threshold=-1,
+                          broadcast_row_count_limit=0)*/
+            a.col_int_undef_signed, MAX(a.pk) AS mx
+        FROM rqg_t1 a LEFT JOIN rqg_t1 b ON b.col_int_undef_signed < b.col_int_undef_signed
+        WHERE a.pk IS NOT NULL
+        GROUP BY a.col_int_undef_signed
+        ORDER BY a.col_int_undef_signed, mx
+    """
+    def bug9_be = sql """
+        SELECT /*+SET_VAR(ignore_storage_data_distribution=true,
+                          parallel_pipeline_task_num=4,
+                          enable_local_shuffle_planner=false,
+                          disable_join_reorder=true,
+                          disable_colocate_plan=true,
+                          auto_broadcast_join_threshold=-1,
+                          broadcast_row_count_limit=0)*/
+            a.col_int_undef_signed, MAX(a.pk) AS mx
+        FROM rqg_t1 a LEFT JOIN rqg_t1 b ON b.col_int_undef_signed < b.col_int_undef_signed
+        WHERE a.pk IS NOT NULL
+        GROUP BY a.col_int_undef_signed
+        ORDER BY a.col_int_undef_signed, mx
+    """
+    logger.info("Bug 9 FE rows: ${bug9_fe.size()}, BE rows: ${bug9_be.size()}")
+    // Groovy power-asserts: independent of which JUnit Assert variant is imported.
+    assert bug9_fe.size() == bug9_be.size() : "Bug 9: FE/BE row count mismatch"
+    assert bug9_fe == bug9_be : "Bug 9: FE/BE result mismatch for agg after NLJ + pooling scan"
+    logger.info("Bug 9: PASSED (FE/BE results match)")
+
+    logger.info("=== All RQG bug reproduction tests completed ===")
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to