This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 898fe415b9d branch-3.0: [improve](load) introduce black list of backend when load job fetch meta to avoid jitter #50587 (#51043)
898fe415b9d is described below

commit 898fe415b9d09adebf3bcf8a6f3f8511e0a2e7a4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 22 14:18:34 2025 +0800

    branch-3.0: [improve](load) introduce black list of backend when load job fetch meta to avoid jitter #50587 (#51043)
    
    Cherry-picked from #50587
    
    Co-authored-by: hui lai <lai...@selectdb.com>
---
 be/src/runtime/routine_load/data_consumer.cpp      |   4 +
 .../main/java/org/apache/doris/common/Config.java  |   7 ++
 .../apache/doris/datasource/kafka/KafkaUtil.java   |  40 ++++++-
 .../doris/load/routineload/RoutineLoadManager.java |  25 +++++
 .../load_p0/routine_load/data/test_black_list.csv  |   1 +
 .../load_p0/routine_load/test_black_list.groovy    | 124 +++++++++++++++++++++
 6 files changed, 199 insertions(+), 2 deletions(-)
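
As context for the diff below: the patch filters blacklisted and recently failed backends out of the candidate list when fetching Kafka meta, but if that filter would leave no candidates at all, it falls back to the blacklisted backends rather than failing the request outright. The following is a minimal, self-contained sketch of that selection idea; the class and method names (CandidateSelectionSketch, pickCandidates) are illustrative placeholders, not the patched KafkaUtil API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Sketch only: filter out failed/blacklisted backends, but fall back to the
    // blacklist when the filter would otherwise leave nothing to try.
    public class CandidateSelectionSketch {
        static List<Long> pickCandidates(List<Long> aliveBeIds, Set<Long> failedBeIds,
                                         Map<Long, Long> blacklist) {
            List<Long> candidates = new ArrayList<>();
            for (Long beId : aliveBeIds) {
                if (!failedBeIds.contains(beId) && !blacklist.containsKey(beId)) {
                    candidates.add(beId);
                }
            }
            // If every alive backend is failed or blacklisted, still try the
            // blacklisted ones rather than failing the meta request outright.
            if (candidates.isEmpty()) {
                candidates.addAll(blacklist.keySet());
            }
            return candidates;
        }

        public static void main(String[] args) {
            // alive = [1, 2], 1 already failed, 2 blacklisted -> falls back to [2]
            System.out.println(pickCandidates(List.of(1L, 2L), Set.of(1L), Map.of(2L, 0L)));
        }
    }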

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 7566d06914a..00f9c726b41 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -418,6 +418,10 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
 Status KafkaDataConsumer::get_latest_offsets_for_partitions(
         const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
         int timeout) {
+    DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", {
+        // sleep 60s
+        std::this_thread::sleep_for(std::chrono::seconds(60));
+    });
     MonotonicStopWatch watch;
     watch.start();
     for (int32_t partition_id : partition_ids) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index e609da9900c..e5c88a0e485 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1270,6 +1270,13 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_get_kafka_meta_timeout_second = 60;
 
+
+    /**
+     * the expire time of routine load blacklist.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int routine_load_blacklist_expire_time_second = 300;
+
     /**
      * The max number of files store in SmallFileMgr
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index da683cc2b37..3e78ba0d4a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -35,8 +35,10 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -227,12 +229,29 @@ public class KafkaUtil {
         TNetworkAddress address = null;
         Future<InternalService.PProxyResult> future = null;
         InternalService.PProxyResult result = null;
+        Set<Long> failedBeIds = new HashSet<>();
+        TStatusCode code = null;
+
         try {
             while (retryTimes < 3) {
                 List<Long> backendIds = new ArrayList<>();
                 for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
                     Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
-                    if (backend != null && backend.isLoadAvailable() && !backend.isDecommissioned()) {
+                    if (backend != null && backend.isLoadAvailable()
+                            && !backend.isDecommissioned()
+                            && !failedBeIds.contains(beId)
+                            && !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
+                        backendIds.add(beId);
+                    }
+                }
+                // If there are no available backends, utilize the blacklist.
+                // Special scenarios include:
+                // 1. A specific job that connects to Kafka may time out for topic config or network error,
+                //    leaving only one backend operational.
+                // 2. If that sole backend is decommissioned, the aliveBackends list becomes empty.
+                // Hence, in such cases, it's essential to rely on the blacklist to obtain meta information.
+                if (backendIds.isEmpty()) {
+                    for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
                         backendIds.add(beId);
                     }
                 }
@@ -243,19 +262,22 @@ public class KafkaUtil {
                 Collections.shuffle(backendIds);
                 Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
                 address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+                long beId = be.getId();
 
                 try {
                     future = 
BackendServiceProxy.getInstance().getInfo(address, request);
                     result = 
future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
                 } catch (Exception e) {
                     LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
+                    failedBeIds.add(beId);
                     retryTimes++;
                     continue;
                 }
-                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+                code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                 if (code != TStatusCode.OK) {
                     LOG.warn("failed to get info request to "
                             + address + " err " + result.getStatus().getErrorMsgsList());
+                    failedBeIds.add(beId);
                     retryTimes++;
                 } else {
                     return result;
@@ -265,6 +287,20 @@ public class KafkaUtil {
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
             throw new LoadException("Failed to get info");
         } finally {
+            // Ensure that not all BE added to the blacklist.
+            // For single request:
+            //     Only when the final success is achieved, the failed BE will be added to the blacklist,
+            //     ensuring that there are always BE nodes that are not on the blacklist.
+            // For multiple requests:
+            //     If there is only one BE left without being blacklisted after multiple jitters,
+            //     even if this BE fails, it will not be blacklisted.
+            if (code != null && code == TStatusCode.OK && !failedBeIds.isEmpty()) {
+                for (Long beId : failedBeIds) {
+                    Env.getCurrentEnv().getRoutineLoadManager().addToBlacklist(beId);
+                    LOG.info("add beId {} to blacklist, blacklist: {}", beId,
+                            Env.getCurrentEnv().getRoutineLoadManager().getBlacklist());
+                }
+            }
             long endTime = System.currentTimeMillis();
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
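
The rule enforced in the finally block above is easy to miss: backends that failed during the retry loop are pushed to the blacklist only when some later attempt succeeded (code == TStatusCode.OK), so a total outage can never blacklist every backend. Below is a hedged, standalone sketch of that shape; serviceCall and the other names are placeholders standing in for the BackendServiceProxy RPC and status check, not the real API.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Predicate;

    // Sketch: retry a call across candidate backends, remember which ones failed,
    // and only blacklist the failures once the request as a whole has succeeded.
    public class RetryBlacklistSketch {
        interface Blacklist { void add(long beId); }

        static boolean fetchWithRetry(List<Long> candidates, Predicate<Long> serviceCall,
                                      Blacklist blacklist) {
            Set<Long> failedBeIds = new HashSet<>();
            boolean ok = false;
            try {
                for (Long beId : candidates) {        // bounded retry over candidates
                    if (serviceCall.test(beId)) {     // placeholder for the RPC + status check
                        ok = true;
                        return true;
                    }
                    failedBeIds.add(beId);
                }
                return false;
            } finally {
                // Blacklist failed backends only on overall success, so a total
                // outage never pushes every backend onto the blacklist.
                if (ok) {
                    failedBeIds.forEach(blacklist::add);
                }
            }
        }

        public static void main(String[] args) {
            Blacklist bl = beId -> System.out.println("blacklist " + beId);
            // backend 3 succeeds, so the earlier failures 1 and 2 get blacklisted
            fetchWithRetry(List.of(1L, 2L, 3L), beId -> beId == 3L, bl);
        }
    }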
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index bfe42ad7695..23b36f11a4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -91,6 +91,9 @@ public class RoutineLoadManager implements Writable {
 
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
+    // Map<beId, timestamp when added to blacklist>
+    private Map<Long, Long> blacklist = new ConcurrentHashMap<>();
+
     private void readLock() {
         lock.readLock().lock();
     }
@@ -110,6 +113,10 @@ public class RoutineLoadManager implements Writable {
     public RoutineLoadManager() {
     }
 
+    public Map<Long, Long> getBlacklist() {
+        return blacklist;
+    }
+
     public List<RoutineLoadJob> getAllRoutineLoadJobs() {
         return new ArrayList<>(idToRoutineLoadJob.values());
     }
@@ -916,4 +923,22 @@ public class RoutineLoadManager implements Writable {
             }
         }
     }
+
+    public void addToBlacklist(long beId) {
+        blacklist.put(beId, System.currentTimeMillis());
+    }
+
+    public boolean isInBlacklist(long beId) {
+        Long timestamp = blacklist.get(beId);
+        if (timestamp == null) {
+            return false;
+        }
+
+        if (System.currentTimeMillis() - timestamp > Config.routine_load_blacklist_expire_time_second * 1000) {
+            blacklist.remove(beId);
+            LOG.info("remove beId {} from blacklist, blacklist: {}", beId, blacklist);
+            return false;
+        }
+        return true;
+    }
 }
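
isInBlacklist above expires entries lazily: nothing sweeps the map in the background; an entry simply stops counting, and is removed, the first time it is looked up after routine_load_blacklist_expire_time_second has elapsed. Below is a standalone demo of that lookup-time expiry, using a shortened expiry window instead of the 300-second default; the class name and constant are illustrative only.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Demo of lazy, lookup-time expiry as used by the routine load blacklist.
    public class LazyExpiryDemo {
        static final long EXPIRE_MS = 200; // stand-in for routine_load_blacklist_expire_time_second * 1000
        static final Map<Long, Long> blacklist = new ConcurrentHashMap<>();

        static boolean isInBlacklist(long beId) {
            Long ts = blacklist.get(beId);
            if (ts == null) {
                return false;
            }
            if (System.currentTimeMillis() - ts > EXPIRE_MS) {
                blacklist.remove(beId);   // cleaned up only when somebody asks
                return false;
            }
            return true;
        }

        public static void main(String[] args) throws InterruptedException {
            blacklist.put(10001L, System.currentTimeMillis());
            System.out.println(isInBlacklist(10001L)); // true, still fresh
            Thread.sleep(300);
            System.out.println(isInBlacklist(10001L)); // false, expired and removed on this lookup
        }
    }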
diff --git a/regression-test/suites/load_p0/routine_load/data/test_black_list.csv b/regression-test/suites/load_p0/routine_load/data/test_black_list.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_black_list.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
new file mode 100644
index 00000000000..04779f10362
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_black_list","nonConcurrent,p0") {
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // 1. send data
+        def kafkaCsvTpoics = [
+                  "test_black_list",
+                ]
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+
+        // 2. create table and routine load job
+        def tableName = "test_black_list"
+        def job = "test_black_list_job"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def inject = "KafkaDataConsumer.get_latest_offsets_for_partitions.timeout"
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(inject)
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                log.info("res: ${res}")
+                def state = sql "show routine load for ${job}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                log.info("other msg: ${state[0][19].toString()}".toString())
+                if (state[0][17].toString().contains("Failed to get info") || state[0][19].toString().contains("Failed to get info")) {
+                    break
+                }
+                if (count >= 90) {
+                    log.error("routine load test fail")
+                    assertEquals(1, 2)
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+
+            count = 0
+            GetDebugPoint().disableDebugPointForAllBEs(inject)
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${job}"
+                log.info("routine load statistic: ${res[0][14].toString()}".toString())
+                log.info("progress: ${res[0][15].toString()}".toString())
+                log.info("lag: ${res[0][16].toString()}".toString())
+                res = sql "select count(*) from ${tableName}"
+                if (res[0][0] > 0) {
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                } 
+                continue;
+            }
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(inject)
+            sql "stop routine load for ${job}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
