This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 774f8f32b84 [fix](group commit) fix some group commit problem (#48621) 774f8f32b84 is described below commit 774f8f32b8422d9e8990debbbb9a2f159e88394d Author: meiyi <me...@selectdb.com> AuthorDate: Mon Mar 10 19:39:01 2025 +0800 [fix](group commit) fix some group commit problem (#48621) Problem Summary: 1. fix stream load with `unique_key_update_mode` 2. fix `insert into t select * from ctas1 union all select * from ctas2;` --- be/src/http/action/stream_load.cpp | 7 ++- .../insert/OlapGroupCommitInsertExecutor.java | 15 +++--- .../insert_group_commit_with_large_data.out | Bin 221 -> 383 bytes .../data/load_p0/stream_load/test1.json | 8 +++ .../stream_load/test_group_commit_stream_load.out | Bin 276 -> 548 bytes .../ddl_p0/test_create_table_properties.groovy | 3 +- regression-test/suites/ddl_p0/test_ctas.groovy | 2 +- .../insert_group_commit_with_large_data.groovy | 54 +++++++++++++++++++-- .../test_group_commit_stream_load.groovy | 36 ++++++++++++++ .../ddl/clean/test_clean_label_nereids.groovy | 2 +- .../test_varchar_sc_in_complex.groovy | 3 +- 11 files changed, 115 insertions(+), 15 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 98d4c905644..d3720778d85 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -859,7 +859,12 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, iequal(req->header(HTTP_PARTIAL_COLUMNS), "true"); auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty(); auto partitions = !req->header(HTTP_PARTITIONS).empty(); - if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) { + auto update_mode = + !req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() && + (iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FIXED_COLUMNS") || + iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS")); + if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit && + !update_mode) { if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) { return Status::InvalidArgument("label and group_commit can't be set at the same time"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index 050fc791328..029c497b178 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -30,6 +30,7 @@ import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.InlineTable; import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; import org.apache.doris.nereids.trees.plans.commands.PrepareCommand; @@ -133,12 +134,14 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor { () -> "not allowModifyMTMVData")); conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext && ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command")); - conditions.add(Pair.of( - () -> tableSink.child() instanceof OneRowRelation - || tableSink.child() instanceof LogicalUnion - || tableSink.child() instanceof InlineTable, - () -> "not one row relation or union or inline table, class: " - + tableSink.child().getClass().getName())); + Plan tableSinkChild = tableSink.child(); + conditions.add(Pair.of(() -> tableSinkChild instanceof OneRowRelation + || (tableSinkChild instanceof LogicalUnion + && tableSinkChild.getExpressions().size() > 0) + || tableSinkChild instanceof InlineTable, + () -> "should be one row relation or union or inline table, class: " + + tableSinkChild.getClass().getName() + (tableSinkChild instanceof LogicalUnion + ? ", expression size is 0" : ""))); ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean())); if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) { for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) { diff --git a/regression-test/data/insert_p0/insert_group_commit_with_large_data.out b/regression-test/data/insert_p0/insert_group_commit_with_large_data.out index 06acc23ec64..5e28e927c1c 100644 Binary files a/regression-test/data/insert_p0/insert_group_commit_with_large_data.out and b/regression-test/data/insert_p0/insert_group_commit_with_large_data.out differ diff --git a/regression-test/data/load_p0/stream_load/test1.json b/regression-test/data/load_p0/stream_load/test1.json new file mode 100644 index 00000000000..b61fbb04364 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test1.json @@ -0,0 +1,8 @@ +{"k": 1, "v1": 10} +{"k": 2, "v2": 20, "v5": 25} +{"k": 3, "v3": 30} +{"k": 4, "v4": 20, "v1": 43, "v3": 99} +{"k": 5, "v5": null} +{"k": 6, "v1": 999, "v3": 777} +{"k": 2, "v4": 222} +{"k": 1, "v2": 111, "v3": 111} \ No newline at end of file diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out index 1f1afae813f..3e5d73452d0 100644 Binary files a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out and b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out differ diff --git a/regression-test/suites/ddl_p0/test_create_table_properties.groovy b/regression-test/suites/ddl_p0/test_create_table_properties.groovy index 32fd0cabcaf..4a64ac55bcb 100644 --- a/regression-test/suites/ddl_p0/test_create_table_properties.groovy +++ b/regression-test/suites/ddl_p0/test_create_table_properties.groovy @@ -300,7 +300,8 @@ suite("test_create_table_properties") { assertTrue(false, "should not be able to execute") } catch (Exception ex) { - assertTrue(ex.getMessage().contains("Insert has filtered data in strict mode")) + def exception_str = isGroupCommitMode() ? "too many filtered rows" : "Insert has filtered data in strict mode" + assertTrue(ex.getMessage().contains(exception_str)) } finally { } // alter table add default partition diff --git a/regression-test/suites/ddl_p0/test_ctas.groovy b/regression-test/suites/ddl_p0/test_ctas.groovy index 3dc0afd73af..6d176e30b4f 100644 --- a/regression-test/suites/ddl_p0/test_ctas.groovy +++ b/regression-test/suites/ddl_p0/test_ctas.groovy @@ -106,7 +106,7 @@ suite("test_ctas") { test { sql """show load from ${dbname}""" - rowNum 6 + rowNum isGroupCommitMode() ? 4: 6 } sql """ diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy index 39773f887cb..d43ad340f87 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy @@ -101,9 +101,10 @@ suite("insert_group_commit_with_large_data") { sql """ drop table if exists ${testTable}; """ sql """create table ${testTable}(a int,b int,c double generated always as (abs(a+b)) not null) DISTRIBUTED BY HASH(a) PROPERTIES("replication_num" = "1", "group_commit_interval_ms" = "40");""" - sql "INSERT INTO ${testTable} values(6,7,default);" - sql "INSERT INTO ${testTable}(a,b) values(1,2);" - sql "INSERT INTO ${testTable} values(3,5,default);" + sql " set group_commit = async_mode; " + group_commit_insert "INSERT INTO ${testTable} values(6,7,default);", 1 + group_commit_insert "INSERT INTO ${testTable}(a,b) values(1,2);", 1 + group_commit_insert "INSERT INTO ${testTable} values(3,5,default);", 1 getRowCount(3) qt_select1 "select * from ${testTable} order by 1,2,3;" @@ -123,7 +124,6 @@ suite("insert_group_commit_with_large_data") { if (exception != null) { throw exception } - log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) assertEquals(4, json.NumberTotalRows) @@ -131,4 +131,50 @@ suite("insert_group_commit_with_large_data") { } getRowCount(7) qt_select2 "select * from ${testTable} order by 1,2,3;" + + try { + sql """set group_commit = off_mode;""" + sql "drop table if exists gc_ctas1" + sql "drop table if exists gc_ctas2" + sql "drop table if exists gc_ctas3" + sql ''' + CREATE TABLE IF NOT EXISTS `gc_ctas1` ( + `k1` varchar(5) NULL, + `k2` varchar(5) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + ''' + sql ''' + CREATE TABLE IF NOT EXISTS `gc_ctas2` ( + `k1` varchar(10) NULL, + `k2` varchar(10) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + ''' + sql ''' insert into gc_ctas1 values('11111','11111'); ''' + sql ''' insert into gc_ctas2 values('1111111111','1111111111'); ''' + sql "sync" + order_qt_select_cte1 """ select * from gc_ctas1; """ + order_qt_select_cte2 """ select * from gc_ctas2; """ + sql """set group_commit = async_mode;""" + sql ''' + create table `gc_ctas3`(k1, k2) + PROPERTIES("replication_num" = "1") + as select * from gc_ctas1 + union all + select * from gc_ctas2; + ''' + sql " insert into gc_ctas3 select * from gc_ctas1 union all select * from gc_ctas2;" + sql "sync" + order_qt_select_cte3 """ select * from gc_ctas3; """ + } finally { + } } diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index be483c4dd48..a2de2a04b98 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -306,4 +306,40 @@ suite("test_group_commit_stream_load") { } finally { // try_sql("DROP TABLE ${tableName}") } + + // stream load with unique_key_update_mode + tableName = "test_group_commit_stream_load_update" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} ( + `k` int(11) NULL, + `v1` BIGINT NULL, + `v2` BIGINT NULL DEFAULT "9876", + `v3` BIGINT NOT NULL, + `v4` BIGINT NOT NULL DEFAULT "1234", + `v5` BIGINT NULL + ) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true"); """ + + def show_res = sql "show create table ${tableName}" + assertTrue(show_res.toString().contains('"enable_unique_key_skip_bitmap_column" = "true"')) + sql """insert into ${tableName} select number, number, number, number, number, number from numbers("number" = "6"); """ + qt_sql "select k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} order by k;" + + streamLoad { + table "${tableName}" + set 'group_commit', 'async_mode' + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'strict_mode', 'false' + set 'unique_key_update_mode', 'update_FLEXIBLE_COLUMNS' + file "test1.json" + time 20000 + unset 'label' + } + qt_read_json_by_line "select k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} order by k;" + } diff --git a/regression-test/suites/nereids_p0/ddl/clean/test_clean_label_nereids.groovy b/regression-test/suites/nereids_p0/ddl/clean/test_clean_label_nereids.groovy index d751ba8cd9a..a5abb874476 100644 --- a/regression-test/suites/nereids_p0/ddl/clean/test_clean_label_nereids.groovy +++ b/regression-test/suites/nereids_p0/ddl/clean/test_clean_label_nereids.groovy @@ -77,7 +77,7 @@ suite("test_clean_label_nereids") { // def totalLabel = sql """show load from ${dbName}""" println totalLabel - assert totalLabel.size() == 4 + assert totalLabel.size() == isGroupCommitMode() ? 1 : 4 // clean label checkNereidsExecute("clean label ${insertLabel} from ${dbName};") checkNereidsExecute("clean label from ${dbName};") diff --git a/regression-test/suites/schema_change_p0/test_varchar_sc_in_complex.groovy b/regression-test/suites/schema_change_p0/test_varchar_sc_in_complex.groovy index 584a422f2a7..682ef2f1e20 100644 --- a/regression-test/suites/schema_change_p0/test_varchar_sc_in_complex.groovy +++ b/regression-test/suites/schema_change_p0/test_varchar_sc_in_complex.groovy @@ -52,11 +52,12 @@ suite ("test_varchar_sc_in_complex") { (1,['2025-01-02'], {'doris':'better'}, named_struct('col','amory')); """ // this can be insert but with cut off the left string to 10 + def exception_str = isGroupCommitMode() ? "too many filtered rows" : "Insert has filtered data in strict mode" test { sql """ insert into ${tableName} values (11, ['2025-01-03-22-33'], {'doris111111111':'better2222222222'}, named_struct('col','amoryIsBetter')); """ - exception "Insert has filtered data in strict mode" + exception exception_str } // case1. can not alter modify column to shorten string length for array/map/struct --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org