This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 179e650ef11 [chore](routine-load) make get Kafka meta timeout configurable (#36619) 179e650ef11 is described below commit 179e650ef11f008e8e94c3abef551da1a987d846 Author: hui lai <1353307...@qq.com> AuthorDate: Sat Jun 22 08:02:26 2024 +0800 [chore](routine-load) make get Kafka meta timeout configurable (#36619) Sometimes, the delay from be to Kafka is relatively high, which can cause get info RPC timeout. Make get Kafka meta timeout configurable allow users to customize timeout times which can solve this issue. --- be/src/service/internal_service.cpp | 2 +- .../main/java/org/apache/doris/common/Config.java | 6 ++++++ .../org/apache/doris/datasource/kafka/KafkaUtil.java | 20 ++++++++++---------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index a547b02a88b..8bf04ead035 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1170,7 +1170,7 @@ void PInternalService::get_info(google::protobuf::RpcController* controller, // Currently it supports 2 kinds of requests: // 1. get all kafka partition ids for given topic // 2. get all kafka partition offsets for given topic and timestamp. - int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000; + int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000; if (request->has_kafka_meta_request()) { const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); if (!kafka_request.offset_flags().empty()) { diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index eaebeafb0c5..8054063642e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1185,6 +1185,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_routine_load_task_num_per_be = 1024; + /** + * the max timeout of get kafka meta. + */ + @ConfField(mutable = true, masterOnly = true) + public static int max_get_kafka_meta_timeout_second = 60; + /** * The max number of files store in SmallFileMgr */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java index 656ebf65152..c0c932bb8ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.kafka; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; @@ -42,8 +43,6 @@ import java.util.stream.Collectors; public class KafkaUtil { private static final Logger LOG = LogManager.getLogger(KafkaUtil.class); - private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60; - private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10; public static List<Integer> getAllKafkaPartitions(String brokerList, String topic, Map<String, String> convertedCustomProperties) throws UserException { @@ -59,7 +58,8 @@ public class KafkaUtil { ) ) ).build(); - return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList(); + return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second) + .getKafkaMetaResult().getPartitionIdsList(); } catch (Exception e) { throw new LoadException( "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage()); @@ -96,8 +96,8 @@ public class KafkaUtil { } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); - InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); + metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); + InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); @@ -141,8 +141,8 @@ public class KafkaUtil { metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId); } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); - InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); + metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); + InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); @@ -201,8 +201,8 @@ public class KafkaUtil { .setVal(pair.second).build()); } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); - InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); + metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); + InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); @@ -236,7 +236,7 @@ public class KafkaUtil { try { future = BackendServiceProxy.getInstance().getInfo(address, request); - result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); + result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS); } catch (Exception e) { LOG.warn("failed to get info request to " + address + " err " + e.getMessage()); retryTimes++; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org