This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new dd18652861b [branch-2.1](routine-load) make get Kafka meta timeout configurable (#37399)
dd18652861b is described below

commit dd18652861bc2879f230404d0285788d75a82b41
Author: hui lai <1353307...@qq.com>
AuthorDate: Mon Jul 8 10:39:17 2024 +0800

    [branch-2.1](routine-load) make get Kafka meta timeout configurable (#37399)

    pick #36619
---
 be/src/service/internal_service.cpp                  |  2 +-
 .../main/java/org/apache/doris/common/Config.java    |  6 ++++++
 .../org/apache/doris/datasource/kafka/KafkaUtil.java | 20 ++++++++++----------
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index c6bedc630e8..f4831d08d29 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1190,7 +1190,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
         // Currently it supports 2 kinds of requests:
         // 1. get all kafka partition ids for given topic
         // 2. get all kafka partition offsets for given topic and timestamp.
-        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
+        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000;
         if (request->has_kafka_meta_request()) {
             const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
             if (!kafka_request.offset_flags().empty()) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 94d5725c38a..1be7b871d68 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1181,6 +1181,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_num_per_be = 1024;
 
+    /**
+     * the max timeout of get kafka meta.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int max_get_kafka_meta_timeout_second = 60;
+
     /**
      * The max number of files store in SmallFileMgr
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 656ebf65152..c0c932bb8ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.kafka;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
@@ -42,8 +43,6 @@ import java.util.stream.Collectors;
 
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
-    private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
-    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
@@ -59,7 +58,8 @@ public class KafkaUtil {
                             )
                     )
             ).build();
-            return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
+            return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second)
+                    .getKafkaMetaResult().getPartitionIdsList();
         } catch (Exception e) {
             throw new LoadException(
                     "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
@@ -96,8 +96,8 @@ public class KafkaUtil {
             }
 
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -141,8 +141,8 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -201,8 +201,8 @@ public class KafkaUtil {
                         .setVal(pair.second).build());
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -236,7 +236,7 @@ public class KafkaUtil {
 
                 try {
                     future = BackendServiceProxy.getInstance().getInfo(address, request);
-                    result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+                    result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
                 } catch (Exception e) {
                     LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
                     retryTimes++;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org