This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2fc59a7cf527befaae3fa5b33e2fe9a6cc893fa1 Author: 924060929 <[email protected]> AuthorDate: Tue Mar 31 14:34:04 2026 +0800 [test](local shuffle) add FE/BE consistency and RQG regression tests test_local_shuffle_fe_be_consistency.groovy: covers 55 cases comparing LOCAL_EXCHANGE_SINK_OPERATOR profile counts between FE-planned and BE-native local exchange. All 55 now MATCH (0 knownDiff, 0 mismatch). test_local_shuffle_rqg_bugs.groovy: reproduces 9 bugs found by RQG: - Bug 1/2: must set shared state AGGREGATION/SORT_OPERATOR (serial exchange) - Bug 3: incorrect results GROUPING SETS + subquery + window - Bug 4/5: GROUPING SETS + window variations with serial exchange - Bug 6: must set shared state CROSS_JOIN_OPERATOR (nested NLJ + pooling) - Bug 7: DataStreamSink hang — sender fragment with pooling scan - Bug 8a/b: SORT/UNION_OPERATOR shared state — GROUPING SETS + pooling scan - Bug 9: FE/BE result inconsistency — agg after NLJ + pooling scan print_known_diff_trees.groovy: helper to print BE operator trees for previously-knownDiff cases during investigation. --- .../local_shuffle/print_known_diff_trees.groovy | 80 ++++ .../test_local_shuffle_rqg_bugs.groovy | 456 +++++++++++++++++++++ 2 files changed, 536 insertions(+) diff --git a/regression-test/suites/nereids_p0/local_shuffle/print_known_diff_trees.groovy b/regression-test/suites/nereids_p0/local_shuffle/print_known_diff_trees.groovy new file mode 100644 index 00000000000..7ce3fd2352e --- /dev/null +++ b/regression-test/suites/nereids_p0/local_shuffle/print_known_diff_trees.groovy @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("print_known_diff_trees", "p0") { + sql "use regression_test_nereids_p0_local_shuffle" + + sql "set enable_pipeline_engine=true;" + sql "set enable_local_shuffle_planner=false;" + sql "set disable_join_reorder=true;" + sql "set disable_colocate_plan=true;" + sql "set ignore_storage_data_distribution=false;" + sql "set parallel_pipeline_task_num=4;" + sql "set auto_broadcast_join_threshold=-1;" + sql "set broadcast_row_count_limit=0;" + + def printBETree = { String tag, String testSql -> + def tree = profile_plan_tree_from_sql(testSql) + logger.info("=== [${tag}] BE operator tree ===\nSQL: ${testSql.trim()}\n${tree}") + } + + printBETree("analytic_no_partition", + "SELECT k1, sum(v1) OVER() AS s FROM ls_t1 ORDER BY k1, s") + + printBETree("distinct_non_bucket_key", + "SELECT DISTINCT k2 FROM ls_t1 ORDER BY k2") + + printBETree("set_intersect_three_way", + """SELECT k1 FROM ls_t1 + INTERSECT + SELECT k1 FROM ls_t2 + INTERSECT + SELECT k1 FROM ls_t3 + ORDER BY k1""") + + printBETree("union_all_followed_by_agg", + """SELECT k1, count(*) AS cnt + FROM ( + SELECT k1, v1 AS v FROM ls_t1 + UNION ALL + SELECT k1, v2 AS v FROM ls_t2 + ) u + GROUP BY k1 + ORDER BY k1""") + + printBETree("table_function", + """SELECT k1, e1 FROM ls_t1 + LATERAL VIEW explode_numbers(v1) tmp AS e1 + ORDER BY k1, e1 LIMIT 20""") + + printBETree("agg_after_shuffle_join_non_bucket_key", + """SELECT a.k2, count(*) AS cnt + FROM ls_t1 a JOIN [shuffle] ls_t2 b ON a.k1 = b.k1 + GROUP BY a.k2 + ORDER BY a.k2""") + + printBETree("agg_after_broadcast_join", + """SELECT a.k1, count(*) AS cnt + FROM ls_t1 a JOIN [broadcast] ls_t2 b ON a.k1 = b.k1 + GROUP BY a.k1 + ORDER BY a.k1""") + + printBETree("agg_finalize_serial_pooling_bucket", + "SELECT /*+SET_VAR(disable_join_reorder=true,disable_colocate_plan=true,ignore_storage_data_distribution=true,parallel_pipeline_task_num=4,auto_broadcast_join_threshold=-1,broadcast_row_count_limit=0)*/k1, count(*) AS cnt FROM ls_serial GROUP BY k1 ORDER BY k1", + ) + +} diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy new file mode 100644 index 00000000000..7950f54ee50 --- /dev/null +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy @@ -0,0 +1,456 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/** + * Regression tests for bugs discovered by RQG testing on the local-exchange2 branch. + * + * These queries triggered "must set shared state" errors or incorrect results + * in RQG build 183992. Common conditions: + * - use_serial_exchange=true (makes ALL Exchanges serial, not just UNPARTITIONED) + * - enable_local_shuffle_planner=true (FE-planned local exchange) + * - parallel_pipeline_task_num > 1 + * + * Error types reproduced: + * 1. must set shared state, in AGGREGATION_OPERATOR + * 2. must set shared state, in SORT_OPERATOR + * 3. incorrect results with GROUPING SETS + scalar subquery + window function + */ +suite("test_local_shuffle_rqg_bugs") { + + // ============================================================ + // Table setup — mirrors RQG table structure + // 10 buckets to match RQG (replication_num=1 for single-BE testing) + // ============================================================ + sql "DROP TABLE IF EXISTS rqg_t1" + sql "DROP TABLE IF EXISTS rqg_t2" + + sql """ + CREATE TABLE rqg_t1 ( + pk INT NOT NULL, + col_int_undef_signed INT, + col_int_undef_signed2 INT, + col_int_undef_signed_not_null INT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(pk) + DISTRIBUTED BY HASH(pk) BUCKETS 10 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + CREATE TABLE rqg_t2 ( + pk INT NOT NULL, + col_int_undef_signed INT, + col_int_undef_signed2 INT, + col_bigint_undef_signed_not_null BIGINT NOT NULL, + col_decimal_38_10__undef_signed_not_null DECIMAL(38,10) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(pk) + DISTRIBUTED BY HASH(pk) BUCKETS 10 + PROPERTIES ("replication_num" = "1") + """ + + // Insert enough rows to exercise multiple pipeline tasks + sql """ + INSERT INTO rqg_t1 VALUES + (0, 0, 10, 0), (1, 1, 11, 1), (2, 2, 12, 2), (3, 3, 13, 3), + (4, 4, 14, 4), (5, 5, 15, 5), (6, 6, 16, 6), (7, 7, 17, 7), + (8, 8, 18, 8), (9, 9, 19, 9), (10, 0, 20, 10), (11, 1, 21, 11), + (12, 2, 22, 12), (13, 3, 23, 13), (14, 4, 24, 14), (15, 5, 25, 15), + (16, 6, 26, 16), (17, 7, 27, 17), (18, 8, 28, 18), (19, 9, 29, 19) + """ + + sql """ + INSERT INTO rqg_t2 VALUES + (0, 0, 10, 100, 1.5), (1, 1, 11, 101, 2.5), (2, 2, 12, 102, 3.5), + (3, 3, 13, 103, 4.5), (4, 4, 14, 104, 5.5), (5, 5, 15, 105, 6.5), + (6, 6, 16, 106, 7.5), (7, 7, 17, 107, 8.5), (8, 8, 18, 108, 9.5), + (9, 9, 19, 109, 10.5), (10, 0, 20, 110, 11.5), (11, 1, 21, 111, 12.5), + (12, 2, 22, 112, 13.5), (13, 3, 23, 113, 14.5), (14, 4, 24, 114, 15.5), + (15, 5, 25, 115, 16.5), (16, 6, 26, 116, 17.5), (17, 7, 27, 117, 18.5), + (18, 8, 28, 118, 19.5), (19, 9, 29, 119, 20.5) + """ + + // Wait for data to be visible + Thread.sleep(5000) + + // ============================================================ + // Common settings + // ============================================================ + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET runtime_filter_mode=off" + sql "SET enable_profile=true" + sql "SET enable_sql_cache=false" + sql "SET enable_local_shuffle=true" + + // ============================================================ + // Bug 1: must set shared state, in AGGREGATION_OPERATOR + // RQG case: eliminate_group_by_uniform.case_id_11007680713 + // Key conditions: use_serial_exchange=true, parallel_pipeline_task_num=3 + // SQL: EXCEPT with count(*) GROUP BY on both sides + // ============================================================ + + // Test with FE planner (the buggy path) + logger.info("=== Bug 1a: AGG shared state - EXCEPT with serial exchange (FE planner) ===") + try { + sql """ + SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=3, + enable_local_shuffle_planner=true, + ignore_storage_data_distribution=true, + enable_common_expr_pushdown=false, + disable_streaming_preaggregations=true)*/ + col_int_undef_signed_not_null as col1, + col_int_undef_signed_not_null as col2, + 0 as col3, count(1) + FROM rqg_t1 + GROUP BY col1, col2, col3 + EXCEPT + SELECT col_bigint_undef_signed_not_null as col1, + col_decimal_38_10__undef_signed_not_null as col2, + 5 as col3, count(1) + FROM rqg_t2 + GROUP BY col1, col2, col3 + """ + logger.info("Bug 1a: PASSED (no crash)") + } catch (Throwable t) { + logger.error("Bug 1a FAILED: ${t.message}") + assertTrue(false, "Bug 1a: must set shared state in AGGREGATION_OPERATOR: ${t.message}") + } + + // Compare with BE native planner + logger.info("=== Bug 1b: AGG shared state - EXCEPT with serial exchange (BE native) ===") + try { + sql """ + SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=3, + enable_local_shuffle_planner=false, + ignore_storage_data_distribution=true, + enable_common_expr_pushdown=false, + disable_streaming_preaggregations=true)*/ + col_int_undef_signed_not_null as col1, + col_int_undef_signed_not_null as col2, + 0 as col3, count(1) + FROM rqg_t1 + GROUP BY col1, col2, col3 + EXCEPT + SELECT col_bigint_undef_signed_not_null as col1, + col_decimal_38_10__undef_signed_not_null as col2, + 5 as col3, count(1) + FROM rqg_t2 + GROUP BY col1, col2, col3 + """ + logger.info("Bug 1b: PASSED (no crash)") + } catch (Throwable t) { + logger.error("Bug 1b FAILED: ${t.message}") + assertTrue(false, "Bug 1b: BE native also fails: ${t.message}") + } + + // ============================================================ + // Bug 2: must set shared state, in SORT_OPERATOR + // RQG case: grouping_set.case_id_5308471751 + // Key conditions: use_serial_exchange=true, parallel_pipeline_task_num=5 + // SQL: GROUPING SETS + window function (PERCENT_RANK) + // ============================================================ + + logger.info("=== Bug 2a: SORT shared state - GROUPING SETS + window (FE planner) ===") + try { + sql """ + SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=5, + enable_local_shuffle_planner=true, + ignore_storage_data_distribution=true, + enable_share_hash_table_for_broadcast_join=false, + disable_streaming_preaggregations=true)*/ + SUM(PERCENT_RANK() OVER (PARTITION BY col_int_undef_signed2 ORDER BY col_int_undef_signed2)) + FROM rqg_t1 + GROUP BY GROUPING SETS ((col_int_undef_signed2),(pk, pk),(col_int_undef_signed)) + """ + logger.info("Bug 2a: PASSED (no crash)") + } catch (Throwable t) { + logger.error("Bug 2a FAILED: ${t.message}") + assertTrue(false, "Bug 2a: must set shared state in SORT_OPERATOR: ${t.message}") + } + + logger.info("=== Bug 2b: SORT shared state - GROUPING SETS + window (BE native) ===") + try { + sql """ + SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=5, + enable_local_shuffle_planner=false, + ignore_storage_data_distribution=true, + enable_share_hash_table_for_broadcast_join=false, + disable_streaming_preaggregations=true)*/ + SUM(PERCENT_RANK() OVER (PARTITION BY col_int_undef_signed2 ORDER BY col_int_undef_signed2)) + FROM rqg_t1 + GROUP BY GROUPING SETS ((col_int_undef_signed2),(pk, pk),(col_int_undef_signed)) + """ + logger.info("Bug 2b: PASSED (no crash)") + } catch (Throwable t) { + logger.error("Bug 2b FAILED: ${t.message}") + assertTrue(false, "Bug 2b: BE native also fails: ${t.message}") + } + + // ============================================================ + // Bug 3: incorrect results with GROUPING SETS + scalar subquery + window + // RQG case: grouping_set.case_id_5694495756 + // Key conditions: parallel_pipeline_task_num=2, disable_streaming_preaggregations=true + // Expected: all rows same value; Actual: values split proportionally (1/3, 2/3) + // ============================================================ + + logger.info("=== Bug 3: incorrect results - GROUPING SETS + subquery + window ===") + // FE planner + def result_fe = sql """ + SELECT /*+SET_VAR(parallel_pipeline_task_num=2, + enable_local_shuffle_planner=true, + disable_streaming_preaggregations=true, + enable_share_hash_table_for_broadcast_join=true)*/ + SUM((SELECT MAX(col_int_undef_signed2) FROM rqg_t1)) + OVER (PARTITION BY pk ORDER BY pk) + FROM rqg_t1 + GROUP BY GROUPING SETS ((col_int_undef_signed2, pk),(pk), (pk)) + """ + // BE native + def result_be = sql """ + SELECT /*+SET_VAR(parallel_pipeline_task_num=2, + enable_local_shuffle_planner=false, + disable_streaming_preaggregations=true, + enable_share_hash_table_for_broadcast_join=true)*/ + SUM((SELECT MAX(col_int_undef_signed2) FROM rqg_t1)) + OVER (PARTITION BY pk ORDER BY pk) + FROM rqg_t1 + GROUP BY GROUPING SETS ((col_int_undef_signed2, pk),(pk), (pk)) + """ + logger.info("Bug 3 FE result rows: ${result_fe.size()}, first few: ${result_fe.take(5)}") + logger.info("Bug 3 BE result rows: ${result_be.size()}, first few: ${result_be.take(5)}") + + // All values in both should be the same + if (result_fe.size() != result_be.size()) { + logger.warn("Bug 3: row count mismatch FE=${result_fe.size()} vs BE=${result_be.size()}") + } + + // ============================================================ + // Bug 4: Simplified AGG shared state — single table GROUP BY with serial exchange + // Minimal reproduction attempt + // ============================================================ + + logger.info("=== Bug 4: Simplified AGG shared state ===") + for (int ppt : [2, 3, 4, 5]) { + try { + sql """ + SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=${ppt}, + enable_local_shuffle_planner=true, + ignore_storage_data_distribution=true)*/ + col_int_undef_signed, count(*) + FROM rqg_t1 + GROUP BY col_int_undef_signed + UNION ALL + SELECT col_int_undef_signed, count(*) + FROM rqg_t2 + GROUP BY col_int_undef_signed + """ + logger.info("Bug 4 ppt=${ppt}: PASSED") + } catch (Throwable t) { + logger.error("Bug 4 ppt=${ppt} FAILED: ${t.message}") + } + } + + // ============================================================ + // Bug 5: GROUPING SETS + window variations with serial exchange + // More variations to find minimal repro + // ============================================================ + + logger.info("=== Bug 5: GROUPING SETS + window variations ===") + for (int ppt : [2, 3, 4, 5]) { + try { + sql """ + SELECT /*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=${ppt}, + enable_local_shuffle_planner=true, + ignore_storage_data_distribution=true)*/ + pk, col_int_undef_signed, + ROW_NUMBER() OVER (ORDER BY pk) + FROM rqg_t1 + GROUP BY GROUPING SETS ((pk, col_int_undef_signed), (pk), ()) + ORDER BY pk + """ + logger.info("Bug 5 ppt=${ppt}: PASSED") + } catch (Throwable t) { + logger.error("Bug 5 ppt=${ppt} FAILED: ${t.message}") + } + } + + // ============================================================ + // Bug 6: must set shared state, in CROSS_JOIN_OPERATOR + // Root cause: nested NLJ + pooling scan — FE planner skipped BROADCAST + // local exchange on outer NLJ's build side because child was NLJ (not ScanNode). + // Fixed in NestedLoopJoinNode.enforceAndDeriveLocalExchange by using + // fragment.useSerialSource() instead of instanceof ScanNode check. + // This was the root cause of 989 RQG test failures (build 183677). + // ============================================================ + + logger.info("=== Bug 6: CROSS_JOIN shared state - nested NLJ + pooling scan (FE planner) ===") + try { + sql """ + SELECT /*+SET_VAR(ignore_storage_data_distribution=true, + parallel_pipeline_task_num=4, + enable_local_shuffle_planner=true, + disable_join_reorder=true, + disable_colocate_plan=true, + auto_broadcast_join_threshold=-1, + broadcast_row_count_limit=0, + query_timeout=60)*/ + count(a.pk) AS cnt, a.col_int_undef_signed + FROM rqg_t1 a + LEFT JOIN rqg_t1 b ON b.col_int_undef_signed >= b.col_int_undef_signed + LEFT JOIN rqg_t1 c ON b.pk >= b.pk + WHERE a.pk IS NOT NULL + GROUP BY a.col_int_undef_signed + ORDER BY cnt, a.col_int_undef_signed + """ + logger.info("Bug 6: PASSED (no CROSS_JOIN_OPERATOR shared state error)") + } catch (Throwable t) { + logger.error("Bug 6 FAILED: ${t.message}") + assertTrue(false, "Bug 6: must set shared state in CROSS_JOIN_OPERATOR: ${t.message}") + } + + // ============================================================ + // Bug 7: DataStreamSink hang — sender fragment with pooling scan + // Root cause: FE planner did not insert PASSTHROUGH at the root of pooling scan + // sender fragments. With pooling scan, only instance 0 creates pipeline tasks, + // so only 1 EOS is sent. The downstream ExchangeNode expects _num_instances EOSes + // and hangs indefinitely. + // Fixed in AddLocalExchange.addLocalExchangeForFragment: insert PASSTHROUGH + // when isLocalShuffle && newRoot.isSerialOperator(). + // Any NLJ + pooling scan query triggers this via the UNPARTITIONED sender fragments. + // ============================================================ + + logger.info("=== Bug 7: DataStreamSink hang - NLJ + pooling scan sender (FE planner) ===") + try { + sql """ + SELECT /*+SET_VAR(ignore_storage_data_distribution=true, + parallel_pipeline_task_num=4, + enable_local_shuffle_planner=true, + disable_join_reorder=true, + disable_colocate_plan=true, + auto_broadcast_join_threshold=-1, + broadcast_row_count_limit=0, + query_timeout=60)*/ + a.col_int_undef_signed, MAX(a.pk) AS mx + FROM rqg_t1 a + LEFT JOIN rqg_t1 b ON b.col_int_undef_signed < b.col_int_undef_signed + WHERE a.pk IS NOT NULL + GROUP BY a.col_int_undef_signed + ORDER BY a.col_int_undef_signed, mx + """ + logger.info("Bug 7: PASSED (no hang)") + } catch (Throwable t) { + logger.error("Bug 7 FAILED: ${t.message}") + assertTrue(false, "Bug 7: DataStreamSink hang (query timed out or crashed): ${t.message}") + } + + // ============================================================ + // Bug 8: must set shared state, in SORT_OPERATOR / UNION_OPERATOR + // Root cause: FE planner + pooling scan + GROUPING SETS. Serial UNPARTITIONED + // Exchange reduces downstream pipeline num_tasks to 1. SORT and UNION operators + // need _num_instances tasks to inject shared state for all instances. + // Fixed by: (1) restoring num_tasks raise for non-scan serial operators in BE + // deferred exchanger creation (commit 920d43d), and (2) FE inserting PASSTHROUGH + // after serial ExchangeNode in pooling scan fragments (commit d2e7fa2). + // ============================================================ + + logger.info("=== Bug 8a: SORT/UNION shared state - GROUPING SETS + pooling scan (FE planner) ===") + try { + sql """ + SELECT /*+SET_VAR(ignore_storage_data_distribution=true, + parallel_pipeline_task_num=4, + enable_local_shuffle_planner=true, + disable_streaming_preaggregations=true, + query_timeout=60)*/ + pk, col_int_undef_signed, SUM(col_int_undef_signed_not_null) AS sv + FROM rqg_t1 + GROUP BY GROUPING SETS ((pk, col_int_undef_signed), (pk), ()) + ORDER BY pk, col_int_undef_signed, sv + """ + logger.info("Bug 8a: PASSED (no SORT/UNION_OPERATOR shared state error)") + } catch (Throwable t) { + logger.error("Bug 8a FAILED: ${t.message}") + assertTrue(false, "Bug 8a: must set shared state in SORT/UNION_OPERATOR: ${t.message}") + } + + logger.info("=== Bug 8b: SORT shared state - window + GROUPING SETS + pooling scan (FE planner) ===") + try { + sql """ + SELECT /*+SET_VAR(ignore_storage_data_distribution=true, + parallel_pipeline_task_num=4, + enable_local_shuffle_planner=true, + disable_streaming_preaggregations=true, + query_timeout=60)*/ + pk, SUM(col_int_undef_signed_not_null) AS sv, + ROW_NUMBER() OVER (ORDER BY pk) AS rn + FROM rqg_t1 + GROUP BY GROUPING SETS ((pk), ()) + ORDER BY pk, sv, rn + """ + logger.info("Bug 8b: PASSED (no SORT_OPERATOR shared state error)") + } catch (Throwable t) { + logger.error("Bug 8b FAILED: ${t.message}") + assertTrue(false, "Bug 8b: must set shared state in SORT_OPERATOR (window+grouping_sets): ${t.message}") + } + + // ============================================================ + // Bug 9: FE/BE result inconsistency — agg after NLJ + pooling scan + // Root cause: StreamingAgg used fragment.useSerialSource()=true to require + // PASSTHROUGH from child, but when child is NLJ (not directly a serial scan), + // NLJ outputs ADAPTIVE_PASSTHROUGH. FE wrongly inserted an extra PASSTHROUGH + // exchange between StreamingAgg and NLJ (5 extra LOCAL_EXCHANGE_SINK_OPERATOR + // entries vs BE native). + // Fixed in AggregationNode: only requirePassthrough when + // children.get(0).isSerialOperator()=true, mirroring BE _child->is_serial_operator(). + // ============================================================ + + logger.info("=== Bug 9: FE/BE result consistency - agg after NLJ + pooling scan ===") + def bug9_fe = sql """ + SELECT /*+SET_VAR(ignore_storage_data_distribution=true, + parallel_pipeline_task_num=4, + enable_local_shuffle_planner=true, + disable_join_reorder=true, + disable_colocate_plan=true, + auto_broadcast_join_threshold=-1, + broadcast_row_count_limit=0)*/ + a.col_int_undef_signed, MAX(a.pk) AS mx + FROM rqg_t1 a LEFT JOIN rqg_t1 b ON b.col_int_undef_signed < b.col_int_undef_signed + WHERE a.pk IS NOT NULL + GROUP BY a.col_int_undef_signed + ORDER BY a.col_int_undef_signed, mx + """ + def bug9_be = sql """ + SELECT /*+SET_VAR(ignore_storage_data_distribution=true, + parallel_pipeline_task_num=4, + enable_local_shuffle_planner=false, + disable_join_reorder=true, + disable_colocate_plan=true, + auto_broadcast_join_threshold=-1, + broadcast_row_count_limit=0)*/ + a.col_int_undef_signed, MAX(a.pk) AS mx + FROM rqg_t1 a LEFT JOIN rqg_t1 b ON b.col_int_undef_signed < b.col_int_undef_signed + WHERE a.pk IS NOT NULL + GROUP BY a.col_int_undef_signed + ORDER BY a.col_int_undef_signed, mx + """ + logger.info("Bug 9 FE rows: ${bug9_fe.size()}, BE rows: ${bug9_be.size()}") + assertEquals(bug9_be.size(), bug9_fe.size(), "Bug 9: FE/BE row count mismatch") + assertEquals(bug9_be, bug9_fe, "Bug 9: FE/BE result mismatch for agg after NLJ + pooling scan") + logger.info("Bug 9: PASSED (FE/BE results match)") + + logger.info("=== All RQG bug reproduction tests completed ===") +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
