This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 79b07d0b8a4 [fix](routine load) fix enclose and escape can not set in 
routine load job (#38402) (#38825)
79b07d0b8a4 is described below

commit 79b07d0b8a4e179c463cf450f0b991e7e194fd60
Author: hui lai <1353307...@qq.com>
AuthorDate: Sun Aug 4 22:17:12 2024 +0800

    [fix](routine load) fix enclose and escape can not set in routine load job 
(#38402) (#38825)
    
    pick (#38402)
---
 .../doris/analysis/AlterRoutineLoadStmt.java       |   8 +
 .../doris/analysis/CreateRoutineLoadStmt.java      |  32 ++--
 .../doris/load/routineload/RoutineLoadJob.java     |  10 +-
 .../routine_load/test_routine_load_property.out    |  10 ++
 .../routine_load/data/test_enclose_and_escape0.csv |   1 +
 .../routine_load/data/test_enclose_and_escape1.csv |   1 +
 .../routine_load/test_routine_load_property.groovy | 187 +++++++++++++++++++++
 7 files changed, 235 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 5a1f1ba56aa..526b2f179e8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -68,6 +68,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
             .add(CreateRoutineLoadStmt.WORKLOAD_GROUP)
+            .add(LoadStmt.KEY_ENCLOSE)
+            .add(LoadStmt.KEY_ESCAPE)
             .build();
 
     private final LabelName labelName;
@@ -250,6 +252,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
                     
.getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), workloadGroup);
             analyzedJobProperties.put(CreateRoutineLoadStmt.WORKLOAD_GROUP, 
String.valueOf(wgId));
         }
+        if (jobProperties.containsKey(LoadStmt.KEY_ENCLOSE)) {
+            analyzedJobProperties.put(LoadStmt.KEY_ENCLOSE, 
jobProperties.get(LoadStmt.KEY_ENCLOSE));
+        }
+        if (jobProperties.containsKey(LoadStmt.KEY_ESCAPE)) {
+            analyzedJobProperties.put(LoadStmt.KEY_ESCAPE, 
jobProperties.get(LoadStmt.KEY_ESCAPE));
+        }
     }
 
     private void checkDataSourceProperties() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index d58b25195c7..fb794bfa7b3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -141,6 +141,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
             .add(WORKLOAD_GROUP)
+            .add(LoadStmt.KEY_ENCLOSE)
+            .add(LoadStmt.KEY_ESCAPE)
             .build();
 
     private final LabelName labelName;
@@ -178,9 +180,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private boolean numAsString = false;
     private boolean fuzzyParse = false;
 
-    private String enclose;
+    private byte enclose;
 
-    private String escape;
+    private byte escape;
 
     private long workloadGroupId = -1;
 
@@ -311,11 +313,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         return jsonPaths;
     }
 
-    public String getEnclose() {
+    public byte getEnclose() {
         return enclose;
     }
 
-    public String getEscape() {
+    public byte getEscape() {
         return escape;
     }
 
@@ -507,14 +509,24 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         loadToSingleTablet = 
Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET),
                 RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
                 LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
-        enclose = jobProperties.get(LoadStmt.KEY_ENCLOSE);
-        if (enclose != null && enclose.length() != 1) {
-            throw new AnalysisException("enclose must be single-char");
+
+        String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
+        if (encloseStr != null) {
+            if (encloseStr.length() != 1) {
+                throw new AnalysisException("enclose must be single-char");
+            } else {
+                enclose = encloseStr.getBytes()[0];
+            }
         }
-        escape = jobProperties.get(LoadStmt.KEY_ESCAPE);
-        if (escape != null && escape.length() != 1) {
-            throw new AnalysisException("escape must be single-char");
+        String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
+        if (escapeStr != null) {
+            if (escapeStr.length() != 1) {
+                throw new AnalysisException("escape must be single-char");
+            } else {
+                escape = escapeStr.getBytes()[0];
+            }
         }
+
         String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
         if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
             this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 0a85bc63ac1..5ef531bb37f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -390,11 +390,13 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         } else {
             jobProperties.put(PROPS_FUZZY_PARSE, "false");
         }
-        if (stmt.getEnclose() != null) {
-            jobProperties.put(LoadStmt.KEY_ENCLOSE, stmt.getEnclose());
+        if (String.valueOf(stmt.getEnclose()) != null) {
+            this.enclose = stmt.getEnclose();
+            jobProperties.put(LoadStmt.KEY_ENCLOSE, 
String.valueOf(stmt.getEnclose()));
         }
-        if (stmt.getEscape() != null) {
-            jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape());
+        if (String.valueOf(stmt.getEscape()) != null) {
+            this.escape = stmt.getEscape();
+            jobProperties.put(LoadStmt.KEY_ESCAPE, 
String.valueOf(stmt.getEscape()));
         }
         if (stmt.getWorkloadGroupId() > 0) {
             jobProperties.put(WORKLOAD_GROUP, 
String.valueOf(stmt.getWorkloadGroupId()));
diff --git 
a/regression-test/data/load_p0/routine_load/test_routine_load_property.out 
b/regression-test/data/load_p0/routine_load/test_routine_load_property.out
new file mode 100644
index 00000000000..62fdfe4fcf3
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_property.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql_enclose_and_escape --
+1      ab,ced  2023-07-15      de      2023-07-20T05:48:31     "ghi"
+
+-- !sql_enclose_and_escape_resume --
+1      ab,ced  2023-07-15      de      2023-07-20T05:48:31     "ghi"
+
+-- !sql_enclose_and_escape_multi_table --
+1      ab,ced  2023-07-15      de      2023-07-20T05:48:31     "ghi"
+
diff --git 
a/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv 
b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv
new file mode 100644
index 00000000000..0d6530d5686
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape0.csv
@@ -0,0 +1 @@
+1,eab,cfede,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv 
b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv
new file mode 100644
index 00000000000..8cb2bed0210
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/data/test_enclose_and_escape1.csv
@@ -0,0 +1 @@
+1,eab,gfegdg,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
new file mode 100644
index 00000000000..9cc1fa0d2d9
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_property","p0") {
+    // send data to Kafka
+    def kafkaCsvTpoics = [
+                  "test_enclose_and_escape0",
+                  "test_enclose_and_escape1",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka 
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    // test create routine load job with enclose and escape
+    def tableName = "test_routine_load_with_enclose_and_escape"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date  NULL,
+            `v2` string  NULL,
+            `v3` datetime  NULL,
+            `v4` string  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_enclose_and_escape"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "enclose" = "e",
+                    "escape" = "f",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_enclose_and_escape "select * from ${tableName} order by k1"
+
+            sql "pause routine load for ${jobName}"
+            def res = sql "show routine load for ${jobName}"
+            log.info("routine load job properties: 
${res[0][11].toString()}".toString())
+            sql "ALTER ROUTINE LOAD FOR ${jobName} PROPERTIES(\"enclose\" = 
\"g\");"
+            sql "resume routine load for ${jobName}"
+            count = 0
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_enclose_and_escape_resume "select * from ${tableName} order 
by k1"
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200",
+                    "enclose" = "e",
+                    "escape" = "f"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+
+            qt_sql_enclose_and_escape_multi_table "select * from ${tableName} 
order by k1"
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to