This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f76397277ec [fix](routine load) fix show routine load task result 
incorrect (#38523) (#38826)
f76397277ec is described below

commit f76397277ec44e5cb901c716644fad712500e2dd
Author: hui lai <1353307...@qq.com>
AuthorDate: Sun Aug 4 22:18:25 2024 +0800

    [fix](routine load) fix show routine load task result incorrect (#38523) 
(#38826)
    
    pick (#38523)
    
    Create a job:
    ```
    CREATE ROUTINE LOAD testShow ON test_show_routine_load
    COLUMNS TERMINATED BY ","
    PROPERTIES
    (
    "max_batch_interval" = "5",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "127.0.0.1:19092",
    "kafka_topic" = "test_show_routine_load",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
    ```
    show routine load task:
    ```
    SHOW ROUTINE LOAD TASK WHERE JobName = "testShow";
    ```
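    Result before this fix (the job name in the WHERE clause was lowercased to `testshow`, so the lookup failed):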
    ```
    ERROR 1105 (HY000): errCode = 2, detailMessage = The job named testshowdoes 
not exists or job state is stopped or cancelled
    ```
    
    Fix: do not call `toLowerCase` on the job name in `ShowRoutineLoadTaskStmt`, so the name is looked up exactly as the user wrote it.
---
 .../doris/analysis/ShowRoutineLoadTaskStmt.java    |   2 +-
 .../routine_load/data/test_show_routine_load.csv   |   1 +
 .../routine_load/test_show_routine_load.groovy     | 150 +++++++++++++++++++++
 3 files changed, 152 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java
index 7258822ab50..570b82d3667 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java
@@ -121,7 +121,7 @@ public class ShowRoutineLoadTaskStmt extends ShowStmt {
                 break CHECK;
             }
             StringLiteral stringLiteral = (StringLiteral) 
binaryPredicate.getChild(1);
-            jobName = stringLiteral.getValue().toLowerCase();
+            jobName = stringLiteral.getValue();
         } // CHECKSTYLE IGNORE THIS LINE
 
         if (!valid) {
diff --git 
a/regression-test/suites/load_p0/routine_load/data/test_show_routine_load.csv 
b/regression-test/suites/load_p0/routine_load/data/test_show_routine_load.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/data/test_show_routine_load.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy 
b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
new file mode 100644
index 00000000000..6075dc20dbe
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_show_routine_load","p0") {
+    def kafkaCsvTpoics = [
+                  "test_show_routine_load",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka 
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def tableName = "test_show_routine_load"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD testShow ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql """
+                CREATE ROUTINE LOAD testShow1 ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            String db = context.config.getDbNameByFile(context.file)
+            log.info("reason of state changed: ${db}".toString())
+
+            def res = sql "show routine load for ${db}.testShow"
+            log.info("reason of state changed: ${res.size()}".toString())
+            assertTrue(res.size() == 1)
+
+            res = sql "show routine load for testShow"
+            log.info("reason of state changed: ${res.size()}".toString())
+            assertTrue(res.size() == 1)
+
+            res = sql "show all routine load"
+            log.info("reason of state changed: ${res.size()}".toString())
+            assertTrue(res.size() > 1)
+
+            res = sql "SHOW ROUTINE LOAD LIKE \"%testShow%\""
+            log.info("reason of state changed: ${res.size()}".toString())
+            assertTrue(res.size() == 2)
+
+            def count = 0
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for testShow1"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            res = sql "SHOW ROUTINE LOAD TASK WHERE JobName = \"testShow1\";"
+            log.info("SHOW ROUTINE LOAD task result: ${res}".toString())
+            assertTrue(res.size() == 1)
+        } finally {
+            sql "stop routine load for testShow"
+            sql "stop routine load for testShow1"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to