This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 681ac90ba15 branch-3.0: [fix](load) fix routine load job progress 
fallback after FE master node restart #50221 (#50282)
681ac90ba15 is described below

commit 681ac90ba159b525e553276c09b05c472837c01f
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 23 17:22:07 2025 +0800

    branch-3.0: [fix](load) fix routine load job progress fallback after FE 
master node restart #50221 (#50282)
    
    Cherry-picked from #50221
    
    Co-authored-by: hui lai <lai...@selectdb.com>
---
 .../load/routineload/KafkaRoutineLoadJob.java      |   6 +
 .../data/test_routine_load_progress.csv            |   4 +
 .../routine_load/test_routine_load_progress.groovy | 130 +++++++++++++++++++++
 3 files changed, 140 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index bbf34b93258..896f1a1bcd9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -423,6 +423,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     private boolean isKafkaPartitionsChanged() throws UserException {
         if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
+            // for the case where the currentKafkaPartitions has not been 
assigned,
+            // we assume that the fe master has restarted or the job has been 
newly created,
+            // in this case, we need to pull the saved progress from meta 
service once
+            if (Config.isCloudMode() && (currentKafkaPartitions == null || 
currentKafkaPartitions.isEmpty())) {
+                updateCloudProgress();
+            }
             currentKafkaPartitions = customKafkaPartitions;
             return false;
         }
diff --git 
a/regression-test/suites/load_p0/routine_load/data/test_routine_load_progress.csv
 
b/regression-test/suites/load_p0/routine_load/data/test_routine_load_progress.csv
new file mode 100644
index 00000000000..b641f1eb6a2
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/data/test_routine_load_progress.csv
@@ -0,0 +1,4 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
new file mode 100644
index 00000000000..c372c5826b2
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_progress","docker") {
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    docker(options) {
+        def kafkaCsvTpoics = [
+                  "test_routine_load_progress",
+                ]
+        String enabled = context.config.otherConfigs.get("enableKafkaTest")
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            // 1. send data to kafka
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+            def producer = new KafkaProducer<>(props)
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                def lines = txt.readLines()
+                lines.each { line ->
+                    logger.info("=====${line}========")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, 
line)
+                    producer.send(record)
+                }
+            }
+
+            // 2. create table and routine load job
+            def tableName = "test_routine_load_progress"
+            def job = "test_progress"
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `k1` int(20) NULL,
+                    `k2` string NULL,
+                    `v1` date  NULL,
+                    `v2` string  NULL,
+                    `v3` datetime  NULL,
+                    `v4` string  NULL
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k1`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                PROPERTIES ("replication_allocation" = "tag.location.default: 
1");
+            """
+
+            try {
+                sql """
+                    CREATE ROUTINE LOAD ${job} ON ${tableName}
+                    COLUMNS TERMINATED BY ","
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                        "kafka_partitions" = "0",
+                        "kafka_offsets" = "2"
+                    );
+                """
+                def count = 0
+                def beforeRes = 0
+                def afterRes = 0
+                while (true) {
+                    beforeRes = sql "select count(*) from ${tableName}"
+                    log.info("beforeRes: ${beforeRes}")
+                    def state = sql "show routine load for ${job}"
+                    log.info("routine load state: 
${state[0][8].toString()}".toString())
+                    log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                    log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                    def lagJson = parseJson(state[0][16].toString())
+                    log.info("lag raw json: ${state[0][16].toString()}")
+                    if (beforeRes[0][0] > 0 && lagJson["0"] == 0) {
+                        break
+                    }
+                    if (count >= 30) {
+                        log.error("routine load can not visible for long time")
+                        assertEquals(1, 2)
+                        break
+                    }
+                    sleep(1000)
+                    count++
+                }
+
+                // 3. restart fe master
+                def masterFeIndex = cluster.getMasterFe().index
+                cluster.restartFrontends(masterFeIndex)
+                sleep(30 * 1000)
+                context.reconnectFe()
+
+                // 4. check count of table
+                def state = sql "show routine load for ${job}"
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("progress: ${state[0][15].toString()}")
+                log.info("lag: ${state[0][16].toString()}")
+                afterRes = sql "select count(*) from ${tableName}"
+                log.info("afterRes: ${afterRes}")
+                if (beforeRes[0][0] != afterRes[0][0]) {
+                    assertEquals(1, 2)
+                }
+            } finally {
+                sql "stop routine load for ${job}"
+            }
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to