This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8e4fad99a1b [test](routine load) add routine load case with timestamp as offset(#38567) (#38822)
8e4fad99a1b is described below

commit 8e4fad99a1bc981b8731ab6a754a39480ec06a9c
Author: hui lai <1353307...@qq.com>
AuthorDate: Sun Aug 4 22:05:19 2024 +0800

    [test](routine load) add routine load case with timestamp as offset(#38567) (#38822)
    
    pick (#38567)
---
 .../routine_load/test_routine_load_offset.out      |   4 +
 .../load_p0/routine_load/data/test_offset_time.csv |   1 +
 .../routine_load/test_routine_load_offset.groovy   | 113 +++++++++++++++++++++
 3 files changed, 118 insertions(+)

diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_offset.out b/regression-test/data/load_p0/routine_load/test_routine_load_offset.out
new file mode 100644
index 00000000000..4c7c19cfe11
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_offset.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_offset --
+1      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_offset_time.csv b/regression-test/suites/load_p0/routine_load/data/test_offset_time.csv
new file mode 100644
index 00000000000..c712640bcde
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_offset_time.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
new file mode 100644
index 00000000000..84d0509cea3
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+// Regression case for a routine load job whose Kafka start position is given
+// as a timestamp ("kafka_offsets" = "2021-05-22 11:00:00") instead of a
+// numeric offset.
+//
+// Steps:
+//   1. publish every line of data/<topic>.csv to the Kafka topic <topic>;
+//   2. recreate the target table and create the routine load job;
+//   3. poll until at least one row is visible (at most ~120 polls, 1s apart);
+//   4. check the table contents through the qt_sql_offset query;
+//   5. always stop the job afterwards.
+//
+// Both halves are skipped unless "enableKafkaTest" is "true" in the
+// regression configuration.
+suite("test_routine_load_offset","p0") {
+    def kafkaCsvTopics = [
+                  "test_offset_time",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // Kafka producer configuration: keys and values are sent as plain strings.
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        try {
+            for (String kafkaCsvTopic in kafkaCsvTopics) {
+                // The fixture file is named after the topic it feeds.
+                def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                def lines = txt.readLines()
+                lines.each { line ->
+                    logger.info("=====${line}========")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+            // send() only queues the record; flush so everything has been handed
+            // to the broker before the routine load job is created below.
+            producer.flush()
+        } finally {
+            // Release the producer's network resources and background thread.
+            producer.close()
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def tableName = "test_routine_load_offset"
+        def job = "test_offset"
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            // "kafka_offsets" is a datetime string here, which is the feature under test.
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "kafka_partitions" = "0",
+                    "kafka_offsets" = "2021-05-22 11:00:00"
+                );
+            """
+
+            // Poll once per second until the loaded row becomes visible.
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${job}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    // Nothing is visible at this point, so this assertion fails the
+                    // case; the expected value is the single row in the CSV fixture.
+                    assertEquals(1, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_offset "select * from ${tableName} order by k1"
+        } finally {
+            // Stop the job even when the checks above fail.
+            sql "stop routine load for ${job}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to