This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cd4b8666e74 branch-3.0: [fix](group commit) fix some group commit problem (#48621) (#48896)

cd4b8666e74 is described below

commit cd4b8666e7483c2ba2be24081cfd2e8b7b34a32a
Author: meiyi <me...@selectdb.com>
AuthorDate: Tue Mar 11 17:34:48 2025 +0800

    branch-3.0: [fix](group commit) fix some group commit problem (#48621) (#48896)

    pick https://github.com/apache/doris/pull/48621
---
 .../insert/OlapGroupCommitInsertExecutor.java      | 12 +++--
 .../insert_group_commit_with_large_data.out        | Bin 221 -> 383 bytes
 .../ddl_p0/test_create_table_properties.groovy     |  3 +-
 regression-test/suites/ddl_p0/test_ctas.groovy     |  2 +-
 .../insert_group_commit_with_large_data.groovy     | 54 +++++++++++++++++++--
 5 files changed, 61 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 0387f308b15..b90a616cd7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -32,6 +32,7 @@ import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
 import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
@@ -131,11 +132,14 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor {
                 () -> "not allowModifyMTMVData"));
         conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext
                         && ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
+        Plan tableSinkChild = tableSink.child();
         conditions.add(Pair.of(
-                () -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion
-                        || tableSink.child() instanceof LogicalInlineTable,
-                () -> "not one row relation or union or inline table, class: " + tableSink.child().getClass()
-                        .getName()));
+                () -> tableSinkChild instanceof OneRowRelation || (tableSinkChild instanceof LogicalUnion
+                        && tableSinkChild.getExpressions().size() > 0)
+                        || tableSinkChild instanceof LogicalInlineTable,
+                () -> "should be one row relation or union or inline table, class: "
+                        + tableSinkChild.getClass().getName() + (tableSinkChild instanceof LogicalUnion
+                        ? ", expression size is 0" : "")));
         ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean()));
         if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
             for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) {
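
Context for the hunk above: the executor keeps a list of (condition, reason) pairs and enables group commit only when every condition passes; the fix requires a LogicalUnion child to carry a non-empty expression list (literal rows) before it qualifies. The following standalone sketch only illustrates that gating pattern under simplified assumptions; GroupCommitGateSketch, Check and allowGroupCommit are illustrative names, not the actual Doris classes (requires Java 16+ for records).

import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Illustrative sketch of the (condition, reason) gating pattern: the feature
// is enabled only if every condition holds, and the reason of the first
// failing check explains why it was turned off.
class GroupCommitGateSketch {

    record Check(BooleanSupplier condition, Supplier<String> reason) {}

    static boolean allowGroupCommit(List<Check> checks) {
        for (Check check : checks) {
            if (!check.condition().getAsBoolean()) {
                System.out.println("group commit disabled: " + check.reason().get());
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Mimic the tightened condition: a union child only qualifies when it
        // carries literal rows (expression size > 0), i.e. a multi-row VALUES
        // list rather than an INSERT ... SELECT ... UNION ALL plan.
        boolean childIsUnion = true;
        int unionExpressionSize = 0;
        List<Check> checks = new ArrayList<>();
        checks.add(new Check(
                () -> !childIsUnion || unionExpressionSize > 0,
                () -> "should be one row relation or union or inline table, expression size is 0"));
        System.out.println(allowGroupCommit(checks)); // prints the reason, then false
    }
}
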
diff --git a/regression-test/data/insert_p0/insert_group_commit_with_large_data.out b/regression-test/data/insert_p0/insert_group_commit_with_large_data.out
index 06acc23ec64..5e28e927c1c 100644
Binary files a/regression-test/data/insert_p0/insert_group_commit_with_large_data.out and b/regression-test/data/insert_p0/insert_group_commit_with_large_data.out differ
diff --git a/regression-test/suites/ddl_p0/test_create_table_properties.groovy b/regression-test/suites/ddl_p0/test_create_table_properties.groovy
index 32fd0cabcaf..4a64ac55bcb 100644
--- a/regression-test/suites/ddl_p0/test_create_table_properties.groovy
+++ b/regression-test/suites/ddl_p0/test_create_table_properties.groovy
@@ -300,7 +300,8 @@ suite("test_create_table_properties") {
             assertTrue(false, "should not be able to execute")
         } catch (Exception ex) {
-            assertTrue(ex.getMessage().contains("Insert has filtered data in strict mode"))
+            def exception_str = isGroupCommitMode() ? "too many filtered rows" : "Insert has filtered data in strict mode"
+            assertTrue(ex.getMessage().contains(exception_str))
         } finally {
         }
         // alter table add default partition
diff --git a/regression-test/suites/ddl_p0/test_ctas.groovy b/regression-test/suites/ddl_p0/test_ctas.groovy
index 3dc0afd73af..6d176e30b4f 100644
--- a/regression-test/suites/ddl_p0/test_ctas.groovy
+++ b/regression-test/suites/ddl_p0/test_ctas.groovy
@@ -106,7 +106,7 @@ suite("test_ctas") {
         test {
             sql """show load from ${dbname}"""
-            rowNum 6
+            rowNum isGroupCommitMode() ? 4: 6
         }
         sql """
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
index 39773f887cb..d43ad340f87 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
@@ -101,9 +101,10 @@ suite("insert_group_commit_with_large_data") {
         sql """ drop table if exists ${testTable}; """
         sql """create table ${testTable}(a int,b int,c double generated always as (abs(a+b)) not null)
         DISTRIBUTED BY HASH(a) PROPERTIES("replication_num" = "1", "group_commit_interval_ms" = "40");"""
-        sql "INSERT INTO ${testTable} values(6,7,default);"
-        sql "INSERT INTO ${testTable}(a,b) values(1,2);"
-        sql "INSERT INTO ${testTable} values(3,5,default);"
+        sql " set group_commit = async_mode; "
+        group_commit_insert "INSERT INTO ${testTable} values(6,7,default);", 1
+        group_commit_insert "INSERT INTO ${testTable}(a,b) values(1,2);", 1
+        group_commit_insert "INSERT INTO ${testTable} values(3,5,default);", 1
         getRowCount(3)
         qt_select1 "select * from ${testTable} order by 1,2,3;"
@@ -123,7 +124,6 @@
                 if (exception != null) {
                     throw exception
                 }
-                log.info("Stream load result: ${result}".toString())
                 def json = parseJson(result)
                 assertEquals("success", json.Status.toLowerCase())
                 assertEquals(4, json.NumberTotalRows)
@@ -131,4 +131,50 @@
         }
         getRowCount(7)
         qt_select2 "select * from ${testTable} order by 1,2,3;"
+
+        try {
+            sql """set group_commit = off_mode;"""
+            sql "drop table if exists gc_ctas1"
+            sql "drop table if exists gc_ctas2"
+            sql "drop table if exists gc_ctas3"
+            sql '''
+            CREATE TABLE IF NOT EXISTS `gc_ctas1` (
+            `k1` varchar(5) NULL,
+            `k2` varchar(5) NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+            );
+            '''
+            sql '''
+            CREATE TABLE IF NOT EXISTS `gc_ctas2` (
+            `k1` varchar(10) NULL,
+            `k2` varchar(10) NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+            );
+            '''
+            sql ''' insert into gc_ctas1 values('11111','11111'); '''
+            sql ''' insert into gc_ctas2 values('1111111111','1111111111'); '''
+            sql "sync"
+            order_qt_select_cte1 """ select * from gc_ctas1; """
+            order_qt_select_cte2 """ select * from gc_ctas2; """
+            sql """set group_commit = async_mode;"""
+            sql '''
+            create table `gc_ctas3`(k1, k2)
+            PROPERTIES("replication_num" = "1")
+            as select * from gc_ctas1
+            union all
+            select * from gc_ctas2;
+            '''
+            sql " insert into gc_ctas3 select * from gc_ctas1 union all select * from gc_ctas2;"
+            sql "sync"
+            order_qt_select_cte3 """ select * from gc_ctas3; """
+        } finally {
+        }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org