This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 774f8f32b84 [fix](group commit) fix some group commit problem (#48621)
774f8f32b84 is described below

commit 774f8f32b8422d9e8990debbbb9a2f159e88394d
Author: meiyi <me...@selectdb.com>
AuthorDate: Mon Mar 10 19:39:01 2025 +0800

    [fix](group commit) fix some group commit problem (#48621)
    
    
    
    Problem Summary:
    1. fix stream load with `unique_key_update_mode`
    2. fix `insert into t select * from ctas1 union all select * from
    ctas2;`
---
 be/src/http/action/stream_load.cpp                 |   7 ++-
 .../insert/OlapGroupCommitInsertExecutor.java      |  15 +++---
 .../insert_group_commit_with_large_data.out        | Bin 221 -> 383 bytes
 .../data/load_p0/stream_load/test1.json            |   8 +++
 .../stream_load/test_group_commit_stream_load.out  | Bin 276 -> 548 bytes
 .../ddl_p0/test_create_table_properties.groovy     |   3 +-
 regression-test/suites/ddl_p0/test_ctas.groovy     |   2 +-
 .../insert_group_commit_with_large_data.groovy     |  54 +++++++++++++++++++--
 .../test_group_commit_stream_load.groovy           |  36 ++++++++++++++
 .../ddl/clean/test_clean_label_nereids.groovy      |   2 +-
 .../test_varchar_sc_in_complex.groovy              |   3 +-
 11 files changed, 115 insertions(+), 15 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 98d4c905644..d3720778d85 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -859,7 +859,12 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* 
req,
                            iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
     auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
-    if (!partial_columns && !partitions && !temp_partitions && 
!ctx->two_phase_commit) {
+    auto update_mode =
+            !req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
+            (iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), 
"UPDATE_FIXED_COLUMNS") ||
+             iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), 
"UPDATE_FLEXIBLE_COLUMNS"));
+    if (!partial_columns && !partitions && !temp_partitions && 
!ctx->two_phase_commit &&
+        !update_mode) {
         if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) 
{
             return Status::InvalidArgument("label and group_commit can't be 
set at the same time");
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 050fc791328..029c497b178 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -30,6 +30,7 @@ import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.InlineTable;
 import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
 import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
@@ -133,12 +134,14 @@ public class OlapGroupCommitInsertExecutor extends 
OlapInsertExecutor {
                     () -> "not allowModifyMTMVData"));
             conditions.add(Pair.of(() -> !(insertCtx.isPresent() && 
insertCtx.get() instanceof OlapInsertCommandContext
                     && ((OlapInsertCommandContext) 
insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
-            conditions.add(Pair.of(
-                    () -> tableSink.child() instanceof OneRowRelation
-                            || tableSink.child() instanceof LogicalUnion
-                            || tableSink.child() instanceof InlineTable,
-                    () -> "not one row relation or union or inline table, 
class: "
-                            + tableSink.child().getClass().getName()));
+            Plan tableSinkChild = tableSink.child();
+            conditions.add(Pair.of(() -> tableSinkChild instanceof 
OneRowRelation
+                            || (tableSinkChild instanceof LogicalUnion
+                            && tableSinkChild.getExpressions().size() > 0)
+                            || tableSinkChild instanceof InlineTable,
+                    () -> "should be one row relation or union or inline 
table, class: "
+                            + tableSinkChild.getClass().getName() + 
(tableSinkChild instanceof LogicalUnion
+                            ? ", expression size is 0" : "")));
             ctx.setGroupCommit(conditions.stream().allMatch(p -> 
p.first.getAsBoolean()));
             if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
                 for (Pair<BooleanSupplier, Supplier<String>> pair : 
conditions) {
diff --git 
a/regression-test/data/insert_p0/insert_group_commit_with_large_data.out 
b/regression-test/data/insert_p0/insert_group_commit_with_large_data.out
index 06acc23ec64..5e28e927c1c 100644
Binary files 
a/regression-test/data/insert_p0/insert_group_commit_with_large_data.out and 
b/regression-test/data/insert_p0/insert_group_commit_with_large_data.out differ
diff --git a/regression-test/data/load_p0/stream_load/test1.json 
b/regression-test/data/load_p0/stream_load/test1.json
new file mode 100644
index 00000000000..b61fbb04364
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test1.json
@@ -0,0 +1,8 @@
+{"k": 1, "v1": 10}
+{"k": 2, "v2": 20, "v5": 25}
+{"k": 3, "v3": 30}
+{"k": 4, "v4": 20, "v1": 43, "v3": 99}
+{"k": 5, "v5": null}
+{"k": 6, "v1": 999, "v3": 777}
+{"k": 2, "v4": 222}
+{"k": 1, "v2": 111, "v3": 111}
\ No newline at end of file
diff --git 
a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out 
b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
index 1f1afae813f..3e5d73452d0 100644
Binary files 
a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out 
and 
b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out 
differ
diff --git a/regression-test/suites/ddl_p0/test_create_table_properties.groovy 
b/regression-test/suites/ddl_p0/test_create_table_properties.groovy
index 32fd0cabcaf..4a64ac55bcb 100644
--- a/regression-test/suites/ddl_p0/test_create_table_properties.groovy
+++ b/regression-test/suites/ddl_p0/test_create_table_properties.groovy
@@ -300,7 +300,8 @@ suite("test_create_table_properties") {
         assertTrue(false, "should not be able to execute")
     }
        catch (Exception ex) {
-        assertTrue(ex.getMessage().contains("Insert has filtered data in 
strict mode"))
+        def exception_str = isGroupCommitMode() ? "too many filtered rows" : 
"Insert has filtered data in strict mode"
+        assertTrue(ex.getMessage().contains(exception_str))
        } finally {
     }
     // alter table add default partition
diff --git a/regression-test/suites/ddl_p0/test_ctas.groovy 
b/regression-test/suites/ddl_p0/test_ctas.groovy
index 3dc0afd73af..6d176e30b4f 100644
--- a/regression-test/suites/ddl_p0/test_ctas.groovy
+++ b/regression-test/suites/ddl_p0/test_ctas.groovy
@@ -106,7 +106,7 @@ suite("test_ctas") {
 
         test {
             sql """show load from ${dbname}"""
-            rowNum 6
+            rowNum isGroupCommitMode() ? 4: 6
         }
 
         sql """
diff --git 
a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
index 39773f887cb..d43ad340f87 100644
--- 
a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
+++ 
b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
@@ -101,9 +101,10 @@ suite("insert_group_commit_with_large_data") {
     sql """ drop table if exists ${testTable}; """
     sql """create table ${testTable}(a int,b int,c double generated always as 
(abs(a+b)) not null)
     DISTRIBUTED BY HASH(a) PROPERTIES("replication_num" = "1", 
"group_commit_interval_ms" = "40");"""
-    sql "INSERT INTO ${testTable} values(6,7,default);"
-    sql "INSERT INTO ${testTable}(a,b) values(1,2);"
-    sql "INSERT INTO ${testTable} values(3,5,default);"
+    sql " set group_commit = async_mode; "
+    group_commit_insert "INSERT INTO ${testTable} values(6,7,default);", 1
+    group_commit_insert "INSERT INTO ${testTable}(a,b) values(1,2);", 1
+    group_commit_insert "INSERT INTO ${testTable} values(3,5,default);", 1
     getRowCount(3)
     qt_select1  "select * from ${testTable} order by 1,2,3;"
 
@@ -123,7 +124,6 @@ suite("insert_group_commit_with_large_data") {
             if (exception != null) {
                 throw exception
             }
-            log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
             assertEquals(4, json.NumberTotalRows)
@@ -131,4 +131,50 @@ suite("insert_group_commit_with_large_data") {
     }
     getRowCount(7)
     qt_select2  "select * from ${testTable} order by 1,2,3;"
+
+    try {
+        sql """set group_commit = off_mode;"""
+        sql "drop table if exists gc_ctas1"
+        sql "drop table if exists gc_ctas2"
+        sql "drop table if exists gc_ctas3"
+        sql '''
+            CREATE TABLE IF NOT EXISTS `gc_ctas1` (
+                `k1` varchar(5) NULL,
+                `k2` varchar(5) NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1"
+            );
+        '''
+        sql '''
+            CREATE TABLE IF NOT EXISTS `gc_ctas2` (
+                `k1` varchar(10) NULL,
+                `k2` varchar(10) NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1"
+            );
+        '''
+        sql ''' insert into gc_ctas1 values('11111','11111'); '''
+        sql ''' insert into gc_ctas2 values('1111111111','1111111111'); '''
+        sql "sync"
+        order_qt_select_cte1 """ select * from gc_ctas1; """
+        order_qt_select_cte2 """ select * from gc_ctas2; """
+        sql """set group_commit = async_mode;"""
+        sql '''
+            create table `gc_ctas3`(k1, k2) 
+            PROPERTIES("replication_num" = "1") 
+            as select * from gc_ctas1
+                union all 
+                select * from gc_ctas2;
+        '''
+        sql  " insert into gc_ctas3 select * from gc_ctas1 union all select * 
from gc_ctas2;"
+        sql "sync"
+        order_qt_select_cte3 """ select * from gc_ctas3; """
+    } finally {
+    }
 }
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index be483c4dd48..a2de2a04b98 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -306,4 +306,40 @@ suite("test_group_commit_stream_load") {
     } finally {
         // try_sql("DROP TABLE ${tableName}")
     }
+
+    // stream load with unique_key_update_mode
+    tableName = "test_group_commit_stream_load_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+            `k` int(11) NULL, 
+            `v1` BIGINT NULL,
+            `v2` BIGINT NULL DEFAULT "9876",
+            `v3` BIGINT NOT NULL,
+            `v4` BIGINT NOT NULL DEFAULT "1234",
+            `v5` BIGINT NULL
+            ) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+            PROPERTIES(
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "true",
+            "light_schema_change" = "true",
+            "enable_unique_key_skip_bitmap_column" = "true"); """
+
+    def show_res = sql "show create table ${tableName}"
+    
assertTrue(show_res.toString().contains('"enable_unique_key_skip_bitmap_column" 
= "true"'))
+    sql """insert into ${tableName} select number, number, number, number, 
number, number from numbers("number" = "6"); """
+    qt_sql "select 
k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} 
order by k;"
+
+    streamLoad {
+        table "${tableName}"
+        set 'group_commit', 'async_mode'
+        set 'format', 'json'
+        set 'read_json_by_line', 'true'
+        set 'strict_mode', 'false'
+        set 'unique_key_update_mode', 'update_FLEXIBLE_COLUMNS'
+        file "test1.json"
+        time 20000
+        unset 'label'
+    }
+    qt_read_json_by_line "select 
k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} 
order by k;"
+
 }
diff --git 
a/regression-test/suites/nereids_p0/ddl/clean/test_clean_label_nereids.groovy 
b/regression-test/suites/nereids_p0/ddl/clean/test_clean_label_nereids.groovy
index d751ba8cd9a..a5abb874476 100644
--- 
a/regression-test/suites/nereids_p0/ddl/clean/test_clean_label_nereids.groovy
+++ 
b/regression-test/suites/nereids_p0/ddl/clean/test_clean_label_nereids.groovy
@@ -77,7 +77,7 @@ suite("test_clean_label_nereids") {
     //
     def totalLabel = sql """show load from ${dbName}"""
     println totalLabel
-    assert totalLabel.size() == 4
+    assert totalLabel.size() == (isGroupCommitMode() ? 1 : 4)
     // clean label
     checkNereidsExecute("clean label ${insertLabel} from ${dbName};")
     checkNereidsExecute("clean label  from ${dbName};")
diff --git 
a/regression-test/suites/schema_change_p0/test_varchar_sc_in_complex.groovy 
b/regression-test/suites/schema_change_p0/test_varchar_sc_in_complex.groovy
index 584a422f2a7..682ef2f1e20 100644
--- a/regression-test/suites/schema_change_p0/test_varchar_sc_in_complex.groovy
+++ b/regression-test/suites/schema_change_p0/test_varchar_sc_in_complex.groovy
@@ -52,11 +52,12 @@ suite ("test_varchar_sc_in_complex") {
                 (1,['2025-01-02'], {'doris':'better'}, 
named_struct('col','amory'));
             """
         // this can be insert but with cut off the left string to 10
+        def exception_str = isGroupCommitMode() ? "too many filtered rows" : 
"Insert has filtered data in strict mode"
         test {
             sql """ insert into ${tableName} values
                 (11, ['2025-01-03-22-33'], 
{'doris111111111':'better2222222222'}, named_struct('col','amoryIsBetter'));
             """
-            exception "Insert has filtered data in strict mode"
+            exception exception_str
         }
 
         // case1. can not alter modify column to shorten string length for 
array/map/struct


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to