This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fd1e2466083 [improve](routine load) introduce routine load task min timeout (#46292)
fd1e2466083 is described below

commit fd1e246608384edde8cc757b2752d65d6aeca555
Author: hui lai <lai...@selectdb.com>
AuthorDate: Mon Jan 6 11:40:12 2025 +0800

    [improve](routine load) introduce routine load task min timeout (#46292)
    
    In some scenarios it is possible to set the max_batch_interval of a
    routine load to a very small value, such as 1, which results in a very
    short task timeout. Under high pressure, or when loading into a
    merge-on-write (MOW) table, this can cause tasks to time out repeatedly
    and never make consumption progress. Therefore, a minimum task timeout
    is introduced to solve this problem.
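
    As a minimal sketch of the resulting behavior (the TimeoutSketch class
    and its constants are hypothetical illustrations mirroring the Config
    fields this commit touches, not repository code):

        // Sketch: how the effective task timeout is derived after this change.
        class TimeoutSketch {
            static final int MULTIPLIER = 10;      // routine_load_task_timeout_multiplier
            static final int MIN_TIMEOUT_SEC = 60; // routine_load_task_min_timeout_sec

            static int effectiveTimeoutSec(int maxBatchIntervalS) {
                int timeoutSec = maxBatchIntervalS * MULTIPLIER;
                // Clamp to the floor so a tiny batch interval no longer
                // produces a tiny timeout.
                return Math.max(timeoutSec, MIN_TIMEOUT_SEC);
            }
        }

    For example, max_batch_interval = 1 previously yielded a 10-second
    timeout; with the floor it becomes 60 seconds, while max_batch_interval
    = 10 still yields 100 seconds, matching the assertions in the regression
    test below.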
---
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../load/routineload/KafkaRoutineLoadJob.java      |   3 +-
 .../doris/load/routineload/RoutineLoadJob.java     |   5 +-
 .../data/test_routine_load_timeout_value.csv       |   1 +
 .../test_routine_load_timeout_value.groovy         | 189 +++++++++++++++++++++
 5 files changed, 201 insertions(+), 3 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 28a11632dc5..a8b0bd15e84 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1260,6 +1260,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int routine_load_task_timeout_multiplier = 10;
 
+    /**
+     * The minimum timeout of a routine load task, in seconds.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int routine_load_task_min_timeout_sec = 60;
+
     /**
      * the max timeout of get kafka meta.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index d0843eb9204..8cb0898eda8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -234,8 +234,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                                ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition));
                     }
                     KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id,
-                            maxBatchIntervalS * Config.routine_load_task_timeout_multiplier * 1000,
-                            taskKafkaProgress, isMultiTable(), -1, false);
+                            getTimeout() * 1000, taskKafkaProgress, isMultiTable(), -1, false);
                     routineLoadTaskInfoList.add(kafkaTaskInfo);
                     result.add(kafkaTaskInfo);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 1ee4fbee123..048c999b30e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -620,7 +620,10 @@ public abstract class RoutineLoadJob
 
     @Override
     public int getTimeout() {
-        return (int) getMaxBatchIntervalS() * Config.routine_load_task_timeout_multiplier;
+        int timeoutSec = (int) getMaxBatchIntervalS() * Config.routine_load_task_timeout_multiplier;
+        int realTimeoutSec = timeoutSec < Config.routine_load_task_min_timeout_sec
+                    ? Config.routine_load_task_min_timeout_sec : timeoutSec;
+        return realTimeoutSec;
     }
 
     @Override
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_timeout_value.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_timeout_value.csv
new file mode 100644
index 00000000000..7469de21d82
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_timeout_value.csv
@@ -0,0 +1 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
new file mode 100644
index 00000000000..de813084c09
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_timeout_value","nonConcurrent") {
+    def kafkaCsvTopics = [
+                  "test_routine_load_timeout_value",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    // send data to kafka 
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // create table
+        def jobName = "test_routine_load_timeout_value"
+        def tableName = "test_routine_load_timeout_value"
+        try {
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                k00 INT             NOT NULL,
+                k01 DATE            NOT NULL,
+                k02 BOOLEAN         NULL,
+                k03 TINYINT         NULL,
+                k04 SMALLINT        NULL,
+                k05 INT             NULL,
+                k06 BIGINT          NULL,
+                k07 LARGEINT        NULL,
+                k08 FLOAT           NULL,
+                k09 DOUBLE          NULL,
+                k10 DECIMAL(9,1)    NULL,
+                k11 DECIMALV3(9,1)  NULL,
+                k12 DATETIME        NULL,
+                k13 DATEV2          NULL,
+                k14 DATETIMEV2      NULL,
+                k15 CHAR            NULL,
+                k16 VARCHAR         NULL,
+                k17 STRING          NULL,
+                k18 JSON            NULL,
+                kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
+                kd02 TINYINT         NOT NULL DEFAULT "1",
+                kd03 SMALLINT        NOT NULL DEFAULT "2",
+                kd04 INT             NOT NULL DEFAULT "3",
+                kd05 BIGINT          NOT NULL DEFAULT "4",
+                kd06 LARGEINT        NOT NULL DEFAULT "5",
+                kd07 FLOAT           NOT NULL DEFAULT "6.0",
+                kd08 DOUBLE          NOT NULL DEFAULT "7.0",
+                kd09 DECIMAL         NOT NULL DEFAULT "888888888",
+                kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
+                kd11 DATE            NOT NULL DEFAULT "2023-08-24",
+                kd12 DATETIME        NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
+                kd14 DATETIMEV2      NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd18 JSON            NULL,
+                
+                INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+                
+            )
+            DUPLICATE KEY(k00)
+            PARTITION BY RANGE(k01)
+            (
+                PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+            )
+            DISTRIBUTED BY HASH(k00) BUCKETS 32
+            PROPERTIES (
+                "bloom_filter_columns"="k05",
+                "replication_num" = "1"
+            );
+            """
+            sql "sync"
+
+            // create job
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            // test timeout less than 60s
+            def count = 0
+            while (true) {
+                def res = sql "SHOW ROUTINE LOAD TASK WHERE JobName = '${jobName}'"
+                if (res.size() > 0) {
+                    log.info("res: ${res[0].toString()}".toString())
+                    log.info("timeout: ${res[0][6].toString()}".toString())
+                    assertTrue(res[0][6].toString().contains("60"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                } else {
+                    sleep(1000)
+                    continue;
+                }
+            }
+            sql "pause routine load for ${jobName}"
+            sql "ALTER ROUTINE LOAD FOR ${jobName} PROPERTIES ( 
\"max_batch_interval\" = \"10\") FROM KAFKA(\"kafka_partitions\" = \"0\", 
\"kafka_offsets\" = \"0\");"
+            sql "resume routine load for ${jobName}"
+            // test timeout greater than 60s
+            while (true) {
+                def res = sql "SHOW ROUTINE LOAD TASK WHERE JobName = '${jobName}'"
+                if (res.size() > 0) {
+                    log.info("res: ${res[0].toString()}".toString())
+                    log.info("timeout: ${res[0][6].toString()}".toString())
+                    assertTrue(res[0][6].toString().contains("100"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                } else {
+                    sleep(1000)
+                    continue;
+                }
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}
\ No newline at end of file

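The new routine_load_task_min_timeout_sec knob is declared mutable and
masterOnly, so (assuming the usual mechanism for mutable FE configs) it
should be adjustable at runtime on the master FE, for example with
ADMIN SET FRONTEND CONFIG ("routine_load_task_min_timeout_sec" = "120");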

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
