This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4942677688e3bf722bf9e78f0d241e79addc126b Author: hui lai <1353307...@qq.com> AuthorDate: Tue Jul 30 14:45:24 2024 +0800 [fix](routine load) fix enclose and escape can not set in routine load job (#38402) --- .../doris/analysis/AlterRoutineLoadStmt.java | 8 + .../doris/analysis/CreateRoutineLoadStmt.java | 32 ++-- .../doris/load/routineload/RoutineLoadJob.java | 10 +- .../routine_load/test_routine_load_property.out | 10 ++ .../routine_load/data/test_enclose_and_escape0.csv | 1 + .../routine_load/data/test_enclose_and_escape1.csv | 1 + .../routine_load/test_routine_load_property.groovy | 187 +++++++++++++++++++++ 7 files changed, 235 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java index 67c103ed02c..d2a0844dfb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -68,6 +68,8 @@ public class AlterRoutineLoadStmt extends DdlStmt { .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) .add(CreateRoutineLoadStmt.WORKLOAD_GROUP) + .add(LoadStmt.KEY_ENCLOSE) + .add(LoadStmt.KEY_ESCAPE) .build(); private final LabelName labelName; @@ -250,6 +252,12 @@ public class AlterRoutineLoadStmt extends DdlStmt { .getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), workloadGroup); analyzedJobProperties.put(CreateRoutineLoadStmt.WORKLOAD_GROUP, String.valueOf(wgId)); } + if (jobProperties.containsKey(LoadStmt.KEY_ENCLOSE)) { + analyzedJobProperties.put(LoadStmt.KEY_ENCLOSE, jobProperties.get(LoadStmt.KEY_ENCLOSE)); + } + if (jobProperties.containsKey(LoadStmt.KEY_ESCAPE)) { + analyzedJobProperties.put(LoadStmt.KEY_ESCAPE, jobProperties.get(LoadStmt.KEY_ESCAPE)); + } } private void checkDataSourceProperties() throws UserException { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 95434a1fd19..13654509821 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -141,6 +141,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(LOAD_TO_SINGLE_TABLET) .add(PARTIAL_COLUMNS) .add(WORKLOAD_GROUP) + .add(LoadStmt.KEY_ENCLOSE) + .add(LoadStmt.KEY_ESCAPE) .build(); private final LabelName labelName; @@ -178,9 +180,9 @@ public class CreateRoutineLoadStmt extends DdlStmt { private boolean numAsString = false; private boolean fuzzyParse = false; - private String enclose; + private byte enclose; - private String escape; + private byte escape; private long workloadGroupId = -1; @@ -311,11 +313,11 @@ public class CreateRoutineLoadStmt extends DdlStmt { return jsonPaths; } - public String getEnclose() { + public byte getEnclose() { return enclose; } - public String getEscape() { + public byte getEscape() { return escape; } @@ -507,14 +509,24 @@ public class CreateRoutineLoadStmt extends DdlStmt { loadToSingleTablet = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET), RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET, LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean"); - enclose = jobProperties.get(LoadStmt.KEY_ENCLOSE); - if (enclose != null && enclose.length() != 1) { - throw new AnalysisException("enclose must be single-char"); + + String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE); + if (encloseStr != null) { + if (encloseStr.length() != 1) { + throw new AnalysisException("enclose must be single-char"); + } else { + enclose = encloseStr.getBytes()[0]; + } } - escape = jobProperties.get(LoadStmt.KEY_ESCAPE); - if (escape != null && escape.length() != 1) { - throw new AnalysisException("escape must be single-char"); + 
String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE); + if (escapeStr != null) { + if (escapeStr.length() != 1) { + throw new AnalysisException("escape must be single-char"); + } else { + escape = escapeStr.getBytes()[0]; + } } + String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP); if (!StringUtils.isEmpty(inputWorkloadGroupStr)) { this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr() diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index cc503df5880..a25cd999858 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -424,11 +424,13 @@ public abstract class RoutineLoadJob } else { jobProperties.put(PROPS_FUZZY_PARSE, "false"); } - if (stmt.getEnclose() != null) { - jobProperties.put(LoadStmt.KEY_ENCLOSE, stmt.getEnclose()); + if (String.valueOf(stmt.getEnclose()) != null) { + this.enclose = stmt.getEnclose(); + jobProperties.put(LoadStmt.KEY_ENCLOSE, String.valueOf(stmt.getEnclose())); } - if (stmt.getEscape() != null) { - jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape()); + if (String.valueOf(stmt.getEscape()) != null) { + this.escape = stmt.getEscape(); + jobProperties.put(LoadStmt.KEY_ESCAPE, String.valueOf(stmt.getEscape())); } if (stmt.getWorkloadGroupId() > 0) { jobProperties.put(WORKLOAD_GROUP, String.valueOf(stmt.getWorkloadGroupId())); diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_property.out b/regression-test/data/load_p0/routine_load/test_routine_load_property.out new file mode 100644 index 00000000000..62fdfe4fcf3 --- /dev/null +++ b/regression-test/data/load_p0/routine_load/test_routine_load_property.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql_enclose_and_escape -- +1 ab,ced 2023-07-15 de 2023-07-20T05:48:31 "ghi" + +-- !sql_enclose_and_escape_resume -- +1 ab,ced 2023-07-15 de 2023-07-20T05:48:31 "ghi" + +-- !sql_enclose_and_escape_multi_table -- +1 ab,ced 2023-07-15 de 2023-07-20T05:48:31 "ghi" + diff --git a/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv new file mode 100644 index 00000000000..0d6530d5686 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv @@ -0,0 +1 @@ +1,eab,cfede,2023-07-15,def,2023-07-20:05:48:31,"ghi" \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv new file mode 100644 index 00000000000..8cb2bed0210 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv @@ -0,0 +1 @@ +1,eab,gfegdg,2023-07-15,def,2023-07-20:05:48:31,"ghi" \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy new file mode 100644 index 00000000000..9cc1fa0d2d9 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + +suite("test_routine_load_property","p0") { + // send data to Kafka + def kafkaCsvTpoics = [ + "test_enclose_and_escape0", + "test_enclose_and_escape1", + ] + String enabled = context.config.otherConfigs.get("enableKafkaTest") + String kafka_port = context.config.otherConfigs.get("kafka_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def kafka_broker = "${externalEnvIp}:${kafka_port}" + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // define kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // Create kafka producer + def producer = new KafkaProducer<>(props) + + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + } + + // test create routine load job with enclose and escape + def tableName = 
"test_routine_load_with_enclose_and_escape" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + def jobName = "test_enclose_and_escape" + try { + sql """ + CREATE ROUTINE LOAD ${jobName} on ${tableName} + COLUMNS TERMINATED BY "," + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "enclose" = "e", + "escape" = "f", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${kafkaCsvTpoics[0]}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + def count = 0 + while (true) { + def res = sql "select count(*) from ${tableName}" + def state = sql "show routine load for ${jobName}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] > 0) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(1000) + count++ + } + qt_sql_enclose_and_escape "select * from ${tableName} order by k1" + + sql "pause routine load for ${jobName}" + def res = sql "show routine load for ${jobName}" + log.info("routine load job properties: ${res[0][11].toString()}".toString()) + sql "ALTER ROUTINE LOAD FOR ${jobName} PROPERTIES(\"enclose\" = \"g\");" + sql "resume routine load for ${jobName}" + count = 0 + while (true) { + res = sql "select count(*) from ${tableName}" + def state = sql "show 
routine load for ${jobName}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] > 0) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(1000) + count++ + } + qt_sql_enclose_and_escape_resume "select * from ${tableName} order by k1" + } finally { + sql "stop routine load for ${jobName}" + } + + try { + sql """ + CREATE ROUTINE LOAD ${jobName} + COLUMNS TERMINATED BY "," + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "enclose" = "e", + "escape" = "f" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${kafkaCsvTpoics[0]}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + def count = 0 + while (true) { + def res = sql "select count(*) from ${tableName}" + def state = sql "show routine load for ${jobName}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] > 0) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(1000) + count++ + } + + qt_sql_enclose_and_escape_multi_table "select * from ${tableName} order by k1" + } finally { + sql "stop routine load for ${jobName}" + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org