This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 898fe415b9d branch-3.0: [improve](load) introduce black list of backend when load job fetch meta to avoid jitter #50587 (#51043)
898fe415b9d is described below

commit 898fe415b9d09adebf3bcf8a6f3f8511e0a2e7a4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 22 14:18:34 2025 +0800

    branch-3.0: [improve](load) introduce black list of backend when load job fetch meta to avoid jitter #50587 (#51043)

    Cherry-picked from #50587

    Co-authored-by: hui lai <lai...@selectdb.com>
---
 be/src/runtime/routine_load/data_consumer.cpp       |   4 +
 .../main/java/org/apache/doris/common/Config.java   |   7 ++
 .../apache/doris/datasource/kafka/KafkaUtil.java    |  40 ++++++-
 .../doris/load/routineload/RoutineLoadManager.java  |  25 +++++
 .../load_p0/routine_load/data/test_black_list.csv   |   1 +
 .../load_p0/routine_load/test_black_list.groovy     | 124 +++++++++++++++++++++
 6 files changed, 199 insertions(+), 2 deletions(-)
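Before the per-file diffs, the mechanism in outline: when a routine load job fetches Kafka meta through a BE, backends that already failed in the same request or that sit on the routine load blacklist are skipped, the blacklist itself is used as a fallback so a fully blacklisted cluster can still serve the request, and failed backends are only blacklisted once the request eventually succeeds elsewhere. The following is a simplified, self-contained sketch of that selection rule; class and parameter names are illustrative, not the actual Doris code (see the KafkaUtil.java diff below for the real implementation):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Predicate;

    // Illustrative sketch only: picks one backend for a meta request, preferring
    // backends that are neither blacklisted nor already failed in this request,
    // and falling back to blacklisted backends so the request can still proceed.
    final class MetaBackendPicker {
        static Long pick(List<Long> aliveBackendIds, Predicate<Long> isBlacklisted,
                         Set<Long> failedInThisRequest, List<Long> blacklistedIds) {
            List<Long> candidates = new ArrayList<>();
            for (Long beId : aliveBackendIds) {
                if (!failedInThisRequest.contains(beId) && !isBlacklisted.test(beId)) {
                    candidates.add(beId);
                }
            }
            // If every alive backend is blacklisted or has already failed, fall back
            // to the blacklist instead of giving up: some BE must serve the request.
            if (candidates.isEmpty()) {
                candidates.addAll(blacklistedIds);
            }
            if (candidates.isEmpty()) {
                return null; // caller retries and finally reports "Failed to get info"
            }
            Collections.shuffle(candidates); // spread meta requests across candidates
            return candidates.get(0);
        }
    }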
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 7566d06914a..00f9c726b41 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -418,6 +418,10 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
 Status KafkaDataConsumer::get_latest_offsets_for_partitions(
         const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
         int timeout) {
+    DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", {
+        // sleep 60s
+        std::this_thread::sleep_for(std::chrono::seconds(60));
+    });
     MonotonicStopWatch watch;
     watch.start();
     for (int32_t partition_id : partition_ids) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index e609da9900c..e5c88a0e485 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1270,6 +1270,13 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_get_kafka_meta_timeout_second = 60;
 
+
+    /**
+     * the expire time of routine load blacklist.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int routine_load_blacklist_expire_time_second = 300;
+
     /**
      * The max number of files store in SmallFileMgr
      */
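The two config values visible in this hunk bound the jitter the blacklist is meant to avoid: a single meta RPC may block for up to max_get_kafka_meta_timeout_second (60 s), the fetch path below retries up to three times, and a blacklisted BE is then skipped for routine_load_blacklist_expire_time_second (300 s). A back-of-the-envelope sketch of those numbers, using plain constants rather than the Config class:

    // Plain constants mirroring the defaults shown in this hunk; assumes up to
    // 3 retries per meta request, as in KafkaUtil.getInfoRequest below.
    public class BlacklistWindowMath {
        public static void main(String[] args) {
            long rpcTimeoutSec = 60;    // max_get_kafka_meta_timeout_second
            long blacklistTtlSec = 300; // routine_load_blacklist_expire_time_second
            int retriesPerRequest = 3;

            // Worst case for one meta request if every retry lands on a hanging BE:
            long worstCaseSec = retriesPerRequest * rpcTimeoutSec; // 180 s
            System.out.println("worst-case meta fetch while a BE hangs: " + worstCaseSec + " s");
            // Once the request finally succeeds on a healthy BE, the hanging BE is
            // skipped for blacklistTtlSec seconds before it is tried again.
            System.out.println("hanging BE then skipped for: " + blacklistTtlSec + " s");
        }
    }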
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index da683cc2b37..3e78ba0d4a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -35,8 +35,10 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -227,12 +229,29 @@ public class KafkaUtil {
         TNetworkAddress address = null;
         Future<InternalService.PProxyResult> future = null;
         InternalService.PProxyResult result = null;
+        Set<Long> failedBeIds = new HashSet<>();
+        TStatusCode code = null;
+
         try {
             while (retryTimes < 3) {
                 List<Long> backendIds = new ArrayList<>();
                 for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
                     Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
-                    if (backend != null && backend.isLoadAvailable() && !backend.isDecommissioned()) {
+                    if (backend != null && backend.isLoadAvailable()
+                            && !backend.isDecommissioned()
+                            && !failedBeIds.contains(beId)
+                            && !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
+                        backendIds.add(beId);
+                    }
+                }
+                // If there are no available backends, utilize the blacklist.
+                // Special scenarios include:
+                // 1. A specific job that connects to Kafka may time out for topic config or network error,
+                //    leaving only one backend operational.
+                // 2. If that sole backend is decommissioned, the aliveBackends list becomes empty.
+                // Hence, in such cases, it's essential to rely on the blacklist to obtain meta information.
+                if (backendIds.isEmpty()) {
+                    for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
                         backendIds.add(beId);
                     }
                 }
@@ -243,19 +262,22 @@ public class KafkaUtil {
                 Collections.shuffle(backendIds);
                 Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
                 address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+                long beId = be.getId();
 
                 try {
                     future = BackendServiceProxy.getInstance().getInfo(address, request);
                     result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
                 } catch (Exception e) {
                     LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
+                    failedBeIds.add(beId);
                     retryTimes++;
                     continue;
                 }
-                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+                code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                 if (code != TStatusCode.OK) {
                     LOG.warn("failed to get info request to " + address + " err "
                             + result.getStatus().getErrorMsgsList());
+                    failedBeIds.add(beId);
                     retryTimes++;
                 } else {
                     return result;
@@ -265,6 +287,20 @@ public class KafkaUtil {
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
             throw new LoadException("Failed to get info");
         } finally {
+            // Ensure that not all BE added to the blacklist.
+            // For single request:
+            //     Only when the final success is achieved, the failed BE will be added to the blacklist,
+            //     ensuring that there are always BE nodes that are not on the blacklist.
+            // For multiple requests:
+            //     If there is only one BE left without being blacklisted after multiple jitters,
+            //     even if this BE fails, it will not be blacklisted.
+            if (code != null && code == TStatusCode.OK && !failedBeIds.isEmpty()) {
+                for (Long beId : failedBeIds) {
+                    Env.getCurrentEnv().getRoutineLoadManager().addToBlacklist(beId);
+                    LOG.info("add beId {} to blacklist, blacklist: {}", beId,
+                            Env.getCurrentEnv().getRoutineLoadManager().getBlacklist());
+                }
+            }
             long endTime = System.currentTimeMillis();
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index bfe42ad7695..23b36f11a4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -91,6 +91,9 @@ public class RoutineLoadManager implements Writable {
 
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
+    // Map<beId, timestamp when added to blacklist>
+    private Map<Long, Long> blacklist = new ConcurrentHashMap<>();
+
     private void readLock() {
         lock.readLock().lock();
     }
@@ -110,6 +113,10 @@ public class RoutineLoadManager implements Writable {
     public RoutineLoadManager() {
     }
 
+    public Map<Long, Long> getBlacklist() {
+        return blacklist;
+    }
+
     public List<RoutineLoadJob> getAllRoutineLoadJobs() {
         return new ArrayList<>(idToRoutineLoadJob.values());
     }
@@ -916,4 +923,22 @@ public class RoutineLoadManager implements Writable {
             }
         }
     }
+
+    public void addToBlacklist(long beId) {
+        blacklist.put(beId, System.currentTimeMillis());
+    }
+
+    public boolean isInBlacklist(long beId) {
+        Long timestamp = blacklist.get(beId);
+        if (timestamp == null) {
+            return false;
+        }
+
+        if (System.currentTimeMillis() - timestamp > Config.routine_load_blacklist_expire_time_second * 1000) {
+            blacklist.remove(beId);
+            LOG.info("remove beId {} from blacklist, blacklist: {}", beId, blacklist);
+            return false;
+        }
+        return true;
+    }
 }
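The expiry semantics of addToBlacklist/isInBlacklist follow directly from the hunk above: entries are removed lazily, on the first check after routine_load_blacklist_expire_time_second has elapsed. A hypothetical JUnit-style sketch of that behaviour follows; this test is not part of the patch and assumes RoutineLoadManager can be constructed standalone, as its no-arg constructor suggests:

    import org.apache.doris.common.Config;
    import org.apache.doris.load.routineload.RoutineLoadManager;
    import org.junit.Assert;
    import org.junit.Test;

    // Hypothetical test, not part of this patch: sketches the expected
    // lazy-expiry behaviour of the new blacklist methods.
    public class RoutineLoadBlacklistSketchTest {
        @Test
        public void testBlacklistLazyExpiry() {
            RoutineLoadManager mgr = new RoutineLoadManager();
            long beId = 10001L;

            Assert.assertFalse(mgr.isInBlacklist(beId));   // unknown BE is not blacklisted
            mgr.addToBlacklist(beId);
            Assert.assertTrue(mgr.isInBlacklist(beId));    // inside the expire window

            // Force expiry: with a negative window, any elapsed time exceeds it,
            // so the next isInBlacklist() call removes the entry and returns false.
            int saved = Config.routine_load_blacklist_expire_time_second;
            Config.routine_load_blacklist_expire_time_second = -1;
            try {
                Assert.assertFalse(mgr.isInBlacklist(beId));
                Assert.assertFalse(mgr.getBlacklist().containsKey(beId)); // removed lazily
            } finally {
                Config.routine_load_blacklist_expire_time_second = saved;
            }
        }
    }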
diff --git a/regression-test/suites/load_p0/routine_load/data/test_black_list.csv b/regression-test/suites/load_p0/routine_load/data/test_black_list.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_black_list.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
new file mode 100644
index 00000000000..04779f10362
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_black_list","nonConcurrent,p0") {
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // 1. send data
+        def kafkaCsvTpoics = [
+                  "test_black_list",
+                ]
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+
+        // 2. create table and routine load job
+        def tableName = "test_black_list"
+        def job = "test_black_list_job"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date NULL,
+                `v2` string NULL,
+                `v3` datetime NULL,
+                `v4` string NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def inject = "KafkaDataConsumer.get_latest_offsets_for_partitions.timeout"
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(inject)
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                log.info("res: ${res}")
+                def state = sql "show routine load for ${job}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                log.info("other msg: ${state[0][19].toString()}".toString())
+                if (state[0][17].toString().contains("Failed to get info") || state[0][19].toString().contains("Failed to get info")) {
+                    break
+                }
+                if (count >= 90) {
+                    log.error("routine load test fail")
+                    assertEquals(1, 2)
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+
+            count = 0
+            GetDebugPoint().disableDebugPointForAllBEs(inject)
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${job}"
+                log.info("routine load statistic: ${res[0][14].toString()}".toString())
+                log.info("progress: ${res[0][15].toString()}".toString())
+                log.info("lag: ${res[0][16].toString()}".toString())
+                res = sql "select count(*) from ${tableName}"
+                if (res[0][0] > 0) {
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                }
+                continue;
+            }
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(inject)
+            sql "stop routine load for ${job}"
+        }
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org