This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bfb9ba871b2 [test](routine load) add routine load doc case (#43230)
bfb9ba871b2 is described below

commit bfb9ba871b2e710b449717e4b431ff25977951e2
Author: hui lai <1353307...@qq.com>
AuthorDate: Wed Nov 6 23:43:02 2024 +0800

    [test](routine load) add routine load doc case (#43230)
    
    Add routine load doc case.
---
 .../import/import-way/routine-load-manual.md.out   |  123 +++
 .../import/import-way/test_rl_array.json           |    3 +
 .../import/import-way/test_rl_bitmap.json          |    3 +
 .../import/import-way/test_rl_column_mapping.csv   |    3 +
 .../data-operate/import/import-way/test_rl_csv.csv |   10 +
 .../import/import-way/test_rl_delete.csv           |    2 +
 .../data-operate/import/import-way/test_rl_hll.csv |    8 +
 .../import/import-way/test_rl_json.json            |    3 +
 .../import/import-way/test_rl_json_path.json       |    3 +
 .../import/import-way/test_rl_json_root.json       |    3 +
 .../import/import-way/test_rl_map.json             |    3 +
 .../import/import-way/test_rl_max_filter_ratio.csv |    3 +
 .../import/import-way/test_rl_partition.csv        |    3 +
 .../import-way/routine-load-manual.md.groovy       | 1019 ++++++++++++++++++++
 14 files changed, 1189 insertions(+)

diff --git a/regression-test/data/doc/data-operate/import/import-way/routine-load-manual.md.out b/regression-test/data/doc/data-operate/import/import-way/routine-load-manual.md.out
new file mode 100644
index 00000000000..4c0c1a9e6b7
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/routine-load-manual.md.out
@@ -0,0 +1,123 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql1 --
+1      Emily   25
+2      Benjamin        35
+3      Olivia  28
+4      Alexander       60
+5      Ava     17
+6      William 69
+7      Sophia  32
+8      James   64
+9      Emma    37
+10     Liam    64
+
+-- !sql4 --
+1      Benjamin        18
+2      Emily   20
+
+-- !sql5 --
+4      Alexander       60
+5      Ava     17
+6      William 69
+7      Sophia  32
+8      James   64
+9      Emma    37
+10     Liam    64
+
+-- !sql6 --
+1      Emily   25
+2      Benjamin        35
+3      Olivia  28
+4      Alexander       60
+5      Ava     17
+6      William 69
+7      Sophia  32
+8      James   64
+9      Emma    37
+10     Liam    64
+
+-- !sql7 --
+3      Olivia  28
+4      Alexander       60
+5      Ava     17
+6      William 69
+7      Sophia  32
+8      James   64
+9      Emma    37
+10     Liam    64
+
+-- !sql8 --
+1      Benjamin        18      2024-02-04T10:00
+
+-- !sql9 --
+1      Benjamin        18      2024-02-04T10:00
+2      Emily   20      2024-02-05T11:00
+
+-- !sql10 --
+1      Emily   25
+2      Benjamin        35
+3      Olivia  28
+4      Alexander       60
+5      Ava     17
+6      William 69
+7      Sophia  32
+8      James   64
+9      Emma    37
+10     Liam    64
+
+-- !sql11 --
+1      Emily   25
+3      Olivia  28
+4      Alexander       60
+5      Ava     17
+6      William 69
+7      Sophia  32
+8      James   64
+9      Emma    37
+10     Liam    64
+
+-- !sql12 --
+1      Benjamin        18      180
+2      Emily   20      200
+3      Alexander       22      220
+
+-- !sql13 --
+1      Benjamin        18
+2      Emily   20
+3      Alexander       22
+
+-- !sql14 --
+1      Benjamin        18      180
+2      Emily   20      200
+3      Alexander       22      220
+
+-- !sql15 --
+1      Benjamin        18
+2      Emily   20
+3      Alexander       22
+
+-- !sql16 --
+1      Benjamin        18      [1, 2, 3, 4, 5]
+2      Emily   20      [6, 7, 8, 9, 10]
+3      Alexander       22      [11, 12, 13, 14, 15]
+
+-- !sql17 --
+1      Benjamin        18      {"a":100, "b":200}
+2      Emily   20      {"c":300, "d":400}
+3      Alexander       22      {"e":500, "f":600}
+
+-- !sql18 --
+1      Benjamin        18      \N      \N
+2      Emily   20      \N      \N
+3      Alexander       22      \N      \N
+
+-- !sql19 --
+2022-05-05     10001   Test01  Beijing windows \N
+2022-05-06     10001   Test01  Shanghai        windows \N
+2022-05-05     10002   Test01  Beijing linux   \N
+2022-05-06     10002   Test01  Shanghai        linux   \N
+2022-05-05     10003   Test01  Beijing macos   \N
+2022-05-06     10003   Test01  Jiangsu macos   \N
+2022-05-05     10004   Test01  Hebei   windows \N
+2022-05-06     10004   Test01  Shaanxi windows \N
+
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_array.json b/regression-test/data/doc/data-operate/import/import-way/test_rl_array.json
new file mode 100644
index 00000000000..8260cad6b77
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_array.json
@@ -0,0 +1,3 @@
+{ "id" : 1, "name" : "Benjamin", "age":18, "array":[1,2,3,4,5]}
+{ "id" : 2, "name" : "Emily", "age":20, "array":[6,7,8,9,10]}
+{ "id" : 3, "name" : "Alexander", "age":22, "array":[11,12,13,14,15]}
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_bitmap.json b/regression-test/data/doc/data-operate/import/import-way/test_rl_bitmap.json
new file mode 100644
index 00000000000..73bb59becb4
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_bitmap.json
@@ -0,0 +1,3 @@
+{ "id" : 1, "name" : "Benjamin", "age":18, "bitmap_id":243}
+{ "id" : 2, "name" : "Emily", "age":20, "bitmap_id":28574}
+{ "id" : 3, "name" : "Alexander", "age":22, "bitmap_id":8573}
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_column_mapping.csv b/regression-test/data/doc/data-operate/import/import-way/test_rl_column_mapping.csv
new file mode 100644
index 00000000000..7eb3f2c7998
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_column_mapping.csv
@@ -0,0 +1,3 @@
+1,Benjamin,18
+2,Emily,20
+3,Alexander,22
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_csv.csv b/regression-test/data/doc/data-operate/import/import-way/test_rl_csv.csv
new file mode 100644
index 00000000000..9e401297ab2
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_csv.csv
@@ -0,0 +1,10 @@
+1,Emily,25
+2,Benjamin,35
+3,Olivia,28
+4,Alexander,60
+5,Ava,17
+6,William,69
+7,Sophia,32
+8,James,64
+9,Emma,37
+10,Liam,64
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_delete.csv b/regression-test/data/doc/data-operate/import/import-way/test_rl_delete.csv
new file mode 100644
index 00000000000..11d1c8468f2
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_delete.csv
@@ -0,0 +1,2 @@
+3,Alexander,22
+5,William,26
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_hll.csv b/regression-test/data/doc/data-operate/import/import-way/test_rl_hll.csv
new file mode 100644
index 00000000000..a7d55471221
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_hll.csv
@@ -0,0 +1,8 @@
+2022-05-05,10001,Test01,Beijing,windows
+2022-05-05,10002,Test01,Beijing,linux
+2022-05-05,10003,Test01,Beijing,macos
+2022-05-05,10004,Test01,Hebei,windows
+2022-05-06,10001,Test01,Shanghai,windows
+2022-05-06,10002,Test01,Shanghai,linux
+2022-05-06,10003,Test01,Jiangsu,macos
+2022-05-06,10004,Test01,Shaanxi,windows
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_json.json b/regression-test/data/doc/data-operate/import/import-way/test_rl_json.json
new file mode 100644
index 00000000000..6e06fe67521
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_json.json
@@ -0,0 +1,3 @@
+{ "id" : 1, "name" : "Benjamin", "age":18 }
+{ "id" : 2, "name" : "Emily", "age":20 }
+{ "id" : 3, "name" : "Alexander", "age":22 }
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_json_path.json b/regression-test/data/doc/data-operate/import/import-way/test_rl_json_path.json
new file mode 100644
index 00000000000..f96e2cd232b
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_json_path.json
@@ -0,0 +1,3 @@
+{ "name" : "Benjamin", "id" : 1, "num":180 , "age":18 }
+{ "name" : "Emily", "id" : 2, "num":200 , "age":20 }
+{ "name" : "Alexander", "id" : 3, "num":220 , "age":22 }
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_json_root.json b/regression-test/data/doc/data-operate/import/import-way/test_rl_json_root.json
new file mode 100644
index 00000000000..60df381670c
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_json_root.json
@@ -0,0 +1,3 @@
+{"id": 1231, "source" :{ "id" : 1, "name" : "Benjamin", "age":18 }}
+{"id": 1232, "source" :{ "id" : 2, "name" : "Emily", "age":20 }}
+{"id": 1233, "source" :{ "id" : 3, "name" : "Alexander", "age":22 }}
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_map.json b/regression-test/data/doc/data-operate/import/import-way/test_rl_map.json
new file mode 100644
index 00000000000..d8768982495
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_map.json
@@ -0,0 +1,3 @@
+{ "id" : 1, "name" : "Benjamin", "age":18, "map":{"a": 100, "b": 200}}
+{ "id" : 2, "name" : "Emily", "age":20, "map":{"c": 300, "d": 400}}
+{ "id" : 3, "name" : "Alexander", "age":22, "map":{"e": 500, "f": 600}}
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_max_filter_ratio.csv b/regression-test/data/doc/data-operate/import/import-way/test_rl_max_filter_ratio.csv
new file mode 100644
index 00000000000..1e07d97228d
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_max_filter_ratio.csv
@@ -0,0 +1,3 @@
+1,Benjamin,18
+2,Emily,20
+3,Alexander,dirty_data
\ No newline at end of file
diff --git a/regression-test/data/doc/data-operate/import/import-way/test_rl_partition.csv b/regression-test/data/doc/data-operate/import/import-way/test_rl_partition.csv
new file mode 100644
index 00000000000..f41efd837e1
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_partition.csv
@@ -0,0 +1,3 @@
+1,Benjamin,18,2024-02-04 10:00:00
+2,Emily,20,2024-02-05 11:00:00
+3,Alexander,22,2024-02-06 12:00:00
\ No newline at end of file
diff --git a/regression-test/suites/doc/data-operate/import/import-way/routine-load-manual.md.groovy b/regression-test/suites/doc/data-operate/import/import-way/routine-load-manual.md.groovy
new file mode 100644
index 00000000000..b4327a4cb83
--- /dev/null
+++ b/regression-test/suites/doc/data-operate/import/import-way/routine-load-manual.md.groovy
@@ -0,0 +1,1019 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_doc_case","p0") {
+    def kafkaCsvTopics = [
+                  "test_rl_csv",
+                  "test_rl_max_filter_ratio",
+                  "test_rl_partition",
+                  "test_rl_delete",
+                  "test_rl_column_mapping",
+                  "test_rl_hll"
+                ]
+
+    def kafkaJsonTopics = [
+                  "test_rl_json",
+                  "test_rl_json_path",
+                  "test_rl_json_root",
+                  "test_rl_array",
+                  "test_rl_map",
+                  "test_rl_bitmap"
+                ]
+
+    def jsonpaths = [
+                '[\"$.id\",\"$.name\",\"$.age\"]',
+                '[\"$.name\",\"$.id\",\"$.num\",\"$.age\"]',
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
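+    // The Kafka-backed cases below run only when enableKafkaTest=true in the regression config.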
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define Kafka producer properties
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // create Kafka producer
+        def producer = new KafkaProducer<>(props)
+
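+        // Publish each line of the local CSV and JSON fixture files as a separate Kafka record.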
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.config.dataPath}/doc/data-operate/import/import-way/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+        for (String kafkaJsonTopic in kafkaJsonTopics) {
+            def kafkaJson = new File("""${context.config.dataPath}/doc/data-operate/import/import-way/${kafkaJsonTopic}.json""").text
+            def lines = kafkaJson.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaJsonTopic, null, line)
+                producer.send(record)
+            }
+        }
+
+        // case1: load csv
+        def tableName = "test_routine_load_doc_case"
+        def jobName = "test_routine_load_doc_case_job"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                user_id            BIGINT       NOT NULL COMMENT "用户 ID",
+                name               VARCHAR(20)           COMMENT "用户姓名",
+                age                INT                   COMMENT "用户年龄"
+            )
+            UNIQUE KEY(user_id)
+            DISTRIBUTED BY HASH(user_id) BUCKETS 10
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS(user_id, name, age)
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
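+            // Poll until loaded rows are visible; indexes 8/14/17 are the positional State,
+            // Statistic and ReasonOfStateChanged columns of SHOW ROUTINE LOAD output (positions may shift across versions).
+            // On timeout, assertEquals(20, ...) fails deliberately because no rows ever arrived.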
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql1 "select * from ${tableName} order by user_id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case2: load json
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS(user_id,name,age)
+                PROPERTIES(
+                    "format"="json",
+                    "jsonpaths"='${jsonpaths[0]}'
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaJsonTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case3: alter routine load
+        sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS(user_id, name, age)
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+        """
+        sql "pause routine load for ${jobName}"
+        sql """
+        ALTER ROUTINE LOAD FOR ${jobName}
+        PROPERTIES(
+            "desired_concurrent_number" = "3"
+        )
+        FROM KAFKA(
+            "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+            "kafka_topic" = "test-topic"
+        );
+        """
+        sql "stop routine load for ${jobName}"
+
+        //case4: max_filter_ratio
+        def tableName1 = "test_routine_load_doc_case1"
+        sql """ DROP TABLE IF EXISTS ${tableName1} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName1} (
+                id       INT             NOT NULL   COMMENT "User ID",
+                name     VARCHAR(30)     NOT NULL   COMMENT "Name",
+                age      INT                        COMMENT "Age"
+            )
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName1}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_filter_ratio"="0.5",
+                    "max_error_number" = "100",
+                    "strict_mode" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[1]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName1}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                log.info("url: ${state[0][18].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql4 "select * from ${tableName1} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName1} """
+        }
+
+        //case5: kafka_offsets
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "kafka_partitions" = "0",
+                    "kafka_offsets" = "3"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql5 "select * from ${tableName} order by user_id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case6: group.id and client.id
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.group.id" = "kafka_job03",
+                    "property.client.id" = "kafka_client_03",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql6 "select * from ${tableName} order by user_id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case7: filter
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                WHERE user_id >= 3
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql7 "select * from ${tableName} order by user_id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        // case8: Loading specified partition data
+        def tableName2 = "test_routine_load_doc_case2"
+        sql """ DROP TABLE IF EXISTS ${tableName2} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName2} (
+                id       INT             NOT NULL   COMMENT "User ID",
+                name     VARCHAR(30)     NOT NULL   COMMENT "Name",
+                age      INT                        COMMENT "Age",
+                date    DATETIME                 COMMENT "Date"
+            )
+            DUPLICATE KEY(`id`)
+            PARTITION BY RANGE(`id`)
+            (PARTITION partition_a VALUES [("0"), ("1")),
+            PARTITION partition_b VALUES [("1"), ("2")),
+            PARTITION partition_c VALUES [("2"), ("3")))
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName2}
+                COLUMNS TERMINATED BY ",",
+                PARTITION(partition_b)
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[2]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName2}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql8 "select * from ${tableName2} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName2} """
+        }
+
+        // case9: timezone
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName2}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "timezone" = "Asia/Shanghai"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[2]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName2}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql9 "select * from ${tableName2} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName2} """
+        }
+
+        // case10: merge delete
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS(user_id, name, age)
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            sql "stop routine load for ${jobName}"
+            sql "sync"
+
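+            // Reload keys 3 and 5 WITH DELETE: the incoming rows are applied as deletes on the UNIQUE KEY table.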
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                WITH DELETE
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[3]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql10 "select * from ${tableName} order by user_id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case11: delete on
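+        // WITH MERGE: rows matching the DELETE ON predicate are deleted; all other rows are upserted.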
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                WITH MERGE
+                COLUMNS TERMINATED BY ",",
+                DELETE ON user_id = 2
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql11 "select * from ${tableName} order by user_id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        // case12: Load with column mapping and derived column calculation
+        tableName = "test_routine_load_doc_case3"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                id      INT            NOT NULL  COMMENT "id",
+                name    VARCHAR(30)    NOT NULL  COMMENT "name",
+                age     INT                      COMMENT "age",
+                num     INT                      COMMENT "number"
+            )
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS(id, name, age, num=age*10)
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[4]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql12 "select * from ${tableName} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case13: test json
+        tableName = "routine_test12"
+        jobName = "kafka_job12"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                id      INT            NOT NULL  COMMENT "id",
+                name    VARCHAR(30)    NOT NULL  COMMENT "name",
+                age     INT                      COMMENT "age"
+            )
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                PROPERTIES
+                (
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaJsonTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql13 "select * from ${tableName} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case14: json path
+        tableName = "routine_test13"
+        jobName = "kafka_job13"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                id      INT            NOT NULL  COMMENT "id",
+                name    VARCHAR(30)    NOT NULL  COMMENT "name",
+                age     INT                      COMMENT "age",
+                num     INT                      COMMENT "num"
+            )
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
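+            // jsonpaths extracts the fields in path order; COLUMNS(name, id, num, age) maps them onto the table columns.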
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS(name, id, num, age)
+                PROPERTIES
+                (
+                    "format" = "json",
+                    "jsonpaths"='${jsonpaths[1]}'
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaJsonTopics[1]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql14 "select * from ${tableName} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case15: json root
+        tableName = "routine_test14"
+        jobName = "kafka_job14"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                id      INT            NOT NULL  COMMENT "id",
+                name    VARCHAR(30)    NOT NULL  COMMENT "name",
+                age     INT                      COMMENT "age"
+            )
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
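+            // json_root points extraction at the nested "source" object instead of the top-level document.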
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                PROPERTIES
+                (
+                    "format" = "json",
+                    "json_root" = "\$.source"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaJsonTopics[2]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql15 "select * from ${tableName} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        // case16: array
+        tableName = "routine_test16"
+        jobName = "kafka_job16"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                id      INT             NOT NULL  COMMENT "id",
+                name    VARCHAR(30)     NOT NULL  COMMENT "name",
+                age     INT                       COMMENT "age",
+                array   ARRAY<int(11)>  NULL      COMMENT "test array column"
+            )
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                PROPERTIES
+                (
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaJsonTopics[3]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql16 "select * from ${tableName} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        // case17: map
+        tableName = "routine_test17"
+        jobName = "kafka_job17"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                id      INT                 NOT NULL  COMMENT "id",
+                name    VARCHAR(30)         NOT NULL  COMMENT "name",
+                age     INT                           COMMENT "age",
+                map     Map<STRING, INT>    NULL      COMMENT "test column"
+            )
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                PROPERTIES
+                (
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaJsonTopics[4]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql17 "select * from ${tableName} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        // case18: bitmap
+        tableName = "routine_test18"
+        jobName = "kafka_job18"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                id        INT            NOT NULL      COMMENT "id",
+                name      VARCHAR(30)    NOT NULL      COMMENT "name",
+                age       INT                          COMMENT "age",
+                bitmap_id INT                          COMMENT "test",
+                device_id BITMAP         BITMAP_UNION  COMMENT "test column"
+            )
+            AGGREGATE KEY (`id`,`name`,`age`,`bitmap_id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
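+            // device_id is derived via to_bitmap(bitmap_id) and merged by the table's BITMAP_UNION aggregation.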
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS(id, name, age, bitmap_id, device_id=to_bitmap(bitmap_id))
+                PROPERTIES
+                (
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaJsonTopics[4]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql18 "select * from ${tableName} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        // case19: hll
+        tableName = "routine_test19"
+        jobName = "kafka_job19"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                dt        DATE,
+                id        INT,
+                name      VARCHAR(10),
+                province  VARCHAR(10),
+                os        VARCHAR(10),
+                pv        hll hll_union
+            )
+            Aggregate KEY (dt,id,name,province,os)
+            distributed by hash(id) buckets 10
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
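+            // pv=hll_hash(id) builds a HyperLogLog sketch per row; HLL_UNION merges them during aggregation.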
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS(dt, id, name, province, os, pv=hll_hash(id))
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[5]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                log.info("url: ${state[0][18].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql19 "select * from ${tableName} order by id"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ truncate table ${tableName} """
+        }
+
+        //case20: single-task loading to multiple tables
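+        // Omitting "ON <table>" creates a single-stream multi-table job; the target table is resolved from each Kafka message.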
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName}
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[5]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+
+        //case21: strict mode
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName}
+                PROPERTIES
+                (
+                    "strict_mode" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[5]}"
+                );
+            """
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
