This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new fd1e2466083 [improve](routine load) introduce routine load task min timeout (#46292)
fd1e2466083 is described below

commit fd1e246608384edde8cc757b2752d65d6aeca555
Author: hui lai <lai...@selectdb.com>
AuthorDate: Mon Jan 6 11:40:12 2025 +0800

    [improve](routine load) introduce routine load task min timeout (#46292)

    In some scenarios it is possible to set the max_batch_interval of a
    routine load job to a very small value, such as 1, which results in a
    very short task timeout. Under certain conditions (such as high load
    pressure, or loading into a MOW table), this can cause tasks to time
    out repeatedly without ever consuming data. Therefore, a configurable
    minimum task timeout is introduced to solve this problem.
---
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../load/routineload/KafkaRoutineLoadJob.java      |   3 +-
 .../doris/load/routineload/RoutineLoadJob.java     |   5 +-
 .../data/test_routine_load_timeout_value.csv       |   1 +
 .../test_routine_load_timeout_value.groovy         | 189 +++++++++++++++++++++
 5 files changed, 201 insertions(+), 3 deletions(-)
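For illustration, a minimal self-contained sketch of the clamp this change introduces (not a verbatim copy of the patch). The two constants mirror the defaults of the FE configs shown in the diff below, and taskTimeoutSec is a hypothetical stand-in for RoutineLoadJob.getTimeout():

    // Sketch only: the real code reads Config.routine_load_task_timeout_multiplier
    // and Config.routine_load_task_min_timeout_sec instead of these constants.
    public class TimeoutClampSketch {
        static final int TIMEOUT_MULTIPLIER = 10; // routine_load_task_timeout_multiplier
        static final int MIN_TIMEOUT_SEC = 60;    // routine_load_task_min_timeout_sec (new)

        // Task timeout = max_batch_interval * multiplier, floored at the minimum.
        static int taskTimeoutSec(long maxBatchIntervalS) {
            int timeoutSec = (int) maxBatchIntervalS * TIMEOUT_MULTIPLIER;
            return Math.max(timeoutSec, MIN_TIMEOUT_SEC); // same result as the patch's ternary
        }

        public static void main(String[] args) {
            System.out.println(taskTimeoutSec(1));  // raw 10s  -> clamped to 60s
            System.out.println(taskTimeoutSec(5));  // raw 50s  -> clamped to 60s
            System.out.println(taskTimeoutSec(10)); // raw 100s -> above the floor, unchanged
        }
    }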
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 28a11632dc5..a8b0bd15e84 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1260,6 +1260,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int routine_load_task_timeout_multiplier = 10;
 
+    /**
+     * routine load task min timeout second.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int routine_load_task_min_timeout_sec = 60;
+
     /**
      * the max timeout of get kafka meta.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index d0843eb9204..8cb0898eda8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -234,8 +234,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                         ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition));
             }
             KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id,
-                    maxBatchIntervalS * Config.routine_load_task_timeout_multiplier * 1000,
-                    taskKafkaProgress, isMultiTable(), -1, false);
+                    getTimeout() * 1000, taskKafkaProgress, isMultiTable(), -1, false);
             routineLoadTaskInfoList.add(kafkaTaskInfo);
             result.add(kafkaTaskInfo);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 1ee4fbee123..048c999b30e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -620,7 +620,10 @@ public abstract class RoutineLoadJob
 
     @Override
     public int getTimeout() {
-        return (int) getMaxBatchIntervalS() * Config.routine_load_task_timeout_multiplier;
+        int timeoutSec = (int) getMaxBatchIntervalS() * Config.routine_load_task_timeout_multiplier;
+        int realTimeoutSec = timeoutSec < Config.routine_load_task_min_timeout_sec
+                ? Config.routine_load_task_min_timeout_sec : timeoutSec;
+        return realTimeoutSec;
     }
 
     @Override
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_timeout_value.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_timeout_value.csv
new file mode 100644
index 00000000000..7469de21d82
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_timeout_value.csv
@@ -0,0 +1 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"}
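The regression test below exercises both sides of this floor: with "max_batch_interval" = "5" the raw timeout is 5 * 10 = 50s, which is clamped up to 60s (the first assertion), and after ALTER ROUTINE LOAD raises the interval to 10 the timeout becomes 10 * 10 = 100s, above the floor and left unchanged (the second assertion).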
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
new file mode 100644
index 00000000000..de813084c09
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_timeout_value","nonConcurrent") {
+    def kafkaCsvTpoics = [
+        "test_routine_load_timeout_value",
+    ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    // send data to kafka
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // create table
+        def jobName = "test_routine_load_timeout_value"
+        def tableName = "test_routine_load_timeout_value"
+        try {
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName}
+                (
+                    k00 INT             NOT NULL,
+                    k01 DATE            NOT NULL,
+                    k02 BOOLEAN         NULL,
+                    k03 TINYINT         NULL,
+                    k04 SMALLINT        NULL,
+                    k05 INT             NULL,
+                    k06 BIGINT          NULL,
+                    k07 LARGEINT        NULL,
+                    k08 FLOAT           NULL,
+                    k09 DOUBLE          NULL,
+                    k10 DECIMAL(9,1)    NULL,
+                    k11 DECIMALV3(9,1)  NULL,
+                    k12 DATETIME        NULL,
+                    k13 DATEV2          NULL,
+                    k14 DATETIMEV2      NULL,
+                    k15 CHAR            NULL,
+                    k16 VARCHAR         NULL,
+                    k17 STRING          NULL,
+                    k18 JSON            NULL,
+                    kd01 BOOLEAN        NOT NULL DEFAULT "TRUE",
+                    kd02 TINYINT        NOT NULL DEFAULT "1",
+                    kd03 SMALLINT       NOT NULL DEFAULT "2",
+                    kd04 INT            NOT NULL DEFAULT "3",
+                    kd05 BIGINT         NOT NULL DEFAULT "4",
+                    kd06 LARGEINT       NOT NULL DEFAULT "5",
+                    kd07 FLOAT          NOT NULL DEFAULT "6.0",
+                    kd08 DOUBLE         NOT NULL DEFAULT "7.0",
+                    kd09 DECIMAL        NOT NULL DEFAULT "888888888",
+                    kd10 DECIMALV3      NOT NULL DEFAULT "999999999",
+                    kd11 DATE           NOT NULL DEFAULT "2023-08-24",
+                    kd12 DATETIME       NOT NULL DEFAULT "2023-08-24 12:00:00",
+                    kd13 DATEV2         NOT NULL DEFAULT "2023-08-24",
+                    kd14 DATETIMEV2     NOT NULL DEFAULT "2023-08-24 12:00:00",
+                    kd15 CHAR(255)      NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd16 VARCHAR(300)   NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd17 STRING         NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd18 JSON           NULL,
+
+                    INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                    INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                    INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                    INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                    INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                    INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                    INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+                )
+                DUPLICATE KEY(k00)
+                PARTITION BY RANGE(k01)
+                (
+                    PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                    PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                    PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+                )
+                DISTRIBUTED BY HASH(k00) BUCKETS 32
+                PROPERTIES (
+                    "bloom_filter_columns"="k05",
+                    "replication_num" = "1"
+                );
+            """
+            sql "sync"
+
+            // create job
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            // test timeout less than 60s
+            def count = 0
+            while (true) {
+                def res = sql "SHOW ROUTINE LOAD TASK WHERE JobName = '${jobName}'"
+                if (res.size() > 0) {
+                    log.info("res: ${res[0].toString()}".toString())
+                    log.info("timeout: ${res[0][6].toString()}".toString())
+                    assertTrue(res[0][6].toString().contains("60"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                } else {
+                    sleep(1000)
+                    continue;
+                }
+            }
+
+            sql "pause routine load for ${jobName}"
+            sql "ALTER ROUTINE LOAD FOR ${jobName} PROPERTIES ( \"max_batch_interval\" = \"10\") FROM KAFKA(\"kafka_partitions\" = \"0\", \"kafka_offsets\" = \"0\");"
+            sql "resume routine load for ${jobName}"
+
+            // test timeout greater than 60s
+            while (true) {
+                def res = sql "SHOW ROUTINE LOAD TASK WHERE JobName = '${jobName}'"
+                if (res.size() > 0) {
+                    log.info("res: ${res[0].toString()}".toString())
+                    log.info("timeout: ${res[0][6].toString()}".toString())
+                    assertTrue(res[0][6].toString().contains("100"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                } else {
+                    sleep(1000)
+                    continue;
+                }
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org