This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new add160b768e [improvement](regression-test) add more group commit regression-test (#26952) add160b768e is described below commit add160b768ea68776ecd256772acd9de861cee34 Author: huanghaibin <284824...@qq.com> AuthorDate: Wed Nov 15 00:01:13 2023 +0800 [improvement](regression-test) add more group commit regression-test (#26952) --- ...commit_http_stream_lineitem_multiple_client.out | 4 + ..._commit_http_stream_lineitem_multiple_table.out | 31 ++ ...st_group_commit_http_stream_lineitem_normal.out | 10 + ...p_commit_http_stream_lineitem_schema_change.out | 16 + ...commit_insert_into_lineitem_multiple_client.out | 4 + ..._commit_insert_into_lineitem_multiple_table.out | 31 ++ ...st_group_commit_insert_into_lineitem_normal.out | 4 + ...p_commit_insert_into_lineitem_scheme_change.out | 16 + ...commit_stream_load_lineitem_multiple_client.out | 4 + ..._commit_stream_load_lineitem_multiple_table.out | 31 ++ ...st_group_commit_stream_load_lineitem_normal.out | 10 + ...p_commit_stream_load_lineitem_schema_change.out | 16 + .../org/apache/doris/regression/suite/Suite.groovy | 6 + ...mit_http_stream_lineitem_multiple_client.groovy | 149 +++++++ ...mmit_http_stream_lineitem_multiple_table.groovy | 140 +++++++ ...group_commit_http_stream_lineitem_normal.groovy | 122 ++++++ ...ommit_http_stream_lineitem_schema_change.groovy | 385 ++++++++++++++++++ ...mit_insert_into_lineitem_multiple_client.groovy | 219 ++++++++++ ...mmit_insert_into_lineitem_multiple_table.groovy | 225 +++++++++++ ...group_commit_insert_into_lineitem_normal.groovy | 192 +++++++++ ...ommit_insert_into_lineitem_scheme_change.groovy | 448 +++++++++++++++++++++ ...mit_stream_load_lineitem_multiple_client.groovy | 146 +++++++ ...mmit_stream_load_lineitem_multiple_table.groovy | 137 +++++++ ...group_commit_stream_load_lineitem_normal.groovy | 119 ++++++ ...ommit_stream_load_lineitem_schema_change.groovy | 353 ++++++++++++++++ 25 files changed, 
2818 insertions(+) diff --git a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out new file mode 100644 index 00000000000..ef23823e23e --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +6001215 + diff --git a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out new file mode 100644 index 00000000000..7495e1a6b6b --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out @@ -0,0 +1,31 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +600572 + +-- !sql -- +599397 + +-- !sql -- +600124 + +-- !sql -- +599647 + +-- !sql -- +599931 + +-- !sql -- +601365 + +-- !sql -- +599301 + +-- !sql -- +600504 + +-- !sql -- +599715 + +-- !sql -- +600659 + diff --git a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out new file mode 100644 index 00000000000..bac125e32fc --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +6001215 + +-- !sql -- +12002430 + +-- !sql -- +18003645 + diff --git a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out new file mode 100644 index 00000000000..1be3e26c88e --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +3601475 + +-- !sql -- +6001215 + +-- !sql -- +6001215 + +-- !sql -- +6001215 + +-- !sql -- +6001215 + diff --git a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out new file mode 100644 index 00000000000..ef23823e23e --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +6001215 + diff --git a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out new file mode 100644 index 00000000000..86534c17502 --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out @@ -0,0 +1,31 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +600572 + +-- !sql -- +600659 + +-- !sql -- +599397 + +-- !sql -- +600124 + +-- !sql -- +599647 + +-- !sql -- +599931 + +-- !sql -- +601365 + +-- !sql -- +599301 + +-- !sql -- +600504 + +-- !sql -- +599715 + diff --git a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out new file mode 100644 index 00000000000..ef23823e23e --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +6001215 + diff --git a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out new file mode 100644 index 00000000000..4f530919417 --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +3000816 + +-- !sql -- +6001215 + +-- !sql -- +4673070 + +-- !sql -- +6001215 + +-- !sql -- +6001215 + diff --git a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out new file mode 100644 index 00000000000..ef23823e23e --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +6001215 + diff --git a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out new file mode 100644 index 00000000000..7495e1a6b6b --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out @@ -0,0 +1,31 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +600572 + +-- !sql -- +599397 + +-- !sql -- +600124 + +-- !sql -- +599647 + +-- !sql -- +599931 + +-- !sql -- +601365 + +-- !sql -- +599301 + +-- !sql -- +600504 + +-- !sql -- +599715 + +-- !sql -- +600659 + diff --git a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out new file mode 100644 index 00000000000..bac125e32fc --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +6001215 + +-- !sql -- +12002430 + +-- !sql -- +18003645 + diff --git a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out new file mode 100644 index 00000000000..1be3e26c88e --- /dev/null +++ b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +3601475 + +-- !sql -- +6001215 + +-- !sql -- +6001215 + +-- !sql -- +6001215 + +-- !sql -- +6001215 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index c9e8fe4215e..9913a5381c8 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -266,6 +266,12 @@ class Suite implements GroovyInterceptable { return result } + List<List<Object>> insert_into_sql(String sqlStr, int num) { + logger.info("insert into " + num + " records") + def (result, meta) = JdbcUtils.executeToList(context.getConnection(), sqlStr) + return result + } + def sql_return_maparray(String sqlStr) { logger.info("Execute sql: ${sqlStr}".toString()) def (result, meta) = JdbcUtils.executeToList(context.getConnection(), sqlStr) diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy new file mode 100644 index 00000000000..1bf941fb57a --- /dev/null +++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +suite("test_group_commit_http_stream_lineitem_multiple_client") { + def db = "regression_test_insert_p2" + def stream_load_table = "test_http_stream_lineitem_multiple_client_sf1" + int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659}; + def total = 0; + def rwLock = new ReentrantReadWriteLock(); + def wlock = rwLock.writeLock(); + def getRowCount = { expectedRowCount, table_name -> + def retry = 0 + while (retry < 60) { + try { + def rowCount = sql "select count(*) from ${table_name}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + } catch (Exception e) { + logger.info("select count get exception", e); + } + retry++ + sleep(5000) + } + } + + def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(json.GroupCommit) + assertTrue(json.Label.startsWith("group_commit_")) + assertEquals(total_rows, json.NumberTotalRows) + assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(unselected_rows, json.NumberUnselectedRows) + if (filtered_rows > 0) { + 
assertFalse(json.ErrorURL.isEmpty()) + } else { + assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty()) + } + } + + def create_stream_load_table = { + sql """ drop table if exists ${stream_load_table}; """ + + sql """ + CREATE TABLE ${stream_load_table} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_receiptdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + } + + def do_stream_load = { i -> + logger.info("file:" + i) + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${stream_load_table}(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, +l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct, +l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16 from http_stream + ("format"="csv", "column_separator"="|") + """ + + set 'group_commit', 'true' + file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i + unset 'label' + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0) + } + } + + logger.info("load num: " + rowCountArray[i - 1]) + wlock.lock() + total += rowCountArray[i - 1]; + wlock.unlock() + + } + + def process = { + def threads = [] + for (int k = 1; k <= 10; k++) { + int n = k; + logger.info("insert into file:" + n) + 
threads.add(Thread.startDaemon { + do_stream_load(n) + }) + } + for (Thread th in threads) { + th.join() + } + + logger.info("total:" + total) + getRowCount(total, stream_load_table) + + qt_sql """ select count(*) from ${stream_load_table}; """ + } + + try { + create_stream_load_table() + process() + } finally { + + } +} \ No newline at end of file diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy new file mode 100644 index 00000000000..03932d96cd3 --- /dev/null +++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_group_commit_http_stream_lineitem_multiple_table") { + def db = "regression_test_insert_p2" + def stream_load_table_base = "test_http_stream_lineitem_multiple_table" + int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659}; + def getRowCount = { expectedRowCount, table_name -> + def retry = 0 + while (retry < 60) { + try { + def rowCount = sql "select count(*) from ${table_name}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + } catch (Exception e) { + logger.info("select count get exception", e); + } + retry++ + sleep(5000) + } + } + + def create_stream_load_table = { table_name -> + sql """ drop table if exists ${table_name}; """ + + sql """ + CREATE TABLE ${table_name} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_receiptdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + } + + def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(json.GroupCommit) + assertTrue(json.Label.startsWith("group_commit_")) + assertEquals(total_rows, json.NumberTotalRows) + 
assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(unselected_rows, json.NumberUnselectedRows) + if (filtered_rows > 0) { + assertFalse(json.ErrorURL.isEmpty()) + } else { + assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty()) + } + } + + def do_stream_load = { i, table_name -> + logger.info("file:" + i) + + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${table_name}(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, +l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct, +l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16 from http_stream + ("format"="csv", "column_separator"="|") + """ + + set 'group_commit', 'true' + file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i + unset 'label' + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0) + } + } + getRowCount(rowCountArray[i - 1], table_name) + } + + def process = { + def threads = [] + for (int k = 1; k <= 10; k++) { + int n = k; + String table_name = stream_load_table_base + "_" + n; + create_stream_load_table(table_name) + threads.add(Thread.startDaemon { + do_stream_load(n, table_name) + }) + } + for (Thread th in threads) { + th.join() + } + + for (int k = 1; k <= 10; k++) { + String table_name = stream_load_table_base + "_" + k; + qt_sql """ select count(*) from ${table_name}; """ + } + } + + try { + process() + } finally { + + } + + +} \ No newline at end of file diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy new file mode 100644 index 00000000000..76ff680af11 --- /dev/null +++ 
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_group_commit_http_stream_lineitem_normal") { + def getRowCount = { expectedRowCount, table_name -> + def retry = 0 + while (retry < 60) { + try { + def rowCount = sql "select count(*) from ${table_name}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + } catch (Exception e) { + logger.info("select count get exception", e); + } + retry++ + Thread.sleep(5000) + } + } + def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(json.GroupCommit) + assertTrue(json.Label.startsWith("group_commit_")) + assertEquals(total_rows, json.NumberTotalRows) + assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(unselected_rows, json.NumberUnselectedRows) + if (filtered_rows > 0) { + 
assertFalse(json.ErrorURL.isEmpty()) + } else { + assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty()) + } + } + def db = "regression_test_insert_p2" + def stream_load_table = "test_stream_load_lineitem_normal_sf1" + def create_stream_load_table = { + sql """ drop table if exists ${stream_load_table}; """ + + sql """ + CREATE TABLE ${stream_load_table} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_receiptdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + } + int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659}; + def process = { + int total = 0; + for (int k = 0; k < 3; k++) { + for (int i = 1; i <= 10; i++) { + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${stream_load_table}(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, +l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct, +l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16 from http_stream + ("format"="csv", "column_separator"="|") + """ + + set 'group_commit', 'true' + file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i + unset 'label' + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0) + } 
+ } + total += rowCountArray[i - 1]; + } + getRowCount(total, stream_load_table) + qt_sql """ select count(*) from ${stream_load_table} """ + } + } + try { + create_stream_load_table() + process() + } finally { + + } + + +} \ No newline at end of file diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy new file mode 100644 index 00000000000..4a8359fb9c4 --- /dev/null +++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy @@ -0,0 +1,385 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +enum SC { + TRUNCATE_TABLE(1), + ADD_COLUMN(2), + DELETE(3), + DROP_COLUMN(4), + CHANGE_ORDER(5) + private int value + + SC(int value) { + this.value = value + } + + int getValue() { + return value + } +} + +enum STATE { + NORMAL(1), + BEFORE_ADD_COLUMN(2), + DROP_COLUMN(3) + private int value + + STATE(int value) { + this.value = value + } + + int getValue() { + return value + } +} + +suite("test_group_commit_http_stream_lineitem_schema_change") { + def db = "regression_test_insert_p2" + def stream_load_table = "test_http_stream_lineitem_schema_change_sf1" + int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659}; + def total = 0; + def getRowCount = { expectedRowCount, table_name -> + def retry = 0 + while (retry < 60) { + try { + def rowCount = sql "select count(*) from ${table_name}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + } catch (Exception e) { + logger.info("select count get exception", e); + } + retry++ + Thread.sleep(5000) + } + } + def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(json.GroupCommit) + assertTrue(json.Label.startsWith("group_commit_")) + assertEquals(total_rows, json.NumberTotalRows) + assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(unselected_rows, json.NumberUnselectedRows) + if (filtered_rows > 0) { + assertFalse(json.ErrorURL.isEmpty()) + } else { + assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty()) + } + } + + def create_stream_load_table = { table_name -> + // create table + sql """ drop table if exists ${table_name}; """ + + sql """ + CREATE TABLE ${table_name} ( + 
l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_receiptdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + } + + def create_stream_load_table_less_column = { table_name -> + // create table + sql """ drop table if exists ${table_name}; """ + + sql """ + CREATE TABLE ${table_name} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + + } + + def insert_data = { i, table_name -> + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${stream_load_table}(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, +l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct, +l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16 from 
http_stream + ("format"="csv", "column_separator"="|") + """ + + set 'group_commit', 'true' + file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i + unset 'label' + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0) + } + } + total += rowCountArray[i - 1]; + } + + def insert_data_less_column = { i, table_name -> + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${stream_load_table}(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, +l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_shipinstruct,l_shipmode, +l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c14, c15, c16 from http_stream + ("format"="csv", "column_separator"="|") + """ + + set 'group_commit', 'true' + file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i + unset 'label' + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0) + } + } + total += rowCountArray[i - 1]; + } + + def getAlterTableState = { table_name -> + def retry = 0 + while (true) { + def state = sql "show alter table column where tablename = '${table_name}' order by CreateTime desc " + logger.info("alter table state: ${state}") + logger.info("state:" + state[0][9]); + if (state.size() > 0 && state[0][9] == "FINISHED") { + return true + } + retry++ + if (retry >= 60) { + return false + } + Thread.sleep(5000) + } + return false + } + + def truncate = { table_name -> + create_stream_load_table(table_name) + total = 0; + for (int i = 1; i <= 10; i++) { + logger.info("process file:" + i) + if (i == 5) { + getRowCount(total, table_name) + def retry = 0 + while (retry < 10) { + try { + sql """ truncate table ${table_name}; """ + break + } catch (Exception e) { + logger.info("select count get exception", e); + } + Thread.sleep(2000) + retry++ + } + total = 
0; + } + insert_data(i, table_name) + } + logger.info("process truncate total:" + total) + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + def delete = { table_name -> + create_stream_load_table(table_name) + total = 0; + for (int i = 1; i <= 10; i++) { + logger.info("process file:" + i) + if (i == 5) { + def retry = 0 + while (retry < 10) { + try { + def rowCount = sql """select count(*) from ${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;""" + logger.info("rowCount:" + rowCount) + sql """ delete from ${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000; """ + total -= rowCount[0][0] + break + } catch (Exception e) { + log.info("exception:", e) + } + Thread.sleep(2000) + retry++ + } + } + insert_data(i, table_name) + } + logger.info("process delete total:" + total) + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + def drop_column = { table_name -> + create_stream_load_table(table_name) + total = 0; + for (int i = 1; i <= 10; i++) { + logger.info("process file:" + i) + if (i == 5) { + def retry = 0 + while (retry < 10) { + try { + sql """ alter table ${table_name} DROP column l_receiptdate; """ + break + } catch (Exception e) { + log.info("exception:", e) + } + Thread.sleep(2000) + retry++ + } + } + if (i < 5) { + insert_data(i, table_name) + } else { + insert_data_less_column(i, table_name) + } + } + logger.info("process drop column total:" + total) + assertTrue(getAlterTableState(table_name), "drop column should success") + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + def add_column = { table_name -> + create_stream_load_table_less_column(table_name) + total = 0; + for (int i = 1; i <= 10; i++) { + logger.info("process file:" + i) + if (i == 5) { + def retry = 0 + while (retry < 10) { + try { + sql """ alter table ${table_name} ADD column l_receiptdate DATEV2 after l_commitdate; """ + break + } 
catch (Exception e) { + log.info("exception:", e) + } + Thread.sleep(2000) + retry++ + } + } + if (i < 5) { + insert_data_less_column(i, table_name) + } else { + insert_data(i, table_name) + } + } + logger.info("process add column total:" + total) + assertTrue(getAlterTableState(table_name), "add column should success") + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + def change_order = { table_name -> + create_stream_load_table(table_name) + total = 0; + for (int i = 1; i <= 10; i++) { + logger.info("process file:" + i) + if (i == 2) { + def retry = 0 + while (retry < 10) { + try { + sql """ alter table ${table_name} order by (l_orderkey,l_shipdate,l_linenumber, l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment); """ + break + } catch (Exception e) { + log.info("exception:", e) + } + Thread.sleep(2000) + retry++ + } + } + insert_data(i, table_name) + } + logger.info("process change order total:" + total) + assertTrue(getAlterTableState(table_name), "modify column order should success") + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + + def process = { table_name -> + for (int i = 1; i <= 5; i++) { + switch (i) { + case SC.TRUNCATE_TABLE.value: + truncate(table_name) + break + case SC.DELETE.value: + delete(table_name) + break + case SC.DROP_COLUMN.value: + drop_column(table_name) + break + case SC.ADD_COLUMN.value: + add_column(table_name) + break + case SC.CHANGE_ORDER.value: + change_order(table_name) + break + } + } + } + + try { + process(stream_load_table) + } finally { + + } + +} \ No newline at end of file diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy new file mode 100644 index 00000000000..64bb09f271c --- 
/**
 * Lists the files under {@code dirName}, verifies that exactly {@code num}
 * files are present, and returns their paths sorted lexicographically so that
 * every suite processes the downloaded lineitem splits in a deterministic order.
 *
 * @param dirName directory containing the downloaded split files
 * @param num     expected number of files in the directory
 * @return sorted array of absolute/relative file paths
 * @throws Exception when the path is not a readable directory or the file
 *         count does not match {@code num}
 */
String[] getFiles(String dirName, int num) {
    File[] datas = new File(dirName).listFiles()
    // listFiles() returns null when the path is missing or not a directory;
    // fail with a clear message instead of an NPE on datas.length.
    if (datas == null) {
        throw new Exception("not a readable directory: " + dirName)
    }
    if (num != datas.length) {
        throw new Exception("num not equals,expect:" + num + " vs real:" + datas.length)
    }
    String[] array = new String[datas.length];
    for (int i = 0; i < datas.length; i++) {
        array[i] = datas[i].getPath();
    }
    Arrays.sort(array);
    return array;
}
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText() + } + } + file_array = getFiles(dataDir, 10) + for (String s : file_array) { + logger.info(s) + } + } + def insert_table = "test_insert_into_lineitem_multiple_client_sf1" + def batch = 100; + def total = 0; + def rwLock = new ReentrantReadWriteLock(); + def wlock = rwLock.writeLock(); + + def getRowCount = { expectedRowCount, table_name -> + def retry = 0 + while (retry < 60) { + try { + def rowCount = sql "select count(*) from ${table_name}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + } catch (Exception e) { + logger.info("select count get exception", e); + } + Thread.sleep(5000) + retry++ + } + } + + def create_insert_table = { + // create table + sql """ drop table if exists ${insert_table}; """ + + sql """ + CREATE TABLE ${insert_table} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_receiptdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + } + + def do_insert_into = { file_name -> + logger.info("file:" + file_name) + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + //read and insert + BufferedReader reader; + try { + reader = new BufferedReader(new 
FileReader(file_name)); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + + String s = null; + StringBuilder sb = null; + int c = 0; + int t = 0; + while (true) { + try { + if (c == batch) { + sb.append(";"); + String exp = sb.toString(); + while (true) { + try { + def result = insert_into_sql(exp, c); + logger.info("result:" + result); + break + } catch (Exception e) { + logger.info("got exception:" + e) + } + } + c = 0; + } + s = reader.readLine(); + if (s != null) { + if (c == 0) { + sb = new StringBuilder(); + sb.append("insert into ${insert_table} (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES"); + } + if (c > 0) { + sb.append(","); + } + String[] array = s.split("\\|"); + sb.append("("); + for (int i = 0; i < array.length; i++) { + sb.append("\"" + array[i] + "\""); + if (i != array.length - 1) { + sb.append(","); + } + } + sb.append(")"); + c++; + t++; + } else if (c > 0) { + sb.append(";"); + String exp = sb.toString(); + while (true) { + try { + def result = insert_into_sql(exp, c); + logger.info("result:" + result); + break + } catch (Exception e) { + logger.info("got exception:" + e) + } + } + break; + } else { + break; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + { + logger.info("t: " + t) + wlock.lock() + total += t; + wlock.unlock() + } + + if (reader != null) { + reader.close(); + } + } + + def process = { + def threads = [] + for (int k = 0; k < file_array.length; k++) { + int n = k; + String file_name = file_array[n] + logger.info("insert into file:" + file_name) + threads.add(Thread.startDaemon { + do_insert_into(file_name) + }) + } + for (Thread th in threads) { + th.join() + } + + getRowCount(total, insert_table) + qt_sql """ select count(*) from ${insert_table}; """ + } + + try { + prepare() + create_insert_table() + process() + } 
finally { + + } +} \ No newline at end of file diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy new file mode 100644 index 00000000000..722b6973356 --- /dev/null +++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +String[] getFiles(String dirName, int num) { + File[] datas = new File(dirName).listFiles() + if (num != datas.length) { + throw new Exception("num not equals,expect:" + num + " vs real:" + datas.length) + } + String[] array = new String[datas.length]; + for (int i = 0; i < datas.length; i++) { + array[i] = datas[i].getPath(); + } + Arrays.sort(array); + return array; +} + +suite("test_group_commit_insert_into_lineitem_multiple_table") { + String[] file_array; + def prepare = { + def dataDir = "${context.config.cacheDataPath}/lineitem/" + File dir = new File(dataDir) + if (!dir.exists()) { + new File("${context.config.cacheDataPath}/lineitem/").mkdir() + for (int i = 1; i <= 10; i++) { + logger.info("download lineitem.tbl.${i}") + def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i} +--output ${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText() + } + } + file_array = getFiles(dataDir, 10) + for (String s : file_array) { + logger.info(s) + } + } + def insert_table_base = "test_insert_into_lineitem_multiple_table_sf1" + def batch = 100; + def total = 0; + def rwLock = new ReentrantReadWriteLock(); + def wlock = rwLock.writeLock(); + + def getRowCount = { expectedRowCount, table_name -> + def retry = 0 + while (retry < 60) { + try { + def rowCount = sql "select count(*) from ${table_name}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + } catch (Exception e) { + logger.info("select count get exception", e); + } + Thread.sleep(5000) + retry++ + } + } + + def create_insert_table = { table_name -> + // create table + sql """ drop table if exists ${table_name}; """ + + sql """ + CREATE TABLE ${table_name} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int 
not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_receiptdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + } + + def do_insert_into = { file_name, table_name -> + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + logger.info("file:" + file_name) + //read and insert + BufferedReader reader; + try { + reader = new BufferedReader(new FileReader(file_name)); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + + String s = null; + StringBuilder sb = null; + int c = 0; + int t = 0; + while (true) { + try { + if (c == batch) { + sb.append(";"); + String exp = sb.toString(); + while (true) { + try { + def result = insert_into_sql(exp, c); + logger.info("result:" + result); + break + } catch (Exception e) { + logger.info("got exception:" + e) + } + } + c = 0; + } + s = reader.readLine(); + if (s != null) { + if (c == 0) { + sb = new StringBuilder(); + sb.append("insert into ${table_name} (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES"); + } + if (c > 0) { + sb.append(","); + } + String[] array = s.split("\\|"); + sb.append("("); + for (int i = 0; i < array.length; i++) { + sb.append("\"" + array[i] + "\""); + if (i != 
array.length - 1) { + sb.append(","); + } + } + sb.append(")"); + c++; + t++; + } else if (c > 0) { + sb.append(";"); + String exp = sb.toString(); + while (true) { + try { + def result = insert_into_sql(exp, c); + logger.info("result:" + result); + break + } catch (Exception e) { + logger.info("got exception:" + e) + } + } + break; + } else { + break; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + { + logger.info("t: " + t) + wlock.lock() + total += t; + wlock.unlock() + } + + if (reader != null) { + reader.close(); + } + getRowCount(t, table_name) + } + + def process = { + def threads = [] + for (int k = 0; k < file_array.length; k++) { + int n = k; + String file_name = file_array[n] + String table_name = insert_table_base + "_" + n; + create_insert_table(table_name) + logger.info("insert into file:" + file_name) + threads.add(Thread.startDaemon { + do_insert_into(file_name, table_name) + }) + } + for (Thread th in threads) { + th.join() + } + + for (int k = 0; k < file_array.length; k++) { + String table_name = insert_table_base + "_" + k; + qt_sql """ select count(*) from ${table_name}; """ + } + } + + try { + prepare() + process() + } finally { + + } + + +} \ No newline at end of file diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy new file mode 100644 index 00000000000..250255958ee --- /dev/null +++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy @@ -0,0 +1,192 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +String[] getFiles(String dirName, int num) { + File[] datas = new File(dirName).listFiles() + if (num != datas.length) { + throw new Exception("num not equals,expect:" + num + " vs real:" + datas.length) + } + String[] array = new String[datas.length]; + for (int i = 0; i < datas.length; i++) { + array[i] = datas[i].getPath(); + } + Arrays.sort(array); + return array; +} + +suite("test_group_commit_insert_into_lineitem_normal") { + String[] file_array; + def prepare = { + def dataDir = "${context.config.cacheDataPath}/lineitem/" + File dir = new File(dataDir) + if (!dir.exists()) { + new File("${context.config.cacheDataPath}/lineitem/").mkdir() + for (int i = 1; i <= 10; i++) { + logger.info("download lineitem.tbl.${i}") + def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i} +--output ${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText() + } + } + file_array = getFiles(dataDir, 10) + for (String s : file_array) { + logger.info(s) + } + } + def insert_table = "test_insert_into_lineitem_sf1" + def batch = 100; + def count = 0; + def total = 0; + + def getRowCount = { expectedRowCount, table_name -> + def retry = 0 + while (retry < 60) { + try { + def rowCount = sql "select count(*) from ${table_name}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + } catch (Exception e) { + logger.info("select count get exception", e); + } + Thread.sleep(5000) + retry++ + } + } + + def create_insert_table = { + // create table + 
sql """ drop table if exists ${insert_table}; """ + + sql """ + CREATE TABLE ${insert_table} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_receiptdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + } + + def process = { + for (String file : file_array) { + logger.info("insert into file: " + file) + BufferedReader reader; + try { + reader = new BufferedReader(new FileReader(file)); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + + String s = null; + StringBuilder sb = null; + count = 0; + while (true) { + try { + if (count == batch) { + sb.append(";"); + String exp = sb.toString(); + while (true) { + try { + def result = insert_into_sql(exp, count); + logger.info("result:" + result); + break + } catch (Exception e) { + logger.info("got exception:" + e) + } + } + count = 0; + } + s = reader.readLine(); + if (s != null) { + if (count == 0) { + sb = new StringBuilder(); + sb.append("insert into ${insert_table} (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES"); + } + if (count > 0) { + sb.append(","); + } + String[] array = s.split("\\|"); + sb.append("("); + for (int i = 0; i < 
array.length; i++) { + sb.append("\"" + array[i] + "\""); + if (i != array.length - 1) { + sb.append(","); + } + } + sb.append(")"); + count++; + total++; + } else if (count > 0) { + sb.append(";"); + String exp = sb.toString(); + while (true) { + try { + def result = insert_into_sql(exp, count); + logger.info("result:" + result); + break + } catch (Exception e) { + logger.info("got exception:" + e) + } + } + break; + } else { + break; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (reader != null) { + reader.close(); + } + } + logger.info("total: " + total) + getRowCount(total, insert_table) + + qt_sql """select count(*) from ${insert_table};""" + } + + try { + prepare() + create_insert_table() + for (int i = 0; i < 1; i++) { + process() + } + } finally { + } +} \ No newline at end of file diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy new file mode 100644 index 00000000000..fba562f819e --- /dev/null +++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy @@ -0,0 +1,448 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
/**
 * Schema-change scenarios exercised by this suite; the numeric value drives
 * the dispatch order in process().
 */
enum SC {
    TRUNCATE_TABLE(1),
    ADD_COLUMN(2),
    DELETE(3),
    DROP_COLUMN(4),
    CHANGE_ORDER(5);   // explicit ';' terminates the constant list

    private final int value

    SC(int value) {
        this.value = value
    }

    int getValue() {
        return value
    }
}

/**
 * Shape of the rows being inserted relative to the table's current schema.
 */
enum STATE {
    NORMAL(1),            // row matches the full 16-column lineitem schema
    BEFORE_ADD_COLUMN(2), // table lacks l_receiptdate; source column 11 is skipped
    DROP_COLUMN(3);       // table lacks l_suppkey; source column 2 is skipped

    private final int value

    STATE(int value) {
        this.value = value
    }

    int getValue() {
        return value
    }
}
Thread.sleep(5000) + retry++ + } + } + + def create_insert_table = { table_name -> + // create table + sql """ drop table if exists ${table_name}; """ + + sql """ + CREATE TABLE ${table_name} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_receiptdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + } + + def create_insert_table_less_column = { table_name -> + // create table + sql """ drop table if exists ${table_name}; """ + + sql """ + CREATE TABLE ${table_name} ( + l_shipdate DATEV2 NOT NULL, + l_orderkey bigint NOT NULL, + l_linenumber int not null, + l_partkey int NOT NULL, + l_suppkey int not null, + l_quantity decimalv3(15, 2) NOT NULL, + l_extendedprice decimalv3(15, 2) NOT NULL, + l_discount decimalv3(15, 2) NOT NULL, + l_tax decimalv3(15, 2) NOT NULL, + l_returnflag VARCHAR(1) NOT NULL, + l_linestatus VARCHAR(1) NOT NULL, + l_commitdate DATEV2 NOT NULL, + l_shipinstruct VARCHAR(25) NOT NULL, + l_shipmode VARCHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL +)ENGINE=OLAP +DUPLICATE KEY(`l_shipdate`, `l_orderkey`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 +PROPERTIES ( + "replication_num" = "1" +); + """ + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + + } + + def insert_data = { 
file_name, table_name, index -> + BufferedReader reader; + try { + reader = new BufferedReader(new FileReader(file_name)); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + + String s = null; + StringBuilder sb = null; + count = 0; + while (true) { + try { + if (count == batch) { + sb.append(";"); + String exp = sb.toString(); + while (true) { + try { + def result = insert_into_sql(exp, count); + logger.info("result:" + result); + break + } catch (Exception e) { + logger.info("got exception:" + e) + } + } + count = 0; + } + s = reader.readLine(); + if (s != null) { + if (count == 0) { + sb = new StringBuilder(); + if (index == STATE.BEFORE_ADD_COLUMN.value) { + sb.append("insert into ${table_name} (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_shipinstruct,l_shipmode,l_comment)VALUES"); + } else if (index == STATE.NORMAL.value) { + sb.append("insert into ${table_name} (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES"); + } else if (index == STATE.DROP_COLUMN.value) { + sb.append("insert into ${table_name} (l_orderkey, l_partkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES"); + } + } + if (count > 0) { + sb.append(","); + } + String[] array = s.split("\\|"); + sb.append("("); + for (int i = 0; i < array.length; i++) { + if (index == STATE.BEFORE_ADD_COLUMN.value && i == 11) { + continue; + } else if (index == STATE.DROP_COLUMN.value && i == 2) { + continue; + } + sb.append("\"" + array[i] + "\""); + if (i != array.length - 1) { + sb.append(","); + } + } + sb.append(")"); + count++; + total++; + } else if (count > 0) { + sb.append(";"); + String exp = 
sb.toString(); + while (true) { + try { + def result = insert_into_sql(exp, count); + logger.info("result:" + result); + break + } catch (Exception e) { + logger.info("got exception:" + e) + } + } + break; + } else { + break; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (reader != null) { + reader.close(); + } + } + + def getAlterTableState = { table_name -> + def retry = 0 + while (true) { + def state = sql "show alter table column where tablename = '${table_name}' order by CreateTime desc " + logger.info("alter table state: ${state}") + logger.info("state:" + state[0][9]); + if (state.size() > 0 && state[0][9] == "FINISHED") { + return true + } + retry++ + if (retry >= 60) { + return false + } + Thread.sleep(5000) + } + return false + } + + def truncate = { table_name -> + create_insert_table(table_name) + total = 0; + for (int i = 0; i < file_array.length; i++) { + String fileName = file_array[i] + logger.info("process file:" + fileName) + if (i == 5) { + getRowCount(total, table_name) + def retry = 0 + while (retry < 10) { + try { + sql """ truncate table ${table_name}; """ + break + } catch (Exception e) { + logger.info("select count get exception", e); + } + retry++ + Thread.sleep(2000) + } + total = 0; + } + insert_data(fileName, table_name, STATE.NORMAL.value) + } + logger.info("process truncate total:" + total) + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + def delete = { table_name -> + create_insert_table(table_name) + total = 0; + for (int i = 0; i < file_array.length; i++) { + String fileName = file_array[i] + logger.info("process file:" + fileName) + if (i == 5) { + def retry = 0 + while (retry < 10) { + try { + def rowCount = sql """select count(*) from ${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;""" + log.info("rowCount:" + rowCount[0][0]) + sql """ delete from ${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000; """ + total -= 
rowCount[0][0] + break + } catch (Exception e) { + log.info("exception:", e) + } + retry++ + Thread.sleep(2000) + } + } + insert_data(fileName, table_name, STATE.NORMAL.value) + } + logger.info("process delete total:" + total) + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + def drop_column = { table_name -> + create_insert_table(table_name) + total = 0; + for (int i = 0; i < file_array.length; i++) { + String fileName = file_array[i] + logger.info("process file:" + fileName) + if (i == 5) { + def retry = 0 + while (retry < 10) { + try { + sql """ alter table ${table_name} DROP column l_suppkey; """ + break + } catch (Exception e) { + log.info("exception:", e) + } + retry++ + Thread.sleep(2000) + } + } + if (i < 5) { + insert_data(fileName, table_name, STATE.NORMAL.value) + } else { + insert_data(fileName, table_name, STATE.DROP_COLUMN.value) + } + } + logger.info("process drop column total:" + total) + assertTrue(getAlterTableState(table_name), "drop column should success") + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + def add_column = { table_name -> + create_insert_table_less_column(table_name) + total = 0; + for (int i = 0; i < file_array.length; i++) { + String fileName = file_array[i] + logger.info("process file:" + fileName) + if (i == 5) { + def retry = 0 + while (retry < 10) { + try { + sql """ alter table ${table_name} ADD column l_receiptdate DATEV2 after l_commitdate; """ + break + } catch (Exception e) { + log.info("exception:", e) + } + retry++ + Thread.sleep(2000) + } + } + if (i < 5) { + insert_data(fileName, table_name, STATE.BEFORE_ADD_COLUMN.value) + } else { + insert_data(fileName, table_name, STATE.NORMAL.value) + } + } + logger.info("process add column total:" + total) + assertTrue(getAlterTableState(table_name), "add column should success") + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + def change_order = 
{ table_name -> + create_insert_table(table_name) + total = 0; + for (int i = 0; i < file_array.length; i++) { + String fileName = file_array[i] + logger.info("process file:" + fileName) + if (i == 5) { + def retry = 0 + while (retry < 10) { + try { + sql """ alter table ${table_name} order by (l_orderkey,l_shipdate,l_linenumber, l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment); """ + break + } catch (Exception e) { + log.info("exception:", e) + } + retry++ + Thread.sleep(2000) + } + } + insert_data(fileName, table_name, STATE.NORMAL.value) + } + logger.info("process change order total:" + total) + assertTrue(getAlterTableState(table_name), "modify column order should success") + getRowCount(total, table_name) + qt_sql """ select count(*) from ${table_name}; """ + } + + + def process = { table_name -> + for (int i = 1; i <= 5; i++) { + switch (i) { + case SC.TRUNCATE_TABLE.value: + truncate(table_name) + break + case SC.DELETE.value: + delete(table_name) + break + case SC.DROP_COLUMN.value: + drop_column(table_name) + break + case SC.ADD_COLUMN.value: + add_column(table_name) + break + case SC.CHANGE_ORDER.value: + change_order(table_name) + break + } + } + } + + try { + prepare() + process(insert_table) + } finally { + + } + +} \ No newline at end of file diff --git a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy new file mode 100644 index 00000000000..aa7c6ffc571 --- /dev/null +++ b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.concurrent.atomic.AtomicInteger

// Ten concurrent clients each stream load one TPC-H sf1 lineitem chunk into
// the SAME table with group commit enabled; afterwards the accumulated row
// count is verified.
suite("test_group_commit_stream_load_lineitem_multiple_client") {
    def stream_load_table = "test_stream_load_lineitem_multiple_client_sf1"
    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
    // Row counts of lineitem.tbl.1 .. lineitem.tbl.10 (sf1 split into 10 chunks).
    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659};
    // Rows loaded so far, summed across all loader threads.  An AtomicInteger
    // replaces the original ReentrantReadWriteLock-guarded int: the read lock
    // was never taken (a RW lock used only for writes is just a mutex), and
    // lock()/unlock() was not wrapped in try/finally, so an exception between
    // them would have leaked the lock.
    def total = new AtomicInteger(0)

    // Poll `select count(*)` until it reaches expectedRowCount, retrying up
    // to 60 times with a 5s pause.  Group commit publishes rows
    // asynchronously, so the visible count lags the load response.
    def getRowCount = { expectedRowCount, table_name ->
        def retry = 0
        while (retry < 60) {
            try {
                def rowCount = sql "select count(*) from ${table_name}"
                logger.info("rowCount: " + rowCount + ", retry: " + retry)
                if (rowCount[0][0] >= expectedRowCount) {
                    break
                }
            } catch (Exception e) {
                logger.info("select count get exception", e);
            }
            retry++
            sleep(5000)
        }
    }

    // Validate one stream load response: success status, the load went
    // through group commit (GroupCommit flag + generated label prefix), and
    // the total/loaded/filtered/unselected row counters match expectations.
    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
        if (exception != null) {
            throw exception
        }
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertTrue(json.GroupCommit)
        assertTrue(json.Label.startsWith("group_commit_"))
        assertEquals(total_rows, json.NumberTotalRows)
        assertEquals(loaded_rows, json.NumberLoadedRows)
        assertEquals(filtered_rows, json.NumberFilteredRows)
        assertEquals(unselected_rows, json.NumberUnselectedRows)
        if (filtered_rows > 0) {
            assertFalse(json.ErrorURL.isEmpty())
        } else {
            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
        }
    }

    // (Re)create the target lineitem table.
    def create_stream_load_table = {
        sql """ drop table if exists ${stream_load_table}; """

        sql """
        CREATE TABLE ${stream_load_table} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_receiptdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }

    // Stream load chunk i (1-based) with group commit enabled and record the
    // rows it is expected to add.
    def do_stream_load = { i ->
        logger.info("file:" + i)
        streamLoad {
            table stream_load_table

            set 'column_separator', '|'
            set 'columns', columns + ",lo_dummy"
            set 'group_commit', 'true'
            unset 'label'
            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i

            check { result, exception, startTime, endTime ->
                checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
            }
        }

        logger.info("load num: " + rowCountArray[i - 1])
        total.addAndGet(rowCountArray[i - 1])
    }

    // Spawn one daemon loader thread per chunk, wait for all of them, then
    // verify the accumulated row count.
    def process = {
        def threads = []
        for (int k = 1; k <= 10; k++) {
            int n = k;  // per-iteration copy captured by the closure
            logger.info("insert into file:" + n)
            threads.add(Thread.startDaemon {
                do_stream_load(n)
            })
        }
        for (Thread th in threads) {
            th.join()
        }

        logger.info("total:" + total.get())
        getRowCount(total.get(), stream_load_table)

        qt_sql """ select count(*) from ${stream_load_table}; """
    }

    try {
        create_stream_load_table()
        process()
    } finally {
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Ten daemon threads group-commit stream load one TPC-H sf1 lineitem chunk
// each, every thread targeting its OWN table; afterwards the row count of
// every table is verified.
suite("test_group_commit_stream_load_lineitem_multiple_table") {
    def stream_load_table_base = "test_stream_load_lineitem_multiple_table"
    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
    // Row counts of lineitem.tbl.1 .. lineitem.tbl.10.
    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659};

    // Poll `select count(*)` (up to 60 tries, 5s apart) until the table holds
    // at least expectedRowCount rows; group commit publishes asynchronously.
    def getRowCount = { expectedRowCount, table_name ->
        for (int attempt = 0; attempt < 60; attempt++) {
            try {
                def rowCount = sql "select count(*) from ${table_name}"
                logger.info("rowCount: " + rowCount + ", retry: " + attempt)
                if (rowCount[0][0] >= expectedRowCount) {
                    return
                }
            } catch (Exception e) {
                logger.info("select count get exception", e);
            }
            sleep(5000)
        }
    }

    // (Re)create one per-thread lineitem table.
    def create_stream_load_table = { table_name ->
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_receiptdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }

    // Verify one stream load response: success status, group-commit label,
    // and exact total/loaded/filtered/unselected row counters.
    def checkStreamLoadResult = { err, result, expectedTotal, expectedLoaded, expectedFiltered, expectedUnselected ->
        if (err != null) {
            throw err
        }
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertTrue(json.GroupCommit)
        assertTrue(json.Label.startsWith("group_commit_"))
        assertEquals(expectedTotal, json.NumberTotalRows)
        assertEquals(expectedLoaded, json.NumberLoadedRows)
        assertEquals(expectedFiltered, json.NumberFilteredRows)
        assertEquals(expectedUnselected, json.NumberUnselectedRows)
        if (expectedFiltered > 0) {
            assertFalse(json.ErrorURL.isEmpty())
        } else {
            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
        }
    }

    // Group-commit stream load chunk i into table_name, then wait until its
    // rows are visible.
    def do_stream_load = { i, table_name ->
        logger.info("file:" + i)

        streamLoad {
            table table_name

            set 'column_separator', '|'
            set 'columns', columns + ",lo_dummy"
            set 'group_commit', 'true'
            unset 'label'
            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i

            check { result, exception, startTime, endTime ->
                checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
            }
        }
        getRowCount(rowCountArray[i - 1], table_name)
    }

    // Create each table on the main thread, load in parallel daemon threads,
    // then snapshot every table's count.
    def process = {
        def workers = []
        (1..10).each { n ->
            String table_name = stream_load_table_base + "_" + n
            create_stream_load_table(table_name)
            workers << Thread.startDaemon {
                do_stream_load(n, table_name)
            }
        }
        workers.each { it.join() }

        (1..10).each { n ->
            String table_name = stream_load_table_base + "_" + n
            qt_sql """ select count(*) from ${table_name}; """
        }
    }

    try {
        process()
    } finally {
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Sequentially stream loads the ten TPC-H sf1 lineitem chunks three times
// over with group commit enabled, checking the cumulative row count after
// each full pass.
suite("test_group_commit_stream_load_lineitem_normal") {
    // Wait (up to 60 tries, 5s apart) until `select count(*)` reaches
    // expectedRowCount; group commit publishes rows asynchronously.
    def getRowCount = { expectedRowCount, table_name ->
        for (int attempt = 0; attempt < 60; attempt++) {
            try {
                def rowCount = sql "select count(*) from ${table_name}"
                logger.info("rowCount: " + rowCount + ", retry: " + attempt)
                if (rowCount[0][0] >= expectedRowCount) {
                    return
                }
            } catch (Exception e) {
                logger.info("select count get exception", e);
            }
            Thread.sleep(5000)
        }
    }

    // Verify one stream load response: success status, group-commit label,
    // and exact total/loaded/filtered/unselected row counters.
    def checkStreamLoadResult = { err, result, expectedTotal, expectedLoaded, expectedFiltered, expectedUnselected ->
        if (err != null) {
            throw err
        }
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertTrue(json.GroupCommit)
        assertTrue(json.Label.startsWith("group_commit_"))
        assertEquals(expectedTotal, json.NumberTotalRows)
        assertEquals(expectedLoaded, json.NumberLoadedRows)
        assertEquals(expectedFiltered, json.NumberFilteredRows)
        assertEquals(expectedUnselected, json.NumberUnselectedRows)
        if (expectedFiltered > 0) {
            assertFalse(json.ErrorURL.isEmpty())
        } else {
            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
        }
    }

    def stream_load_table = "test_stream_load_lineitem_normal_sf1"

    // (Re)create the target lineitem table.
    def create_stream_load_table = {
        sql """ drop table if exists ${stream_load_table}; """

        sql """
        CREATE TABLE ${stream_load_table} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_receiptdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }

    // Row counts of lineitem.tbl.1 .. lineitem.tbl.10.
    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659};
    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""

    // Three passes over all ten chunks; the expected total keeps growing
    // because the table is never cleared between passes.
    def process = {
        int loadedRows = 0
        3.times {
            (1..10).each { i ->
                streamLoad {
                    table stream_load_table

                    set 'column_separator', '|'
                    set 'columns', columns + ",lo_dummy"
                    set 'group_commit', 'true'
                    unset 'label'
                    file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i

                    check { result, exception, startTime, endTime ->
                        checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
                    }
                }
                loadedRows += rowCountArray[i - 1]
            }
            getRowCount(loadedRows, stream_load_table)
            qt_sql """ select count(*) from ${stream_load_table} """
        }
    }

    try {
        create_stream_load_table()
        process()
    } finally {
    }
}
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy new file mode 100644 index 00000000000..1be1d9180b5 --- /dev/null +++ b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy @@ -0,0 +1,353 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Selector for which schema-change scenario `process` runs on each pass.
enum SC {
    TRUNCATE_TABLE(1),
    ADD_COLUMN(2),
    DELETE(3),
    DROP_COLUMN(4),
    CHANGE_ORDER(5)

    private int value

    SC(int value) {
        this.value = value
    }

    int getValue() {
        return value
    }
}

// NOTE(review): STATE is never referenced in this stream-load suite; it
// mirrors the insert-into variant of this test and is kept for symmetry.
enum STATE {
    NORMAL(1),
    BEFORE_ADD_COLUMN(2),
    DROP_COLUMN(3)

    private int value

    STATE(int value) {
        this.value = value
    }

    int getValue() {
        return value
    }
}

// Runs group-commit stream loads of the ten TPC-H sf1 lineitem chunks while a
// schema change (truncate / delete / drop column / add column / reorder
// columns) is applied to the target table mid-way, then checks row counts.
suite("test_group_commit_stream_load_lineitem_schema_change") {
    def stream_load_table = "test_stream_load_lineitem_schema_change_sf1"
    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
    // Row counts of lineitem.tbl.1 .. lineitem.tbl.10.
    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659};
    // Rows expected to be visible in the table; adjusted down by the truncate
    // and delete scenarios.  Shared by the closures below (loads here run
    // single-threaded, so no synchronization is needed).
    def total = 0;

    // Poll `select count(*)` until it reaches expectedRowCount (60 tries, 5s
    // apart); group commit publishes rows asynchronously.
    def getRowCount = { expectedRowCount, table_name ->
        def retry = 0
        while (retry < 60) {
            try {
                def rowCount = sql "select count(*) from ${table_name}"
                logger.info("rowCount: " + rowCount + ", retry: " + retry)
                if (rowCount[0][0] >= expectedRowCount) {
                    break
                }
            } catch (Exception e) {
                logger.info("select count get exception", e);
            }
            retry++
            Thread.sleep(5000)
        }
    }

    // Verify one stream load response: success status, group-commit label,
    // and exact total/loaded/filtered/unselected row counters.
    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
        if (exception != null) {
            throw exception
        }
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertTrue(json.GroupCommit)
        assertTrue(json.Label.startsWith("group_commit_"))
        assertEquals(total_rows, json.NumberTotalRows)
        assertEquals(loaded_rows, json.NumberLoadedRows)
        assertEquals(filtered_rows, json.NumberFilteredRows)
        assertEquals(unselected_rows, json.NumberUnselectedRows)
        if (filtered_rows > 0) {
            assertFalse(json.ErrorURL.isEmpty())
        } else {
            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
        }
    }

    // (Re)create the full lineitem table.
    def create_stream_load_table = { table_name ->
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_receiptdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }

    // Same table minus l_receiptdate; starting point of the add-column
    // scenario.
    def create_stream_load_table_less_column = { table_name ->
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }

    // Group-commit stream load chunk i and bump the expected total.
    // NOTE(review): the column list always names l_suppkey/l_receiptdate even
    // when the active scenario has dropped or not-yet-added that column --
    // presumably stream load tolerates source columns without a matching
    // target column; confirm against the stream load docs.
    def insert_data = { i, table_name ->
        streamLoad {
            table table_name

            set 'column_separator', '|'
            set 'columns', columns + ",lo_dummy"
            set 'group_commit', 'true'
            unset 'label'
            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i

            check { result, exception, startTime, endTime ->
                checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
            }
        }
        total += rowCountArray[i - 1];
    }

    // Poll SHOW ALTER TABLE COLUMN until the most recent job is FINISHED, up
    // to 60 tries, 5s apart.  Fixed: the original logged state[0][9] BEFORE
    // checking state.size(), throwing a NullPointerException whenever no
    // alter job was listed yet; the empty-result guard now comes first.  The
    // unreachable `return false` after the infinite loop was also removed.
    def getAlterTableState = { table_name ->
        def retry = 0
        while (true) {
            def state = sql "show alter table column where tablename = '${table_name}' order by CreateTime desc "
            logger.info("alter table state: ${state}")
            if (state.size() > 0) {
                logger.info("state:" + state[0][9]);
                if (state[0][9] == "FINISHED") {
                    return true
                }
            }
            retry++
            if (retry >= 60) {
                return false
            }
            Thread.sleep(5000)
        }
    }

    // Scenario 1: truncate after the first four chunks are visible, then load
    // the rest; only chunks 5..10 should remain.
    def truncate = { table_name ->
        create_stream_load_table(table_name)
        total = 0;
        for (int i = 1; i <= 10; i++) {
            logger.info("process file:" + i)
            if (i == 5) {
                // Make sure the first four loads are visible before truncating.
                getRowCount(total, table_name)
                def retry = 0
                while (retry < 10) {
                    try {
                        sql """ truncate table ${table_name}; """
                        break
                    } catch (Exception e) {
                        // Fixed misleading copy-pasted log text ("select count
                        // get exception"): this failure is the truncate itself.
                        logger.info("truncate table get exception", e);
                    }
                    Thread.sleep(2000)
                    retry++
                }
                total = 0;
            }
            insert_data(i, table_name)
        }
        logger.info("process truncate total:" + total)
        getRowCount(total, table_name)
        qt_sql """ select count(*) from ${table_name}; """
    }

    // Scenario 2: delete a key range mid-run; deleted rows are subtracted
    // from the expected total (later chunks may re-add keys in that range,
    // which is fine because getRowCount checks `>=`).
    def delete = { table_name ->
        create_stream_load_table(table_name)
        total = 0;
        for (int i = 1; i <= 10; i++) {
            logger.info("process file:" + i)
            if (i == 5) {
                def retry = 0
                while (retry < 10) {
                    try {
                        def rowCount = sql """select count(*) from ${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;"""
                        logger.info("rowCount:" + rowCount)
                        sql """ delete from ${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000; """
                        total -= rowCount[0][0]
                        break
                    } catch (Exception e) {
                        log.info("exception:", e)
                    }
                    Thread.sleep(2000)
                    retry++
                }
            }
            insert_data(i, table_name)
        }
        logger.info("process delete total:" + total)
        getRowCount(total, table_name)
        qt_sql """ select count(*) from ${table_name}; """
    }

    // Scenario 3: drop l_suppkey mid-run while loads continue.
    def drop_column = { table_name ->
        create_stream_load_table(table_name)
        total = 0;
        for (int i = 1; i <= 10; i++) {
            logger.info("process file:" + i)
            if (i == 5) {
                def retry = 0
                while (retry < 10) {
                    try {
                        sql """ alter table ${table_name} DROP column l_suppkey; """
                        break
                    } catch (Exception e) {
                        log.info("exception:", e)
                    }
                    Thread.sleep(2000)
                    retry++
                }
            }
            insert_data(i, table_name)
        }
        logger.info("process drop column total:" + total)
        assertTrue(getAlterTableState(table_name), "drop column should success")
        getRowCount(total, table_name)
        qt_sql """ select count(*) from ${table_name}; """
    }

    // Scenario 4: start from the table without l_receiptdate and add it
    // mid-run while loads continue.
    def add_column = { table_name ->
        create_stream_load_table_less_column(table_name)
        total = 0;
        for (int i = 1; i <= 10; i++) {
            logger.info("process file:" + i)
            if (i == 5) {
                def retry = 0
                while (retry < 10) {
                    try {
                        sql """ alter table ${table_name} ADD column l_receiptdate DATEV2 after l_commitdate; """
                        break
                    } catch (Exception e) {
                        log.info("exception:", e)
                    }
                    Thread.sleep(2000)
                    retry++
                }
            }
            insert_data(i, table_name)
        }
        logger.info("process add column total:" + total)
        assertTrue(getAlterTableState(table_name), "add column should success")
        getRowCount(total, table_name)
        qt_sql """ select count(*) from ${table_name}; """
    }

    // Scenario 5: reorder all columns early in the run (after the first
    // chunk) while loads continue.
    def change_order = { table_name ->
        create_stream_load_table(table_name)
        total = 0;
        for (int i = 1; i <= 10; i++) {
            logger.info("process file:" + i)
            if (i == 2) {
                def retry = 0
                while (retry < 10) {
                    try {
                        sql """ alter table ${table_name} order by (l_orderkey,l_shipdate,l_linenumber, l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment); """
                        break
                    } catch (Exception e) {
                        log.info("exception:", e)
                    }
                    Thread.sleep(2000)
                    retry++
                }
            }
            insert_data(i, table_name)
        }
        logger.info("process change order total:" + total)
        assertTrue(getAlterTableState(table_name), "modify column order should success")
        getRowCount(total, table_name)
        qt_sql """ select count(*) from ${table_name}; """
    }

    // Run every scenario once, dispatched through the SC enum so each
    // scenario keeps a stable ordinal in the .out file.
    def process = { table_name ->
        for (int i = 1; i <= 5; i++) {
            switch (i) {
                case SC.TRUNCATE_TABLE.value:
                    truncate(table_name)
                    break
                case SC.DELETE.value:
                    delete(table_name)
                    break
                case SC.DROP_COLUMN.value:
                    drop_column(table_name)
                    break
                case SC.ADD_COLUMN.value:
                    add_column(table_name)
                    break
                case SC.CHANGE_ORDER.value:
                    change_order(table_name)
                    break
            }
        }
    }

    try {
        process(stream_load_table)
    } finally {
    }
}