This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0985f5b344b6f4fd995aeb283aecbaadd8e0e787 Author: 924060929 <[email protected]> AuthorDate: Wed Apr 1 13:46:53 2026 +0800 [fix](local shuffle) fix DistinctStreamingAgg hash requirement and add RQG regression tests 1. AggregationNode: fix canUseDistinctStreamingAgg branch condition from size() > 1 to !isEmpty(), matching BE's DistinctStreamingAggOperatorX which checks !_probe_expr_ctxs.empty(). With 1 group key and disable_streaming_preaggregations=true, FE was not requiring hash exchange while BE did. 2. Add Bug 13 (NLJ COREDUMP) test case to test_local_shuffle_rqg_bugs. 3. Add 3 RQG-derived cases to test_local_shuffle_fe_be_consistency: serial NLJ RIGHT OUTER, GLOBAL_HASH self-join+NLJ, FULL OUTER JOIN. --- .../org/apache/doris/planner/AggregationNode.java | 2 +- .../org/apache/doris/planner/HashJoinNode.java | 11 +- .../test_local_shuffle_fe_be_consistency.groovy | 42 +++ .../test_local_shuffle_rqg_bugs.groovy | 323 +++++++++++++++++++++ 4 files changed, 376 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index 3b1aa65230f..885d911bc71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -279,7 +279,7 @@ public class AggregationNode extends PlanNode { LocalExchangeTypeRequire requireChild; if (canUseDistinctStreamingAgg(sessionVariable)) { // DistinctStreamingAggOperatorX in BE - if (needsFinalize || (aggInfo.getGroupingExprs().size() > 1 && !useStreamingPreagg)) { + if (needsFinalize || (!aggInfo.getGroupingExprs().isEmpty() && !useStreamingPreagg)) { if (AddLocalExchange.isColocated(this)) { requireChild = LocalExchangeTypeRequire.requireHash(); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 
b65d8355888..dfac5dcab58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -322,7 +322,16 @@ public class HashJoinNode extends JoinNodeBase { // without inserting a redundant local exchange. outputType = probeChildSerial ? LocalExchangeType.PASSTHROUGH : null; } else if (AddLocalExchange.isColocated(this) || isBucketShuffle()) { - buildSideRequire = probeSideRequire = LocalExchangeTypeRequire.requireBucketHash(); + boolean buildChildSerial = isSerialChildInThrift(translatorContext, children.get(1)); + probeSideRequire = LocalExchangeTypeRequire.requireBucketHash(); + // When build child is serial (pooling scan), the serial Exchange reduces the build + // pipeline's num_tasks to _parallel_instances=1 via add_operator(). This causes + // instance 1+ to have probe tasks without build tasks → shared state injection + // fails with "must set shared state". Use requirePassToOne to insert a local + // exchange that restores num_tasks, matching BE-native's PASSTHROUGH insertion. + buildSideRequire = buildChildSerial + ? 
LocalExchangeTypeRequire.requirePassToOne() + : LocalExchangeTypeRequire.requireBucketHash(); outputType = AddLocalExchange.resolveExchangeType( LocalExchangeTypeRequire.requireBucketHash(), translatorContext, this, children.get(0)); diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy index 2ca0d09f880..c27be60927b 100644 --- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy @@ -860,6 +860,48 @@ suite("test_local_shuffle_fe_be_consistency") { GROUP BY GROUPING SETS ((k1), ()) ORDER BY k1, rn""") + // ============================================================ + // 14. RQG bug cases — serial NLJ + pooling scan (Bug 13 from rqg_bugs) + // Serial NLJ (RIGHT_OUTER) with pooling scan. Previously crashed because + // FE inserted BROADCAST on build side inflating num_tasks while probe stayed + // serial. Fixed: serial NLJ sets buildSideRequire=noRequire(). 
+ // ============================================================ + checkConsistencyWithSql("rqg_serial_nlj_right_outer_pooling", + """SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=0, + ignore_storage_data_distribution=true, + enable_share_hash_table_for_broadcast_join=false, + disable_streaming_preaggregations=true, + disable_join_reorder=true)*/ + b.k1 AS field1 + FROM ls_serial a + RIGHT OUTER JOIN ls_serial b ON a.v1 > b.v1 + GROUP BY field1 + ORDER BY field1 ASC""") + + // GLOBAL_HASH_SHUFFLE fix (Bug 10 from rqg_bugs) — self-join + NLJ with serial exchange + checkConsistencyWithSql("rqg_global_hash_shuffle_self_join_nlj", + """SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4, + ignore_storage_data_distribution=true, + disable_join_reorder=true, disable_colocate_plan=true)*/ + a.k1 AS field1, a.v1 AS field2 + FROM ls_t1 a + LEFT JOIN ls_t1 b ON a.k1 = b.k2 + LEFT JOIN ls_t1 c ON a.k1 > b.k2 + WHERE a.v1 > 5 + GROUP BY field1, field2 + ORDER BY field1, field2""") + + // FULL OUTER JOIN + GROUP BY with serial exchange (Bug 11 from rqg_bugs) + checkConsistencyWithSql("rqg_global_hash_full_outer_join", + """SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4, + ignore_storage_data_distribution=true)*/ + a.k1, b.k1, count(1) + FROM ls_t1 a + FULL OUTER JOIN ls_t2 b ON a.k1 = b.k1 + WHERE b.k1 = 2 + GROUP BY a.k1, b.k1 + ORDER BY 1, 2, 3""") + // ================================================================ // Phase 2: Fetch all profiles and compare results // By now, all queries have been executed and profiles are being diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy index 7950f54ee50..c69d376549f 100644 --- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy +++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy @@ -37,6 +37,8 @@ suite("test_local_shuffle_rqg_bugs") { // ============================================================ sql "DROP TABLE IF EXISTS rqg_t1" sql "DROP TABLE IF EXISTS rqg_t2" + sql "DROP TABLE IF EXISTS rqg_t3" + sql "DROP TABLE IF EXISTS rqg_t4" sql """ CREATE TABLE rqg_t1 ( @@ -63,6 +65,52 @@ suite("test_local_shuffle_rqg_bugs") { PROPERTIES ("replication_num" = "1") """ + // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar + bigint columns + sql """ + CREATE TABLE rqg_t3 ( + pk INT NOT NULL, + col_bigint_undef_signed BIGINT, + col_varchar_10__undef_signed VARCHAR(10), + col_varchar_64__undef_signed VARCHAR(64) + ) ENGINE=OLAP + DUPLICATE KEY(pk) + DISTRIBUTED BY HASH(pk) BUCKETS 10 + PROPERTIES ("replication_num" = "1") + """ + + // Second table for FULL OUTER JOIN case (col_bigint_undef_signed_not_null) + sql """ + CREATE TABLE rqg_t4 ( + pk INT NOT NULL, + col_bigint_undef_signed_not_null BIGINT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(pk) + DISTRIBUTED BY HASH(pk) BUCKETS 10 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + INSERT INTO rqg_t3 VALUES + (0, -94, 'Abc', 'hello world'), + (1, 672609, 'Xyz', null), + (2, -3766684, 'Pqr', 'test string'), + (3, 5070261, 'abc', 'another row'), + (4, null, 'def', 'value four'), + (5, -86, 'XgpxlHBLEM', null), + (6, 21910, 'abc', 'they'), + (7, -63, 'zzzz', 'some text'), + (8, -8276281, 'AHlvNtoGLO', 'longer string here'), + (9, -101, 'mid', 'final row') + """ + + sql """ + INSERT INTO rqg_t4 VALUES + (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), + (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), + (10, 2), (11, 2), (12, 2), (13, 3), (14, 4), + (15, 5), (16, 2), (17, 2), (18, 2), (19, 9) + """ + // Insert enough rows to exercise multiple pipeline tasks sql """ INSERT INTO rqg_t1 VALUES @@ -452,5 +500,280 @@ suite("test_local_shuffle_rqg_bugs") { assertEquals(bug9_be, bug9_fe, "Bug 9: FE/BE result mismatch for agg 
after NLJ + pooling scan") logger.info("Bug 9: PASSED (FE/BE results match)") + // ============================================================ + // Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched — self-join + NLJ + // RQG case: 906784672 (build 184181) + // Root cause: HashJoinNode used requireGlobalExecutionHash() → GLOBAL local exchange + // inserted when use_serial_exchange=true; shuffle_idx_to_instance_idx map has only + // 4 entries (1/BE) but GLOBAL hash needs N*dop entries → most rows unrouted (0 actual rows). + // Fixed: changed to requireHash() so resolveExchangeType() downgrades to LOCAL hash. + // SQL: self-join (table1 LEFT JOIN table1 table2 ON pk=col_bigint_undef_signed) + // then NLJ (LEFT JOIN table1 table3 ON pk > col_bigint_undef_signed) + // ============================================================ + + logger.info("=== Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched - self-join + NLJ (build 184181 case 906784672) ===") + def bug10_fe = sql """ + SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4, + enable_local_shuffle_planner=true, + ignore_storage_data_distribution=true, + enable_sql_cache=false, + disable_join_reorder=true, disable_colocate_plan=true)*/ + table1.pk AS field1, table1.col_bigint_undef_signed AS field2 + FROM rqg_t3 AS table1 + LEFT JOIN rqg_t3 AS table2 ON table1.pk = table2.col_bigint_undef_signed + LEFT JOIN rqg_t3 AS table3 ON table1.pk > table2.col_bigint_undef_signed + WHERE (table1.col_varchar_10__undef_signed > 'AHlvNtoGLO' + AND table1.col_varchar_10__undef_signed < 'zzzz') + OR (table1.col_bigint_undef_signed = table1.pk AND table1.col_varchar_64__undef_signed IS NULL) + OR (table1.pk != table1.pk AND table1.pk <> 2) + GROUP BY field1, field2 + ORDER BY field1, field2 + """ + def bug10_be = sql """ + SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4, + enable_local_shuffle_planner=false, + ignore_storage_data_distribution=true, + enable_sql_cache=false, + 
disable_join_reorder=true, disable_colocate_plan=true)*/ + table1.pk AS field1, table1.col_bigint_undef_signed AS field2 + FROM rqg_t3 AS table1 + LEFT JOIN rqg_t3 AS table2 ON table1.pk = table2.col_bigint_undef_signed + LEFT JOIN rqg_t3 AS table3 ON table1.pk > table2.col_bigint_undef_signed + WHERE (table1.col_varchar_10__undef_signed > 'AHlvNtoGLO' + AND table1.col_varchar_10__undef_signed < 'zzzz') + OR (table1.col_bigint_undef_signed = table1.pk AND table1.col_varchar_64__undef_signed IS NULL) + OR (table1.pk != table1.pk AND table1.pk <> 2) + GROUP BY field1, field2 + ORDER BY field1, field2 + """ + logger.info("Bug 10 FE rows: ${bug10_fe.size()}, BE rows: ${bug10_be.size()}") + assertEquals(bug10_be.size(), bug10_fe.size(), "Bug 10: FE/BE row count mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)") + assertEquals(bug10_be, bug10_fe, "Bug 10: FE/BE result mismatch for self-join + NLJ") + logger.info("Bug 10: PASSED") + + // ============================================================ + // Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched — FULL OUTER JOIN + GROUP BY + // RQG case: 11007681241 (build 184181) + // Same root cause as Bug 10. 
+ // SQL: FULL OUTER JOIN on col_bigint_undef_signed_not_null with WHERE + GROUP BY + // ============================================================ + + logger.info("=== Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched - FULL OUTER JOIN + GROUP BY (build 184181 case 11007681241) ===") + def bug11_fe = sql """ + SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4, + enable_local_shuffle_planner=true, + ignore_storage_data_distribution=true, + enable_sql_cache=false)*/ + t1.col_bigint_undef_signed_not_null, t2.col_bigint_undef_signed_not_null, count(1) + FROM rqg_t4 t1 + FULL OUTER JOIN rqg_t2 t2 + ON t1.col_bigint_undef_signed_not_null = t2.col_bigint_undef_signed_not_null + WHERE t2.col_bigint_undef_signed_not_null = 2 + GROUP BY t1.col_bigint_undef_signed_not_null, t2.col_bigint_undef_signed_not_null + ORDER BY 1, 2, 3 + """ + def bug11_be = sql """ + SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4, + enable_local_shuffle_planner=false, + ignore_storage_data_distribution=true, + enable_sql_cache=false)*/ + t1.col_bigint_undef_signed_not_null, t2.col_bigint_undef_signed_not_null, count(1) + FROM rqg_t4 t1 + FULL OUTER JOIN rqg_t2 t2 + ON t1.col_bigint_undef_signed_not_null = t2.col_bigint_undef_signed_not_null + WHERE t2.col_bigint_undef_signed_not_null = 2 + GROUP BY t1.col_bigint_undef_signed_not_null, t2.col_bigint_undef_signed_not_null + ORDER BY 1, 2, 3 + """ + logger.info("Bug 11 FE rows: ${bug11_fe.size()}, BE rows: ${bug11_be.size()}") + assertEquals(bug11_be.size(), bug11_fe.size(), "Bug 11: FE/BE row count mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)") + assertEquals(bug11_be, bug11_fe, "Bug 11: FE/BE result mismatch for FULL OUTER JOIN + GROUP BY") + logger.info("Bug 11: PASSED") + + // ============================================================ + // Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched — LEFT JOIN + VARCHAR predicates + MIN() + // RQG case: 906784662 (build 184181) + // Same root cause as Bug 10/11. 
+ // SQL: LEFT JOIN on pk with VARCHAR NOT IN / BETWEEN / IN predicates, MIN() aggregate + // ============================================================ + + logger.info("=== Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched - LEFT JOIN + VARCHAR predicates (build 184181 case 906784662) ===") + def bug12_fe = sql """ + SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4, + enable_local_shuffle_planner=true, + ignore_storage_data_distribution=true, + enable_sql_cache=false, + disable_join_reorder=true, disable_colocate_plan=true)*/ + table1.pk AS field1, MIN(table1.pk) AS field2 + FROM rqg_t3 AS table1 + LEFT JOIN rqg_t1 AS table2 ON table2.pk = table1.pk + WHERE table1.col_varchar_64__undef_signed NOT IN ('they') + AND table1.col_varchar_10__undef_signed BETWEEN 'AHlvNtoGLO' AND 'z' + AND table1.pk IN (3, 6, 8, 9, 2) + GROUP BY field1 + ORDER BY field1, field2 ASC + """ + def bug12_be = sql """ + SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4, + enable_local_shuffle_planner=false, + ignore_storage_data_distribution=true, + enable_sql_cache=false, + disable_join_reorder=true, disable_colocate_plan=true)*/ + table1.pk AS field1, MIN(table1.pk) AS field2 + FROM rqg_t3 AS table1 + LEFT JOIN rqg_t1 AS table2 ON table2.pk = table1.pk + WHERE table1.col_varchar_64__undef_signed NOT IN ('they') + AND table1.col_varchar_10__undef_signed BETWEEN 'AHlvNtoGLO' AND 'z' + AND table1.pk IN (3, 6, 8, 9, 2) + GROUP BY field1 + ORDER BY field1, field2 ASC + """ + logger.info("Bug 12 FE rows: ${bug12_fe.size()}, BE rows: ${bug12_be.size()}") + assertEquals(bug12_be.size(), bug12_fe.size(), "Bug 12: FE/BE row count mismatch (GLOBAL_HASH_SHUFFLE Rows mismatched)") + assertEquals(bug12_be, bug12_fe, "Bug 12: FE/BE result mismatch for LEFT JOIN + VARCHAR predicates") + logger.info("Bug 12: PASSED") + + // ============================================================ + // Bug 13: NLJ COREDUMP — serial NLJ + pooling scan + BROADCAST build side + // 
RQG build 184430, query c0dafc1bed0f4910 + // Root cause: serial NLJ (RIGHT_OUTER) with pooling scan inserted BROADCAST + // local exchange on build side, inflating build pipeline num_tasks to _num_instances + // while probe pipeline stayed at 1 task. Instance 1+ created build tasks without + // corresponding probe tasks → source_deps empty → set_ready_to_read() crash. + // Fixed: serial NLJ sets buildSideRequire=noRequire() to match BE-native + // num_tasks_of_parent()<=1 skip logic. + // ============================================================ + + logger.info("=== Bug 13: NLJ COREDUMP - serial NLJ + pooling scan (FE planner) ===") + try { + def bug13_fe = sql """ + SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=0, + enable_local_shuffle_planner=true, + ignore_storage_data_distribution=true, + enable_sql_cache=false, + enable_share_hash_table_for_broadcast_join=false, + disable_streaming_preaggregations=true, + disable_join_reorder=true)*/ + t2.col_bigint_undef_signed_not_null AS field1 + FROM rqg_t4 AS t1 + RIGHT OUTER JOIN rqg_t2 AS t2 ON t1.col_bigint_undef_signed_not_null > t2.col_bigint_undef_signed_not_null + GROUP BY field1 + ORDER BY field1 ASC + """ + def bug13_be = sql """ + SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=0, + enable_local_shuffle_planner=false, + ignore_storage_data_distribution=true, + enable_sql_cache=false, + enable_share_hash_table_for_broadcast_join=false, + disable_streaming_preaggregations=true, + disable_join_reorder=true)*/ + t2.col_bigint_undef_signed_not_null AS field1 + FROM rqg_t4 AS t1 + RIGHT OUTER JOIN rqg_t2 AS t2 ON t1.col_bigint_undef_signed_not_null > t2.col_bigint_undef_signed_not_null + GROUP BY field1 + ORDER BY field1 ASC + """ + logger.info("Bug 13 FE rows: ${bug13_fe.size()}, BE rows: ${bug13_be.size()}") + assertEquals(bug13_be.size(), bug13_fe.size(), "Bug 13: FE/BE row count mismatch (NLJ COREDUMP)") + assertEquals(bug13_be, bug13_fe, "Bug 13: FE/BE result 
mismatch for serial NLJ + pooling scan") + logger.info("Bug 13: PASSED (no crash, results match)") + } catch (Throwable t) { + logger.error("Bug 13 FAILED: ${t.message}") + assertTrue(false, "Bug 13: NLJ COREDUMP (serial NLJ + pooling scan): ${t.message}") + } + + // ============================================================ + // Bug 14: BUCKET_SHUFFLE join + serial build Exchange — must set shared state + // RQG build 184563, cases 906784706/906784783/906784987/906785006 + // Root cause: BUCKET_SHUFFLE join build side ExchangeNode marked serial in + // pooling scan fragment → build pipeline num_tasks reduced to 1 → + // instance 1+ have probe tasks without build tasks → shared state injection + // fails. Fixed: isBucketShuffle() branch checks buildChildSerial and uses + // requirePassToOne() to restore num_tasks, matching BE-native behavior. + // Requires replication_num=3 + [shuffle] hint to force BUCKET_SHUFFLE plan. + // ============================================================ + + logger.info("=== Bug 14: BUCKET_SHUFFLE join + serial build Exchange (FE planner) ===") + // Need replication_num=3 for BUCKET_SHUFFLE. Check if allow_replica_on_same_host is enabled. 
+ def allowSameHost = sql "ADMIN SHOW FRONTEND CONFIG LIKE 'allow_replica_on_same_host'" + if (allowSameHost[0][1].toString() == "true") { + sql "DROP TABLE IF EXISTS rqg_t5_rep3" + sql "DROP TABLE IF EXISTS rqg_t6_rep3" + try { + sql """ + CREATE TABLE rqg_t5_rep3 ( + pk INT NULL, + col_varchar_10__undef_signed VARCHAR(10) NULL, + col_bigint_undef_signed BIGINT NULL, + col_varchar_64__undef_signed VARCHAR(64) NULL + ) DUPLICATE KEY(pk, col_varchar_10__undef_signed) + DISTRIBUTED BY HASH(pk) BUCKETS 10 + PROPERTIES ("replication_num" = "3") + """ + sql """ + CREATE TABLE rqg_t6_rep3 ( + pk INT NULL, + col_varchar_10__undef_signed VARCHAR(10) NULL, + col_bigint_undef_signed BIGINT NULL, + col_varchar_64__undef_signed VARCHAR(64) NULL + ) DUPLICATE KEY(pk, col_varchar_10__undef_signed) + DISTRIBUTED BY HASH(pk) BUCKETS 10 + PROPERTIES ("replication_num" = "3") + """ + sql """ + INSERT INTO rqg_t5_rep3 VALUES + (0,'abc',-94,'hello'),(1,'xyz',672609,null),(2,'pqr',-3766684,'test'), + (3,'abc',5070261,'another'),(4,'def',null,'value'),(5,'so',-86,null), + (6,'abc',21910,'they'),(7,'zzzz',-63,'some'),(8,'xPLflvBEcW',-8276281,'longer'), + (9,'mid',-101,'final') + """ + sql """ + INSERT INTO rqg_t6_rep3 VALUES + (0,'aaa',100,'world'),(1,'bbb',200,null),(2,'ccc',300,'foo'), + (3,'ddd',400,'bar'),(4,'eee',500,'baz'),(5,'fff',600,null), + (6,'ggg',700,'qux'),(7,'hhh',800,'quux'),(8,'iii',900,'corge'), + (9,'jjj',1000,'grault') + """ + Thread.sleep(3000) + + def bug14_fe = sql """ + SELECT /*+SET_VAR(enable_local_shuffle_planner=true, + parallel_pipeline_task_num=3, + disable_streaming_preaggregations=true, + enable_sql_cache=false, + disable_join_reorder=true)*/ + table1.pk AS field1 + FROM rqg_t5_rep3 AS table1 + LEFT OUTER JOIN [shuffle] rqg_t6_rep3 AS table2 ON table1.pk = table2.pk + WHERE table1.col_varchar_10__undef_signed >= 'so' + GROUP BY field1 + ORDER BY field1 + """ + def bug14_be = sql """ + SELECT /*+SET_VAR(enable_local_shuffle_planner=false, + 
parallel_pipeline_task_num=3, + disable_streaming_preaggregations=true, + enable_sql_cache=false, + disable_join_reorder=true)*/ + table1.pk AS field1 + FROM rqg_t5_rep3 AS table1 + LEFT OUTER JOIN [shuffle] rqg_t6_rep3 AS table2 ON table1.pk = table2.pk + WHERE table1.col_varchar_10__undef_signed >= 'so' + GROUP BY field1 + ORDER BY field1 + """ + logger.info("Bug 14 FE rows: ${bug14_fe.size()}, BE rows: ${bug14_be.size()}") + assertEquals(bug14_be.size(), bug14_fe.size(), "Bug 14: FE/BE row count mismatch (BUCKET_SHUFFLE must set shared state)") + assertEquals(bug14_be, bug14_fe, "Bug 14: FE/BE result mismatch for BUCKET_SHUFFLE + serial build Exchange") + logger.info("Bug 14: PASSED (no crash, results match)") + } catch (Throwable t) { + logger.error("Bug 14 FAILED: ${t.message}") + assertTrue(false, "Bug 14: BUCKET_SHUFFLE must set shared state: ${t.message}") + } + } else { + logger.info("Bug 14: SKIPPED (allow_replica_on_same_host not enabled, cannot create replication_num=3 tables)") + } + logger.info("=== All RQG bug reproduction tests completed ===") } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
