This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fef962aea6c [fix](routine load) make routine load delay eof schedule work (#45528)
fef962aea6c is described below

commit fef962aea6cc38c59e548faa87f8c70ca5552873
Author: hui lai <lai...@selectdb.com>
AuthorDate: Fri Dec 20 15:29:21 2024 +0800

    [fix](routine load) make routine load delay eof schedule work (#45528)
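
    Routine load is supposed to back off before rescheduling a task that
    ended at EOF (no new data in the Kafka topic), instead of immediately
    launching another task that commits an empty transaction. That back-off
    decision depends on the task's lastScheduledTime and isEof state, but
    neither field was propagated when task infos were created or rebuilt, so
    the delay never took effect. This patch threads both fields through the
    RoutineLoadTaskInfo and KafkaTaskInfo constructors (new tasks start with
    lastScheduledTime = -1 and isEof = false).

    A minimal sketch of the check the propagated fields make possible; it is
    illustrative only, and EOF_BACKOFF_MS / shouldDelay are hypothetical
    names, not identifiers from this patch:

        final class EofBackoffSketch {
            // hypothetical back-off window, e.g. one batch interval
            private static final long EOF_BACKOFF_MS = 5_000L;

            static boolean shouldDelay(long lastScheduledTimeMs, boolean isEof) {
                // Delay only tasks that hit end-of-stream, and only until the
                // window since their last schedule elapses. A fresh task with
                // lastScheduledTime = -1 is never delayed.
                return isEof
                        && System.currentTimeMillis() - lastScheduledTimeMs < EOF_BACKOFF_MS;
            }
        }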
---
 .../load/routineload/KafkaRoutineLoadJob.java      |   2 +-
 .../doris/load/routineload/KafkaTaskInfo.java      |   9 +-
 .../load/routineload/RoutineLoadTaskInfo.java      |   9 +-
 .../load/routineload/KafkaRoutineLoadJobTest.java  |   2 +-
 .../routineload/RoutineLoadTaskSchedulerTest.java  |   2 +-
 .../transaction/GlobalTransactionMgrTest.java      |   4 +-
 .../suites/load_p0/routine_load/data/test_eof.csv  |   1 +
 .../routine_load/test_routine_load_eof.groovy      | 178 +++++++++++++++++++++
 8 files changed, 195 insertions(+), 12 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 6bdef3301a6..d0843eb9204 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -235,7 +235,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                     }
                     KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id,
                             maxBatchIntervalS * Config.routine_load_task_timeout_multiplier * 1000,
-                            taskKafkaProgress, isMultiTable());
+                            taskKafkaProgress, isMultiTable(), -1, false);
                     routineLoadTaskInfoList.add(kafkaTaskInfo);
                     result.add(kafkaTaskInfo);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index f1578269529..e3292dc671f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -49,16 +49,17 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private Map<Integer, Long> partitionIdToOffset;
 
     public KafkaTaskInfo(UUID id, long jobId,
-                         long timeoutMs, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
-        super(id, jobId, timeoutMs, isMultiTable);
+                         long timeoutMs, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable,
+                         long lastScheduledTime, boolean isEof) {
+        super(id, jobId, timeoutMs, isMultiTable, lastScheduledTime, isEof);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
         super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
-                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable);
+                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable,
+                kafkaTaskInfo.getLastScheduledTime(), kafkaTaskInfo.getIsEof());
         this.partitionIdToOffset = partitionIdToOffset;
-        this.isEof = kafkaTaskInfo.getIsEof();
     }
 
     public List<Integer> getPartitions() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 1ff825d97b9..5075311299d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -79,17 +79,20 @@ public abstract class RoutineLoadTaskInfo {
     // so that user or other logic can know the status of the corresponding txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable) {
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable,
+                    long lastScheduledTime, boolean isEof) {
         this.id = id;
         this.jobId = jobId;
         this.createTimeMs = System.currentTimeMillis();
         this.timeoutMs = timeoutMs;
         this.isMultiTable = isMultiTable;
+        this.lastScheduledTime = lastScheduledTime;
+        this.isEof = isEof;
     }
 
     public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long previousBeId,
-                               boolean isMultiTable) {
-        this(id, jobId, timeoutMs, isMultiTable);
+                               boolean isMultiTable, long lastScheduledTime, boolean isEof) {
+        this(id, jobId, timeoutMs, isMultiTable, lastScheduledTime, isEof);
         this.previousBeId = previousBeId;
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 20cb626ff37..63452a5d59c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -225,7 +225,7 @@ public class KafkaRoutineLoadJobTest {
         Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
         partitionIdsToOffset.put(100, 0L);
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
-                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
+                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false, -1, false);
         kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 95c2423de71..6e11fc5f71a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -70,7 +70,7 @@ public class RoutineLoadTaskSchedulerTest {
 
         LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = new LinkedBlockingDeque<>();
         KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000,
-                partitionIdToOffset, false);
+                partitionIdToOffset, false, -1, false);
         routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
 
         Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 420800a4bb3..c4ec468c651 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -302,7 +302,7 @@ public class GlobalTransactionMgrTest {
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
         KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
-                partitionIdToOffset, false);
+                partitionIdToOffset, false, -1, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
@@ -368,7 +368,7 @@ public class GlobalTransactionMgrTest {
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
         KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
-                partitionIdToOffset, false);
+                partitionIdToOffset, false, -1, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
diff --git a/regression-test/suites/load_p0/routine_load/data/test_eof.csv b/regression-test/suites/load_p0/routine_load/data/test_eof.csv
new file mode 100644
index 00000000000..bc857cabcfd
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_eof.csv
@@ -0,0 +1 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
new file mode 100644
index 00000000000..6eeb9a4e51c
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_eof","p0") {
+    def kafkaCsvTopics = [
+                  "test_eof",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def thread = Thread.start {
+            // define kafka producer properties
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            // Create kafka producer
+            def producer = new KafkaProducer<>(props)
+
+            while(true) {
+                Thread.sleep(1000)
+                for (String kafkaCsvTopic in kafkaCsvTopics) {
+                    def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                    def lines = txt.readLines()
+                    lines.each { line ->
+                        logger.info("=====${line}========")
+                        def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                        producer.send(record)
+                    }
+                }
+            }
+        }
+
+        sleep(2 * 1000)
+
+        def jobName = "testEof"
+        def tableName = "test_routine_load_eof"
+        try {
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                k00 INT             NOT NULL,
+                k01 DATE            NOT NULL,
+                k02 BOOLEAN         NULL,
+                k03 TINYINT         NULL,
+                k04 SMALLINT        NULL,
+                k05 INT             NULL,
+                k06 BIGINT          NULL,
+                k07 LARGEINT        NULL,
+                k08 FLOAT           NULL,
+                k09 DOUBLE          NULL,
+                k10 DECIMAL(9,1)    NULL,
+                k11 DECIMALV3(9,1)  NULL,
+                k12 DATETIME        NULL,
+                k13 DATEV2          NULL,
+                k14 DATETIMEV2      NULL,
+                k15 CHAR            NULL,
+                k16 VARCHAR         NULL,
+                k17 STRING          NULL,
+                k18 JSON            NULL,
+                kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
+                kd02 TINYINT         NOT NULL DEFAULT "1",
+                kd03 SMALLINT        NOT NULL DEFAULT "2",
+                kd04 INT             NOT NULL DEFAULT "3",
+                kd05 BIGINT          NOT NULL DEFAULT "4",
+                kd06 LARGEINT        NOT NULL DEFAULT "5",
+                kd07 FLOAT           NOT NULL DEFAULT "6.0",
+                kd08 DOUBLE          NOT NULL DEFAULT "7.0",
+                kd09 DECIMAL         NOT NULL DEFAULT "888888888",
+                kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
+                kd11 DATE            NOT NULL DEFAULT "2023-08-24",
+                kd12 DATETIME        NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
+                kd14 DATETIMEV2      NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd18 JSON            NULL,
+                
+                INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+                INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+                
+            )
+            DUPLICATE KEY(k00)
+            PARTITION BY RANGE(k01)
+            (
+                PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+            )
+            DISTRIBUTED BY HASH(k00) BUCKETS 32
+            PROPERTIES (
+                "bloom_filter_columns"="k05",
+                "replication_num" = "1"
+            );
+            """
+            sql "sync"
+
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "test_eof",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                if (state != "RUNNING") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    } 
+                    continue;
+                }
+                break;
+            }
+            sleep(60 * 1000)
+            def res = sql "show routine load for ${jobName}"
+            def statistic = res[0][14].toString()
+            def json = parseJson(res[0][14])
+            log.info("routine load statistic: ${res[0][14].toString()}".toString())
+            if (json.committedTaskNum > 20) {
+                assertEquals(1, 2)
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+        thread.interrupt()
+    }
+}
\ No newline at end of file
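
The new regression test exercises this end to end: a background thread keeps
producing one CSV row per second (so each 5-second batch quickly hits EOF),
the job is brought to RUNNING, and after a 60-second window the test fails
(the assertEquals(1, 2) idiom) if committedTaskNum exceeds 20. With the EOF
back-off working, only a handful of tasks should commit in that window;
immediate rescheduling of EOF tasks would drive the count far higher.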

