This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8a169b99060 [case](regression) Test enable pipeline load (#28172) 8a169b99060 is described below commit 8a169b9906006f538a5ba1f0131e4f79607b45df Author: HowardQin <hao....@esgyn.cn> AuthorDate: Thu Dec 28 10:49:19 2023 +0800 [case](regression) Test enable pipeline load (#28172) Co-authored-by: qinhao <qin...@newland.com.cn> --- .../main/java/org/apache/doris/common/Config.java | 2 +- .../load_p0/stream_load/test_pipeline_load.out | 7 + .../load_p0/stream_load/test_pipeline_load.groovy | 171 +++++++++++++++++++++ 3 files changed, 179 insertions(+), 1 deletion(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 4c92449cb57..eae58ddaafd 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1565,7 +1565,7 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static boolean enable_quantile_state_type = true; - @ConfField + @ConfField(mutable = true) public static boolean enable_pipeline_load = false; /*---------------------- JOB CONFIG START------------------------*/ diff --git a/regression-test/data/load_p0/stream_load/test_pipeline_load.out b/regression-test/data/load_p0/stream_load/test_pipeline_load.out new file mode 100644 index 00000000000..f3d24fe05d7 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_pipeline_load.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !pipeline_load_enabled -- +2500 + +-- !pipeline_load_enabled_exception -- +0 + diff --git a/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy b/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy new file mode 100644 index 00000000000..ad0aa89b17d --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_pipeline_load", "nonConcurrent") { + + // Stream load with enable_pipeline_load = true + def config_row = sql """ ADMIN SHOW FRONTEND CONFIG LIKE 'enable_pipeline_load'; """ + String old_value = config_row[0][1] + sql """ ADMIN SET FRONTEND CONFIG ("enable_pipeline_load" = "true"); """ + def tableName = "pipeline_all_types" + try { + // Using table definations and data from test_stream_load.groovy + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(11) NULL, + `k2` tinyint(4) NULL, + `k3` smallint(6) NULL, + `k4` bigint(20) NULL, + `k5` largeint(40) NULL, + `k6` float NULL, + `k7` double NULL, + `k8` decimal(9, 0) NULL, + `k9` char(10) NULL, + `k10` varchar(1024) NULL, + `k11` text NULL, + `k12` date NULL, + `k13` datetime NULL + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + streamLoad { + table "${tableName}" + set 'column_separator', ',' + + file 'all_types.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(2500, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + + sql "sync" + qt_pipeline_load_enabled """ SELECT COUNT(*) FROM ${tableName} """ + + } finally { + sql """ DROP TABLE IF EXISTS ${tableName} FORCE""" + } + + // Stream load with enable_pipeline_load = true and fail + tableName = "pipeline_input_too_long" + try { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `c1` varchar(48) NULL, + `c2` varchar(48) NULL, + `c3` varchar(48) NULL, + `c4` varchar(48) NULL, + `c5` varchar(48) NULL, + `c6` varchar(48) NULL, + `c7` varchar(48) NULL, + `c8` varchar(48) NULL, + `c9` varchar(48) NULL, + `c10` varchar(48) NULL, + `c11` varchar(48) NULL, + `c12` varchar(48) NULL, + `c13` varchar(48) NULL, + `c14` varchar(48) NULL, + `c15` varchar(48) NULL, + `c16` varchar(48) NULL, + `c17` varchar(48) NULL, + `c18` varchar(48) NULL, + `c19` varchar(48) NULL, + `c20` varchar(48) NULL, + `c21` varchar(48) NULL, + `c22` varchar(48) NULL, + `c23` varchar(48) NULL, + `c24` varchar(48) NULL, + `c25` varchar(48) NULL, + `c26` varchar(48) NULL, + `c27` varchar(48) NULL, + `c28` varchar(48) NULL, + `c29` varchar(48) NULL, + `c30` varchar(48) NULL, + `c31` varchar(48) NULL, + `c32` varchar(48) NULL, + `c33` varchar(48) NULL, + `c34` varchar(48) NULL, + `c35` varchar(48) NULL, + `c36` varchar(48) NULL, + `c37` varchar(48) NULL, + `c38` varchar(48) NULL, + `c39` varchar(48) NULL, + `c40` varchar(48) NULL, + `c41` varchar(48) NULL, + `c42` varchar(48) NULL, + `c43` varchar(48) NULL, + `c44` varchar(48) NULL, + `c45` varchar(48) NULL, + `c46` varchar(48) NULL, + `c47` varchar(48) NULL, + `c48` varchar(48) NULL, + `c49` varchar(48) NULL, + `c50` varchar(48) NULL, + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`c1`) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "false" + ); + """ + // Trigger error which is different from non-pipeline load + streamLoad { + table "${tableName}" + set 'column_separator', ',' + file 'test_input_long_than_schema.csv' + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("[END_OF_FILE]Encountered unqualified data")) + assertEquals(0, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + qt_pipeline_load_enabled_exception """SELECT COUNT(*) FROM ${tableName}""" + + } finally { + sql """ DROP TABLE IF EXISTS ${tableName} FORCE""" + } + // restore enable_pipeline_load to old_value + sql """ ADMIN SET FRONTEND CONFIG ("enable_pipeline_load" = "${old_value}"); """ + +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org