This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new add160b768e [improvement](regression-test)  add more group commit 
regression-test (#26952)
add160b768e is described below

commit add160b768ea68776ecd256772acd9de861cee34
Author: huanghaibin <284824...@qq.com>
AuthorDate: Wed Nov 15 00:01:13 2023 +0800

    [improvement](regression-test)  add more group commit regression-test 
(#26952)
---
 ...commit_http_stream_lineitem_multiple_client.out |   4 +
 ..._commit_http_stream_lineitem_multiple_table.out |  31 ++
 ...st_group_commit_http_stream_lineitem_normal.out |  10 +
 ...p_commit_http_stream_lineitem_schema_change.out |  16 +
 ...commit_insert_into_lineitem_multiple_client.out |   4 +
 ..._commit_insert_into_lineitem_multiple_table.out |  31 ++
 ...st_group_commit_insert_into_lineitem_normal.out |   4 +
 ...p_commit_insert_into_lineitem_scheme_change.out |  16 +
 ...commit_stream_load_lineitem_multiple_client.out |   4 +
 ..._commit_stream_load_lineitem_multiple_table.out |  31 ++
 ...st_group_commit_stream_load_lineitem_normal.out |  10 +
 ...p_commit_stream_load_lineitem_schema_change.out |  16 +
 .../org/apache/doris/regression/suite/Suite.groovy |   6 +
 ...mit_http_stream_lineitem_multiple_client.groovy | 149 +++++++
 ...mmit_http_stream_lineitem_multiple_table.groovy | 140 +++++++
 ...group_commit_http_stream_lineitem_normal.groovy | 122 ++++++
 ...ommit_http_stream_lineitem_schema_change.groovy | 385 ++++++++++++++++++
 ...mit_insert_into_lineitem_multiple_client.groovy | 219 ++++++++++
 ...mmit_insert_into_lineitem_multiple_table.groovy | 225 +++++++++++
 ...group_commit_insert_into_lineitem_normal.groovy | 192 +++++++++
 ...ommit_insert_into_lineitem_scheme_change.groovy | 448 +++++++++++++++++++++
 ...mit_stream_load_lineitem_multiple_client.groovy | 146 +++++++
 ...mmit_stream_load_lineitem_multiple_table.groovy | 137 +++++++
 ...group_commit_stream_load_lineitem_normal.groovy | 119 ++++++
 ...ommit_stream_load_lineitem_schema_change.groovy | 353 ++++++++++++++++
 25 files changed, 2818 insertions(+)

diff --git 
a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out
 
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out
new file mode 100644
index 00000000000..ef23823e23e
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+6001215
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out
 
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out
new file mode 100644
index 00000000000..7495e1a6b6b
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+600572
+
+-- !sql --
+599397
+
+-- !sql --
+600124
+
+-- !sql --
+599647
+
+-- !sql --
+599931
+
+-- !sql --
+601365
+
+-- !sql --
+599301
+
+-- !sql --
+600504
+
+-- !sql --
+599715
+
+-- !sql --
+600659
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out
 
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out
new file mode 100644
index 00000000000..bac125e32fc
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+6001215
+
+-- !sql --
+12002430
+
+-- !sql --
+18003645
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
 
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
new file mode 100644
index 00000000000..1be3e26c88e
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+3601475
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
 
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
new file mode 100644
index 00000000000..ef23823e23e
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+6001215
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
 
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
new file mode 100644
index 00000000000..86534c17502
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+600572
+
+-- !sql --
+600659
+
+-- !sql --
+599397
+
+-- !sql --
+600124
+
+-- !sql --
+599647
+
+-- !sql --
+599931
+
+-- !sql --
+601365
+
+-- !sql --
+599301
+
+-- !sql --
+600504
+
+-- !sql --
+599715
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
 
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
new file mode 100644
index 00000000000..ef23823e23e
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+6001215
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
 
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
new file mode 100644
index 00000000000..4f530919417
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+3000816
+
+-- !sql --
+6001215
+
+-- !sql --
+4673070
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out
 
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out
new file mode 100644
index 00000000000..ef23823e23e
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+6001215
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out
 
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out
new file mode 100644
index 00000000000..7495e1a6b6b
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+600572
+
+-- !sql --
+599397
+
+-- !sql --
+600124
+
+-- !sql --
+599647
+
+-- !sql --
+599931
+
+-- !sql --
+601365
+
+-- !sql --
+599301
+
+-- !sql --
+600504
+
+-- !sql --
+599715
+
+-- !sql --
+600659
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out
 
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out
new file mode 100644
index 00000000000..bac125e32fc
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+6001215
+
+-- !sql --
+12002430
+
+-- !sql --
+18003645
+
diff --git 
a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
 
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
new file mode 100644
index 00000000000..1be3e26c88e
--- /dev/null
+++ 
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+3601475
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index c9e8fe4215e..9913a5381c8 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -266,6 +266,12 @@ class Suite implements GroovyInterceptable {
         return result
     }
 
+    List<List<Object>> insert_into_sql(String sqlStr, int num) {
+        logger.info("insert into " + num + " records")
+        def (result, meta) = JdbcUtils.executeToList(context.getConnection(), 
sqlStr)
+        return result
+    }
+
     def sql_return_maparray(String sqlStr) {
         logger.info("Execute sql: ${sqlStr}".toString())
         def (result, meta) = JdbcUtils.executeToList(context.getConnection(), 
sqlStr)
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy
new file mode 100644
index 00000000000..1bf941fb57a
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+suite("test_group_commit_http_stream_lineitem_multiple_client") {
+    def db = "regression_test_insert_p2"
+    def stream_load_table = "test_http_stream_lineitem_multiple_client_sf1"
+    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 
601365, 599301, 600504, 599715, 600659};
+    def total = 0;
+    def rwLock = new ReentrantReadWriteLock();
+    def wlock = rwLock.writeLock();
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            retry++
+            sleep(5000)
+        }
+    }
+
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def create_stream_load_table = {
+        sql """ drop table if exists ${stream_load_table}; """
+
+        sql """
+   CREATE TABLE ${stream_load_table} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+    }
+
+    def do_stream_load = { i ->
+        logger.info("file:" + i)
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${stream_load_table}(l_orderkey, 
l_partkey, l_suppkey, l_linenumber, l_quantity, 
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
+l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, 
c12, c13, c14, c15, c16 from http_stream
+                    ("format"="csv", "column_separator"="|")
+            """
+
+            set 'group_commit', 'true'
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+            unset 'label'
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, rowCountArray[i - 1], 
rowCountArray[i - 1], 0, 0)
+            }
+        }
+
+        logger.info("load num: " + rowCountArray[i - 1])
+        wlock.lock()
+        total += rowCountArray[i - 1];
+        wlock.unlock()
+
+    }
+
+    def process = {
+        def threads = []
+        for (int k = 1; k <= 10; k++) {
+            int n = k;
+            logger.info("insert into file:" + n)
+            threads.add(Thread.startDaemon {
+                do_stream_load(n)
+            })
+        }
+        for (Thread th in threads) {
+            th.join()
+        }
+
+        logger.info("total:" + total)
+        getRowCount(total, stream_load_table)
+
+        qt_sql """ select count(*) from ${stream_load_table}; """
+    }
+
+    try {
+        create_stream_load_table()
+        process()
+    } finally {
+
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy
new file mode 100644
index 00000000000..03932d96cd3
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_http_stream_lineitem_multiple_table") {
+    def db = "regression_test_insert_p2"
+    def stream_load_table_base = "test_http_stream_lineitem_multiple_table"
+    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 
601365, 599301, 600504, 599715, 600659};
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            retry++
+            sleep(5000)
+        }
+    }
+
+    def create_stream_load_table = { table_name ->
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+    }
+
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def do_stream_load = { i, table_name ->
+        logger.info("file:" + i)
+
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${table_name}(l_orderkey, l_partkey, 
l_suppkey, l_linenumber, l_quantity, 
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
+l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, 
c12, c13, c14, c15, c16 from http_stream
+                    ("format"="csv", "column_separator"="|")
+            """
+
+            set 'group_commit', 'true'
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+            unset 'label'
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, rowCountArray[i - 1], 
rowCountArray[i - 1], 0, 0)
+            }
+        }
+        getRowCount(rowCountArray[i - 1], table_name)
+    }
+
+    def process = {
+        def threads = []
+        for (int k = 1; k <= 10; k++) {
+            int n = k;
+            String table_name = stream_load_table_base + "_" + n;
+            create_stream_load_table(table_name)
+            threads.add(Thread.startDaemon {
+                do_stream_load(n, table_name)
+            })
+        }
+        for (Thread th in threads) {
+            th.join()
+        }
+
+        for (int k = 1; k <= 10; k++) {
+            String table_name = stream_load_table_base + "_" + k;
+            qt_sql """ select count(*) from ${table_name}; """
+        }
+    }
+
+    try {
+        process()
+    } finally {
+
+    }
+
+
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
new file mode 100644
index 00000000000..76ff680af11
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_http_stream_lineitem_normal") {
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            retry++
+            Thread.sleep(5000)
+        }
+    }
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+    def db = "regression_test_insert_p2"
+    def stream_load_table = "test_stream_load_lineitem_normal_sf1"
+    def create_stream_load_table = {
+        sql """ drop table if exists ${stream_load_table}; """
+
+        sql """
+   CREATE TABLE ${stream_load_table} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+    }
+    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 
601365, 599301, 600504, 599715, 600659};
+    def process = {
+        int total = 0;
+        for (int k = 0; k < 3; k++) {
+            for (int i = 1; i <= 10; i++) {
+                streamLoad {
+                    set 'version', '1'
+                    set 'sql', """
+                    insert into ${db}.${stream_load_table}(l_orderkey, 
l_partkey, l_suppkey, l_linenumber, l_quantity, 
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
+l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, 
c12, c13, c14, c15, c16 from http_stream
+                    ("format"="csv", "column_separator"="|")
+            """
+
+                    set 'group_commit', 'true'
+                    file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" 
+ i
+                    unset 'label'
+
+                    check { result, exception, startTime, endTime ->
+                        checkStreamLoadResult(exception, result, 
rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
+                    }
+                }
+                total += rowCountArray[i - 1];
+            }
+            getRowCount(total, stream_load_table)
+            qt_sql """ select count(*) from ${stream_load_table} """
+        }
+    }
+    try {
+        create_stream_load_table()
+        process()
+    } finally {
+
+    }
+
+
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
new file mode 100644
index 00000000000..4a8359fb9c4
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+enum SC {
+    TRUNCATE_TABLE(1),
+    ADD_COLUMN(2),
+    DELETE(3),
+    DROP_COLUMN(4),
+    CHANGE_ORDER(5)
+    private int value
+
+    SC(int value) {
+        this.value = value
+    }
+
+    int getValue() {
+        return value
+    }
+}
+
+enum STATE {
+    NORMAL(1),
+    BEFORE_ADD_COLUMN(2),
+    DROP_COLUMN(3)
+    private int value
+
+    STATE(int value) {
+        this.value = value
+    }
+
+    int getValue() {
+        return value
+    }
+}
+
+suite("test_group_commit_http_stream_lineitem_schema_change") {
+    def db = "regression_test_insert_p2"
+    def stream_load_table = "test_http_stream_lineitem_schema_change_sf1"
+    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 
601365, 599301, 600504, 599715, 600659};
+    def total = 0;
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            retry++
+            Thread.sleep(5000)
+        }
+    }
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def create_stream_load_table = { table_name ->
+        // create table
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+    }
+
+    def create_stream_load_table_less_column = { table_name ->
+        // create table
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+
+    }
+
+    def insert_data = { i, table_name ->
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${stream_load_table}(l_orderkey, 
l_partkey, l_suppkey, l_linenumber, l_quantity, 
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
+l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, 
c12, c13, c14, c15, c16 from http_stream
+                    ("format"="csv", "column_separator"="|")
+            """
+
+            set 'group_commit', 'true'
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+            unset 'label'
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, rowCountArray[i - 1], 
rowCountArray[i - 1], 0, 0)
+            }
+        }
+        total += rowCountArray[i - 1];
+    }
+
+    def insert_data_less_column = { i, table_name ->
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${stream_load_table}(l_orderkey, 
l_partkey, l_suppkey, l_linenumber, l_quantity, 
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_shipinstruct,l_shipmode,
+l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c14, c15, 
c16 from http_stream
+                    ("format"="csv", "column_separator"="|")
+            """
+
+            set 'group_commit', 'true'
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+            unset 'label'
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, rowCountArray[i - 1], 
rowCountArray[i - 1], 0, 0)
+            }
+        }
+        total += rowCountArray[i - 1];
+    }
+
+    def getAlterTableState = { table_name ->
+        def retry = 0
+        while (true) {
+            def state = sql "show alter table column where tablename = 
'${table_name}' order by CreateTime desc "
+            logger.info("alter table state: ${state}")
+            logger.info("state:" + state[0][9]);
+            if (state.size() > 0 && state[0][9] == "FINISHED") {
+                return true
+            }
+            retry++
+            if (retry >= 60) {
+                return false
+            }
+            Thread.sleep(5000)
+        }
+        return false
+    }
+
+    def truncate = { table_name ->
+        create_stream_load_table(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 5) {
+                getRowCount(total, table_name)
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ truncate table ${table_name}; """
+                        break
+                    } catch (Exception e) {
+                        logger.info("select count get exception", e);
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+                total = 0;
+            }
+            insert_data(i, table_name)
+        }
+        logger.info("process truncate total:" + total)
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def delete = { table_name ->
+        create_stream_load_table(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        def rowCount = sql """select count(*) from 
${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;"""
+                        logger.info("rowCount:" + rowCount)
+                        sql """ delete from ${table_name} where l_orderkey 
>=1000000 and l_orderkey <=5000000; """
+                        total -= rowCount[0][0]
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+            }
+            insert_data(i, table_name)
+        }
+        logger.info("process delete total:" + total)
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def drop_column = { table_name ->
+        create_stream_load_table(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} DROP column 
l_receiptdate; """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+            }
+            if (i < 5) {
+                insert_data(i, table_name)
+            } else {
+                insert_data_less_column(i, table_name)
+            }
+        }
+        logger.info("process drop column total:" + total)
+        assertTrue(getAlterTableState(table_name), "drop column should 
success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def add_column = { table_name ->
+        create_stream_load_table_less_column(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} ADD column 
l_receiptdate DATEV2 after l_commitdate; """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+            }
+            if (i < 5) {
+                insert_data_less_column(i, table_name)
+            } else {
+                insert_data(i, table_name)
+            }
+        }
+        logger.info("process add column total:" + total)
+        assertTrue(getAlterTableState(table_name), "add column should success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def change_order = { table_name ->
+        create_stream_load_table(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 2) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} order by 
(l_orderkey,l_shipdate,l_linenumber, 
l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment);
 """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+            }
+            insert_data(i, table_name)
+        }
+        logger.info("process change order total:" + total)
+        assertTrue(getAlterTableState(table_name), "modify column order should 
success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+
+    def process = { table_name ->
+        for (int i = 1; i <= 5; i++) {
+            switch (i) {
+                case SC.TRUNCATE_TABLE.value:
+                    truncate(table_name)
+                    break
+                case SC.DELETE.value:
+                    delete(table_name)
+                    break
+                case SC.DROP_COLUMN.value:
+                    drop_column(table_name)
+                    break
+                case SC.ADD_COLUMN.value:
+                    add_column(table_name)
+                    break
+                case SC.CHANGE_ORDER.value:
+                    change_order(table_name)
+                    break
+            }
+        }
+    }
+
+    try {
+        process(stream_load_table)
+    } finally {
+
+    }
+
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
new file mode 100644
index 00000000000..64bb09f271c
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+String[] getFiles(String dirName, int num) {
+    File[] datas = new File(dirName).listFiles()
+    if (num != datas.length) {
+        throw new Exception("num not equals,expect:" + num + " vs real:" + 
datas.length)
+    }
+    String[] array = new String[datas.length];
+    for (int i = 0; i < datas.length; i++) {
+        array[i] = datas[i].getPath();
+    }
+    Arrays.sort(array);
+    return array;
+}
+
+suite("test_group_commit_insert_into_lineitem_multiple_client") {
+    String[] file_array;
+    def prepare = {
+        def dataDir = "${context.config.cacheDataPath}/lineitem/"
+        File dir = new File(dataDir)
+        if (!dir.exists()) {
+            new File("${context.config.cacheDataPath}/lineitem/").mkdir()
+            for (int i = 1; i <= 10; i++) {
+                logger.info("download lineitem.tbl.${i}")
+                def download_file = """/usr/bin/curl 
${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
+--output 
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
+            }
+        }
+        file_array = getFiles(dataDir, 10)
+        for (String s : file_array) {
+            logger.info(s)
+        }
+    }
+    def insert_table = "test_insert_into_lineitem_multiple_client_sf1"
+    def batch = 100;
+    def total = 0;
+    def rwLock = new ReentrantReadWriteLock();
+    def wlock = rwLock.writeLock();
+
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            Thread.sleep(5000)
+            retry++
+        }
+    }
+
+    def create_insert_table = {
+        // create table
+        sql """ drop table if exists ${insert_table}; """
+
+        sql """
+   CREATE TABLE ${insert_table} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+        sql """ set enable_insert_group_commit = true; """
+        sql """ set enable_nereids_dml = false; """
+    }
+
+    def do_insert_into = { file_name ->
+        logger.info("file:" + file_name)
+        sql """ set enable_insert_group_commit = true; """
+        sql """ set enable_nereids_dml = false; """
+        //read and insert
+        BufferedReader reader;
+        try {
+            reader = new BufferedReader(new FileReader(file_name));
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+
+        String s = null;
+        StringBuilder sb = null;
+        int c = 0;
+        int t = 0;
+        while (true) {
+            try {
+                if (c == batch) {
+                    sb.append(";");
+                    String exp = sb.toString();
+                    while (true) {
+                        try {
+                            def result = insert_into_sql(exp, c);
+                            logger.info("result:" + result);
+                            break
+                        } catch (Exception e) {
+                            logger.info("got exception:" + e)
+                        }
+                    }
+                    c = 0;
+                }
+                s = reader.readLine();
+                if (s != null) {
+                    if (c == 0) {
+                        sb = new StringBuilder();
+                        sb.append("insert into ${insert_table} (l_orderkey, 
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, 
l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+                    }
+                    if (c > 0) {
+                        sb.append(",");
+                    }
+                    String[] array = s.split("\\|");
+                    sb.append("(");
+                    for (int i = 0; i < array.length; i++) {
+                        sb.append("\"" + array[i] + "\"");
+                        if (i != array.length - 1) {
+                            sb.append(",");
+                        }
+                    }
+                    sb.append(")");
+                    c++;
+                    t++;
+                } else if (c > 0) {
+                    sb.append(";");
+                    String exp = sb.toString();
+                    while (true) {
+                        try {
+                            def result = insert_into_sql(exp, c);
+                            logger.info("result:" + result);
+                            break
+                        } catch (Exception e) {
+                            logger.info("got exception:" + e)
+                        }
+                    }
+                    break;
+                } else {
+                    break;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        {
+            logger.info("t: " + t)
+            wlock.lock()
+            total += t;
+            wlock.unlock()
+        }
+
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    def process = {
+        def threads = []
+        for (int k = 0; k < file_array.length; k++) {
+            int n = k;
+            String file_name = file_array[n]
+            logger.info("insert into file:" + file_name)
+            threads.add(Thread.startDaemon {
+                do_insert_into(file_name)
+            })
+        }
+        for (Thread th in threads) {
+            th.join()
+        }
+
+        getRowCount(total, insert_table)
+        qt_sql """ select count(*) from ${insert_table}; """
+    }
+
+    try {
+        prepare()
+        create_insert_table()
+        process()
+    } finally {
+
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
new file mode 100644
index 00000000000..722b6973356
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+String[] getFiles(String dirName, int num) {
+    File[] datas = new File(dirName).listFiles()
+    if (num != datas.length) {
+        throw new Exception("num not equals,expect:" + num + " vs real:" + 
datas.length)
+    }
+    String[] array = new String[datas.length];
+    for (int i = 0; i < datas.length; i++) {
+        array[i] = datas[i].getPath();
+    }
+    Arrays.sort(array);
+    return array;
+}
+
+suite("test_group_commit_insert_into_lineitem_multiple_table") {
+    String[] file_array;
+    def prepare = {
+        def dataDir = "${context.config.cacheDataPath}/lineitem/"
+        File dir = new File(dataDir)
+        if (!dir.exists()) {
+            new File("${context.config.cacheDataPath}/lineitem/").mkdir()
+            for (int i = 1; i <= 10; i++) {
+                logger.info("download lineitem.tbl.${i}")
+                def download_file = """/usr/bin/curl 
${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
+--output 
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
+            }
+        }
+        file_array = getFiles(dataDir, 10)
+        for (String s : file_array) {
+            logger.info(s)
+        }
+    }
+    def insert_table_base = "test_insert_into_lineitem_multiple_table_sf1"
+    def batch = 100;
+    def total = 0;
+    def rwLock = new ReentrantReadWriteLock();
+    def wlock = rwLock.writeLock();
+
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            Thread.sleep(5000)
+            retry++
+        }
+    }
+
+    def create_insert_table = { table_name ->
+        // create table
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+        sql """ set enable_insert_group_commit = true; """
+        sql """ set enable_nereids_dml = false; """
+    }
+
+    def do_insert_into = { file_name, table_name ->
+        sql """ set enable_insert_group_commit = true; """
+        sql """ set enable_nereids_dml = false; """
+        logger.info("file:" + file_name)
+        //read and insert
+        BufferedReader reader;
+        try {
+            reader = new BufferedReader(new FileReader(file_name));
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+
+        String s = null;
+        StringBuilder sb = null;
+        int c = 0;
+        int t = 0;
+        while (true) {
+            try {
+                if (c == batch) {
+                    sb.append(";");
+                    String exp = sb.toString();
+                    while (true) {
+                        try {
+                            def result = insert_into_sql(exp, c);
+                            logger.info("result:" + result);
+                            break
+                        } catch (Exception e) {
+                            logger.info("got exception:" + e)
+                        }
+                    }
+                    c = 0;
+                }
+                s = reader.readLine();
+                if (s != null) {
+                    if (c == 0) {
+                        sb = new StringBuilder();
+                        sb.append("insert into ${table_name} (l_orderkey, 
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, 
l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+                    }
+                    if (c > 0) {
+                        sb.append(",");
+                    }
+                    String[] array = s.split("\\|");
+                    sb.append("(");
+                    for (int i = 0; i < array.length; i++) {
+                        sb.append("\"" + array[i] + "\"");
+                        if (i != array.length - 1) {
+                            sb.append(",");
+                        }
+                    }
+                    sb.append(")");
+                    c++;
+                    t++;
+                } else if (c > 0) {
+                    sb.append(";");
+                    String exp = sb.toString();
+                    while (true) {
+                        try {
+                            def result = insert_into_sql(exp, c);
+                            logger.info("result:" + result);
+                            break
+                        } catch (Exception e) {
+                            logger.info("got exception:" + e)
+                        }
+                    }
+                    break;
+                } else {
+                    break;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        {
+            logger.info("t: " + t)
+            wlock.lock()
+            total += t;
+            wlock.unlock()
+        }
+
+        if (reader != null) {
+            reader.close();
+        }
+        getRowCount(t, table_name)
+    }
+
+    def process = {
+        def threads = []
+        for (int k = 0; k < file_array.length; k++) {
+            int n = k;
+            String file_name = file_array[n]
+            String table_name = insert_table_base + "_" + n;
+            create_insert_table(table_name)
+            logger.info("insert into file:" + file_name)
+            threads.add(Thread.startDaemon {
+                do_insert_into(file_name, table_name)
+            })
+        }
+        for (Thread th in threads) {
+            th.join()
+        }
+
+        for (int k = 0; k < file_array.length; k++) {
+            String table_name = insert_table_base + "_" + k;
+            qt_sql """ select count(*) from ${table_name}; """
+        }
+    }
+
+    try {
+        prepare()
+        process()
+    } finally {
+
+    }
+
+
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
new file mode 100644
index 00000000000..250255958ee
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+String[] getFiles(String dirName, int num) {
+    File[] datas = new File(dirName).listFiles()
+    if (num != datas.length) {
+        throw new Exception("num not equals,expect:" + num + " vs real:" + 
datas.length)
+    }
+    String[] array = new String[datas.length];
+    for (int i = 0; i < datas.length; i++) {
+        array[i] = datas[i].getPath();
+    }
+    Arrays.sort(array);
+    return array;
+}
+
+suite("test_group_commit_insert_into_lineitem_normal") {
+    String[] file_array;
+    def prepare = {
+        def dataDir = "${context.config.cacheDataPath}/lineitem/"
+        File dir = new File(dataDir)
+        if (!dir.exists()) {
+            new File("${context.config.cacheDataPath}/lineitem/").mkdir()
+            for (int i = 1; i <= 10; i++) {
+                logger.info("download lineitem.tbl.${i}")
+                def download_file = """/usr/bin/curl 
${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
+--output 
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
+            }
+        }
+        file_array = getFiles(dataDir, 10)
+        for (String s : file_array) {
+            logger.info(s)
+        }
+    }
+    def insert_table = "test_insert_into_lineitem_sf1"
+    def batch = 100;
+    def count = 0;
+    def total = 0;
+
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            Thread.sleep(5000)
+            retry++
+        }
+    }
+
+    def create_insert_table = {
+        // create table
+        sql """ drop table if exists ${insert_table}; """
+
+        sql """
+   CREATE TABLE ${insert_table} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+        sql """ set enable_insert_group_commit = true; """
+        sql """ set enable_nereids_dml = false; """
+    }
+
+    def process = {
+        for (String file : file_array) {
+            logger.info("insert into file: " + file)
+            BufferedReader reader;
+            try {
+                reader = new BufferedReader(new FileReader(file));
+            } catch (FileNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+
+            String s = null;
+            StringBuilder sb = null;
+            count = 0;
+            while (true) {
+                try {
+                    if (count == batch) {
+                        sb.append(";");
+                        String exp = sb.toString();
+                        while (true) {
+                            try {
+                                def result = insert_into_sql(exp, count);
+                                logger.info("result:" + result);
+                                break
+                            } catch (Exception e) {
+                                logger.info("got exception:" + e)
+                            }
+                        }
+                        count = 0;
+                    }
+                    s = reader.readLine();
+                    if (s != null) {
+                        if (count == 0) {
+                            sb = new StringBuilder();
+                            sb.append("insert into ${insert_table} 
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, 
l_discount, l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+                        }
+                        if (count > 0) {
+                            sb.append(",");
+                        }
+                        String[] array = s.split("\\|");
+                        sb.append("(");
+                        for (int i = 0; i < array.length; i++) {
+                            sb.append("\"" + array[i] + "\"");
+                            if (i != array.length - 1) {
+                                sb.append(",");
+                            }
+                        }
+                        sb.append(")");
+                        count++;
+                        total++;
+                    } else if (count > 0) {
+                        sb.append(";");
+                        String exp = sb.toString();
+                        while (true) {
+                            try {
+                                def result = insert_into_sql(exp, count);
+                                logger.info("result:" + result);
+                                break
+                            } catch (Exception e) {
+                                logger.info("got exception:" + e)
+                            }
+                        }
+                        break;
+                    } else {
+                        break;
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            if (reader != null) {
+                reader.close();
+            }
+        }
+        logger.info("total: " + total)
+        getRowCount(total, insert_table)
+
+        qt_sql """select count(*) from ${insert_table};"""
+    }
+
+    try {
+        prepare()
+        create_insert_table()
+        for (int i = 0; i < 1; i++) {
+            process()
+        }
+    } finally {
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
new file mode 100644
index 00000000000..fba562f819e
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
@@ -0,0 +1,448 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+enum SC {
+    TRUNCATE_TABLE(1),
+    ADD_COLUMN(2),
+    DELETE(3),
+    DROP_COLUMN(4),
+    CHANGE_ORDER(5)
+    private int value
+
+    SC(int value) {
+        this.value = value
+    }
+
+    int getValue() {
+        return value
+    }
+}
+
+enum STATE {
+    NORMAL(1),
+    BEFORE_ADD_COLUMN(2),
+    DROP_COLUMN(3)
+    private int value
+
+    STATE(int value) {
+        this.value = value
+    }
+
+    int getValue() {
+        return value
+    }
+}
+
+String[] getFiles(String dirName, int num) {
+    File[] datas = new File(dirName).listFiles()
+    if (num != datas.length) {
+        throw new Exception("num not equals,expect:" + num + " vs real:" + 
datas.length)
+    }
+    String[] array = new String[datas.length];
+    for (int i = 0; i < datas.length; i++) {
+        array[i] = datas[i].getPath();
+    }
+    Arrays.sort(array);
+    return array;
+}
+
+suite("test_group_commit_insert_into_lineitem_scheme_change") {
+    String[] file_array;
+    def prepare = {
+        def dataDir = "${context.config.cacheDataPath}/lineitem/"
+        File dir = new File(dataDir)
+        if (!dir.exists()) {
+            new File("${context.config.cacheDataPath}/lineitem/").mkdir()
+            for (int i = 1; i <= 10; i++) {
+                logger.info("download lineitem.tbl.${i}")
+                def download_file = """/usr/bin/curl 
${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
+--output 
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
+            }
+        }
+        file_array = getFiles(dataDir, 10)
+        for (String s : file_array) {
+            logger.info(s)
+        }
+    }
+    def insert_table = "test_lineitem_scheme_change_sf1"
+    def batch = 100;
+    def count = 0;
+    def total = 0;
+
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            Thread.sleep(5000)
+            retry++
+        }
+    }
+
+    def create_insert_table = { table_name ->
+        // create table
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+        sql """ set enable_insert_group_commit = true; """
+        sql """ set enable_nereids_dml = false; """
+    }
+
+    def create_insert_table_less_column = { table_name ->
+        // create table
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+        sql """ set enable_insert_group_commit = true; """
+        sql """ set enable_nereids_dml = false; """
+
+    }
+
+    def insert_data = { file_name, table_name, index ->
+        BufferedReader reader;
+        try {
+            reader = new BufferedReader(new FileReader(file_name));
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+
+        String s = null;
+        StringBuilder sb = null;
+        count = 0;
+        while (true) {
+            try {
+                if (count == batch) {
+                    sb.append(";");
+                    String exp = sb.toString();
+                    while (true) {
+                        try {
+                            def result = insert_into_sql(exp, count);
+                            logger.info("result:" + result);
+                            break
+                        } catch (Exception e) {
+                            logger.info("got exception:" + e)
+                        }
+                    }
+                    count = 0;
+                }
+                s = reader.readLine();
+                if (s != null) {
+                    if (count == 0) {
+                        sb = new StringBuilder();
+                        if (index == STATE.BEFORE_ADD_COLUMN.value) {
+                            sb.append("insert into ${table_name} (l_orderkey, 
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, 
l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+                        } else if (index == STATE.NORMAL.value) {
+                            sb.append("insert into ${table_name} (l_orderkey, 
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, 
l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+                        } else if (index == STATE.DROP_COLUMN.value) {
+                            sb.append("insert into ${table_name} (l_orderkey, 
l_partkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, 
l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+                        }
+                    }
+                    if (count > 0) {
+                        sb.append(",");
+                    }
+                    String[] array = s.split("\\|");
+                    sb.append("(");
+                    for (int i = 0; i < array.length; i++) {
+                        if (index == STATE.BEFORE_ADD_COLUMN.value && i == 11) 
{
+                            continue;
+                        } else if (index == STATE.DROP_COLUMN.value && i == 2) 
{
+                            continue;
+                        }
+                        sb.append("\"" + array[i] + "\"");
+                        if (i != array.length - 1) {
+                            sb.append(",");
+                        }
+                    }
+                    sb.append(")");
+                    count++;
+                    total++;
+                } else if (count > 0) {
+                    sb.append(";");
+                    String exp = sb.toString();
+                    while (true) {
+                        try {
+                            def result = insert_into_sql(exp, count);
+                            logger.info("result:" + result);
+                            break
+                        } catch (Exception e) {
+                            logger.info("got exception:" + e)
+                        }
+                    }
+                    break;
+                } else {
+                    break;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    def getAlterTableState = { table_name ->
+        def retry = 0
+        while (true) {
+            def state = sql "show alter table column where tablename = 
'${table_name}' order by CreateTime desc "
+            logger.info("alter table state: ${state}")
+            logger.info("state:" + state[0][9]);
+            if (state.size() > 0 && state[0][9] == "FINISHED") {
+                return true
+            }
+            retry++
+            if (retry >= 60) {
+                return false
+            }
+            Thread.sleep(5000)
+        }
+        return false
+    }
+
+    def truncate = { table_name ->
+        create_insert_table(table_name)
+        total = 0;
+        for (int i = 0; i < file_array.length; i++) {
+            String fileName = file_array[i]
+            logger.info("process file:" + fileName)
+            if (i == 5) {
+                getRowCount(total, table_name)
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ truncate table ${table_name}; """
+                        break
+                    } catch (Exception e) {
+                        logger.info("select count get exception", e);
+                    }
+                    retry++
+                    Thread.sleep(2000)
+                }
+                total = 0;
+            }
+            insert_data(fileName, table_name, STATE.NORMAL.value)
+        }
+        logger.info("process truncate total:" + total)
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def delete = { table_name ->
+        create_insert_table(table_name)
+        total = 0;
+        for (int i = 0; i < file_array.length; i++) {
+            String fileName = file_array[i]
+            logger.info("process file:" + fileName)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        def rowCount = sql """select count(*) from 
${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;"""
+                        log.info("rowCount:" + rowCount[0][0])
+                        sql """ delete from ${table_name} where l_orderkey 
>=1000000 and l_orderkey <=5000000; """
+                        total -= rowCount[0][0]
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    retry++
+                    Thread.sleep(2000)
+                }
+            }
+            insert_data(fileName, table_name, STATE.NORMAL.value)
+        }
+        logger.info("process delete total:" + total)
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def drop_column = { table_name ->
+        create_insert_table(table_name)
+        total = 0;
+        for (int i = 0; i < file_array.length; i++) {
+            String fileName = file_array[i]
+            logger.info("process file:" + fileName)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} DROP column 
l_suppkey; """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    retry++
+                    Thread.sleep(2000)
+                }
+            }
+            if (i < 5) {
+                insert_data(fileName, table_name, STATE.NORMAL.value)
+            } else {
+                insert_data(fileName, table_name, STATE.DROP_COLUMN.value)
+            }
+        }
+        logger.info("process drop column total:" + total)
+        assertTrue(getAlterTableState(table_name), "drop column should 
success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def add_column = { table_name ->
+        create_insert_table_less_column(table_name)
+        total = 0;
+        for (int i = 0; i < file_array.length; i++) {
+            String fileName = file_array[i]
+            logger.info("process file:" + fileName)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} ADD column 
l_receiptdate DATEV2 after l_commitdate; """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    retry++
+                    Thread.sleep(2000)
+                }
+            }
+            if (i < 5) {
+                insert_data(fileName, table_name, 
STATE.BEFORE_ADD_COLUMN.value)
+            } else {
+                insert_data(fileName, table_name, STATE.NORMAL.value)
+            }
+        }
+        logger.info("process add column total:" + total)
+        assertTrue(getAlterTableState(table_name), "add column should success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def change_order = { table_name ->
+        create_insert_table(table_name)
+        total = 0;
+        for (int i = 0; i < file_array.length; i++) {
+            String fileName = file_array[i]
+            logger.info("process file:" + fileName)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} order by 
(l_orderkey,l_shipdate,l_linenumber, 
l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment);
 """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    retry++
+                    Thread.sleep(2000)
+                }
+            }
+            insert_data(fileName, table_name, STATE.NORMAL.value)
+        }
+        logger.info("process change order total:" + total)
+        assertTrue(getAlterTableState(table_name), "modify column order should 
success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+
+    def process = { table_name ->
+        for (int i = 1; i <= 5; i++) {
+            switch (i) {
+                case SC.TRUNCATE_TABLE.value:
+                    truncate(table_name)
+                    break
+                case SC.DELETE.value:
+                    delete(table_name)
+                    break
+                case SC.DROP_COLUMN.value:
+                    drop_column(table_name)
+                    break
+                case SC.ADD_COLUMN.value:
+                    add_column(table_name)
+                    break
+                case SC.CHANGE_ORDER.value:
+                    change_order(table_name)
+                    break
+            }
+        }
+    }
+
+    try {
+        prepare()
+        process(insert_table)
+    } finally {
+
+    }
+
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy
new file mode 100644
index 00000000000..aa7c6ffc571
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+suite("test_group_commit_stream_load_lineitem_multiple_client") {
+    def stream_load_table = "test_stream_load_lineitem_multiple_client_sf1"
+    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity, l_extendedprice, l_discount, 
+l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
+    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 
601365, 599301, 600504, 599715, 600659};
+    def total = 0;
+    def rwLock = new ReentrantReadWriteLock();
+    def wlock = rwLock.writeLock();
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            retry++
+            sleep(5000)
+        }
+    }
+
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def create_stream_load_table = {
+        sql """ drop table if exists ${stream_load_table}; """
+
+        sql """
+   CREATE TABLE ${stream_load_table} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+    }
+
+    def do_stream_load = { i ->
+        logger.info("file:" + i)
+        streamLoad {
+            table stream_load_table
+
+            set 'column_separator', '|'
+            set 'columns', columns + ",lo_dummy"
+            set 'group_commit', 'true'
+            unset 'label'
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, rowCountArray[i - 1], 
rowCountArray[i - 1], 0, 0)
+            }
+        }
+
+        logger.info("load num: " + rowCountArray[i - 1])
+        wlock.lock()
+        total += rowCountArray[i - 1];
+        wlock.unlock()
+
+    }
+
+    def process = {
+        def threads = []
+        for (int k = 1; k <= 10; k++) {
+            int n = k;
+            logger.info("insert into file:" + n)
+            threads.add(Thread.startDaemon {
+                do_stream_load(n)
+            })
+        }
+        for (Thread th in threads) {
+            th.join()
+        }
+
+        logger.info("total:" + total)
+        getRowCount(total, stream_load_table)
+
+        qt_sql """ select count(*) from ${stream_load_table}; """
+    }
+
+    try {
+        create_stream_load_table()
+        process()
+    } finally {
+
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy
new file mode 100644
index 00000000000..58d00e1f163
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_stream_load_lineitem_multiple_table") {
+    def stream_load_table_base = "test_stream_load_lineitem_multiple_table"
+    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity, l_extendedprice, l_discount, 
+l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
+    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 
601365, 599301, 600504, 599715, 600659};
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            retry++
+            sleep(5000)
+        }
+    }
+
+    def create_stream_load_table = { table_name ->
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+    }
+
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def do_stream_load = { i, table_name ->
+        logger.info("file:" + i)
+
+        streamLoad {
+            table table_name
+
+            set 'column_separator', '|'
+            set 'columns', columns + ",lo_dummy"
+            set 'group_commit', 'true'
+            unset 'label'
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, rowCountArray[i - 1], 
rowCountArray[i - 1], 0, 0)
+            }
+        }
+        getRowCount(rowCountArray[i - 1], table_name)
+    }
+
+    def process = {
+        def threads = []
+        for (int k = 1; k <= 10; k++) {
+            int n = k;
+            String table_name = stream_load_table_base + "_" + n;
+            create_stream_load_table(table_name)
+            threads.add(Thread.startDaemon {
+                do_stream_load(n, table_name)
+            })
+        }
+        for (Thread th in threads) {
+            th.join()
+        }
+
+        for (int k = 1; k <= 10; k++) {
+            String table_name = stream_load_table_base + "_" + k;
+            qt_sql """ select count(*) from ${table_name}; """
+        }
+    }
+
+    try {
+        process()
+    } finally {
+
+    }
+
+
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
new file mode 100644
index 00000000000..1924396718a
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_stream_load_lineitem_normal") {
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            retry++
+            Thread.sleep(5000)
+        }
+    }
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+    def stream_load_table = "test_stream_load_lineitem_normal_sf1"
+    def create_stream_load_table = {
+        sql """ drop table if exists ${stream_load_table}; """
+
+        sql """
+   CREATE TABLE ${stream_load_table} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+    }
+    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 
601365, 599301, 600504, 599715, 600659};
+    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity, l_extendedprice, l_discount, 
+l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
+    def process = {
+        int total = 0;
+        for (int k = 0; k < 3; k++) {
+            for (int i = 1; i <= 10; i++) {
+                streamLoad {
+                    table stream_load_table
+
+                    set 'column_separator', '|'
+                    set 'columns', columns + ",lo_dummy"
+                    set 'group_commit', 'true'
+                    unset 'label'
+                    file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" 
+ i
+
+                    check { result, exception, startTime, endTime ->
+                        checkStreamLoadResult(exception, result, 
rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
+                    }
+                }
+                total += rowCountArray[i - 1];
+            }
+            getRowCount(total, stream_load_table)
+            qt_sql """ select count(*) from ${stream_load_table} """
+        }
+    }
+    try {
+        create_stream_load_table()
+        process()
+    } finally {
+
+    }
+
+
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
 
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
new file mode 100644
index 00000000000..1be1d9180b5
--- /dev/null
+++ 
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+enum SC {
+    TRUNCATE_TABLE(1),
+    ADD_COLUMN(2),
+    DELETE(3),
+    DROP_COLUMN(4),
+    CHANGE_ORDER(5)
+    private int value
+
+    SC(int value) {
+        this.value = value
+    }
+
+    int getValue() {
+        return value
+    }
+}
+
+enum STATE {
+    NORMAL(1),
+    BEFORE_ADD_COLUMN(2),
+    DROP_COLUMN(3)
+    private int value
+
+    STATE(int value) {
+        this.value = value
+    }
+
+    int getValue() {
+        return value
+    }
+}
+
+suite("test_group_commit_stream_load_lineitem_schema_change") {
+    def stream_load_table = "test_stream_load_lineitem_schema_change_sf1"
+    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity, l_extendedprice, l_discount, 
+l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
+    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 
601365, 599301, 600504, 599715, 600659};
+    def total = 0;
+    def getRowCount = { expectedRowCount, table_name ->
+        def retry = 0
+        while (retry < 60) {
+            try {
+                def rowCount = sql "select count(*) from ${table_name}"
+                logger.info("rowCount: " + rowCount + ", retry: " + retry)
+                if (rowCount[0][0] >= expectedRowCount) {
+                    break
+                }
+            } catch (Exception e) {
+                logger.info("select count get exception", e);
+            }
+            retry++
+            Thread.sleep(5000)
+        }
+    }
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def create_stream_load_table = { table_name ->
+        // create table
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_receiptdate DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+    }
+
+    def create_stream_load_table_less_column = { table_name ->
+        // create table
+        sql """ drop table if exists ${table_name}; """
+
+        sql """
+   CREATE TABLE ${table_name} (
+    l_shipdate    DATEV2 NOT NULL,
+    l_orderkey    bigint NOT NULL,
+    l_linenumber  int not null,
+    l_partkey     int NOT NULL,
+    l_suppkey     int not null,
+    l_quantity    decimalv3(15, 2) NOT NULL,
+    l_extendedprice  decimalv3(15, 2) NOT NULL,
+    l_discount    decimalv3(15, 2) NOT NULL,
+    l_tax         decimalv3(15, 2) NOT NULL,
+    l_returnflag  VARCHAR(1) NOT NULL,
+    l_linestatus  VARCHAR(1) NOT NULL,
+    l_commitdate  DATEV2 NOT NULL,
+    l_shipinstruct VARCHAR(25) NOT NULL,
+    l_shipmode     VARCHAR(10) NOT NULL,
+    l_comment      VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+        """
+
+    }
+
+    def insert_data = { i, table_name ->
+        streamLoad {
+            table table_name
+
+            set 'column_separator', '|'
+            set 'columns', columns + ",lo_dummy"
+            set 'group_commit', 'true'
+            unset 'label'
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, rowCountArray[i - 1], 
rowCountArray[i - 1], 0, 0)
+            }
+        }
+        total += rowCountArray[i - 1];
+    }
+
+    def getAlterTableState = { table_name ->
+        def retry = 0
+        while (true) {
+            def state = sql "show alter table column where tablename = 
'${table_name}' order by CreateTime desc "
+            logger.info("alter table state: ${state}")
+            logger.info("state:" + state[0][9]);
+            if (state.size() > 0 && state[0][9] == "FINISHED") {
+                return true
+            }
+            retry++
+            if (retry >= 60) {
+                return false
+            }
+            Thread.sleep(5000)
+        }
+        return false
+    }
+
+    def truncate = { table_name ->
+        create_stream_load_table(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 5) {
+                getRowCount(total, table_name)
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ truncate table ${table_name}; """
+                        break
+                    } catch (Exception e) {
+                        logger.info("select count get exception", e);
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+                total = 0;
+            }
+            insert_data(i, table_name)
+        }
+        logger.info("process truncate total:" + total)
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def delete = { table_name ->
+        create_stream_load_table(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        def rowCount = sql """select count(*) from 
${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;"""
+                        logger.info("rowCount:" + rowCount)
+                        sql """ delete from ${table_name} where l_orderkey 
>=1000000 and l_orderkey <=5000000; """
+                        total -= rowCount[0][0]
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+            }
+            insert_data(i, table_name)
+        }
+        logger.info("process delete total:" + total)
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def drop_column = { table_name ->
+        create_stream_load_table(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} DROP column 
l_suppkey; """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+            }
+            insert_data(i, table_name)
+        }
+        logger.info("process drop column total:" + total)
+        assertTrue(getAlterTableState(table_name), "drop column should 
success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def add_column = { table_name ->
+        create_stream_load_table_less_column(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 5) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} ADD column 
l_receiptdate DATEV2 after l_commitdate; """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+            }
+            insert_data(i, table_name)
+        }
+        logger.info("process add column total:" + total)
+        assertTrue(getAlterTableState(table_name), "add column should success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+    def change_order = { table_name ->
+        create_stream_load_table(table_name)
+        total = 0;
+        for (int i = 1; i <= 10; i++) {
+            logger.info("process file:" + i)
+            if (i == 2) {
+                def retry = 0
+                while (retry < 10) {
+                    try {
+                        sql """ alter table ${table_name} order by 
(l_orderkey,l_shipdate,l_linenumber, 
l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment);
 """
+                        break
+                    } catch (Exception e) {
+                        log.info("exception:", e)
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+            }
+            insert_data(i, table_name)
+        }
+        logger.info("process change order total:" + total)
+        assertTrue(getAlterTableState(table_name), "modify column order should 
success")
+        getRowCount(total, table_name)
+        qt_sql """ select count(*) from ${table_name}; """
+    }
+
+
+    def process = { table_name ->
+        for (int i = 1; i <= 5; i++) {
+            switch (i) {
+                case SC.TRUNCATE_TABLE.value:
+                    truncate(table_name)
+                    break
+                case SC.DELETE.value:
+                    delete(table_name)
+                    break
+                case SC.DROP_COLUMN.value:
+                    drop_column(table_name)
+                    break
+                case SC.ADD_COLUMN.value:
+                    add_column(table_name)
+                    break
+                case SC.CHANGE_ORDER.value:
+                    change_order(table_name)
+                    break
+            }
+        }
+    }
+
+    try {
+        process(stream_load_table)
+    } finally {
+
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to