This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 179e650ef11 [chore](routine-load) make get Kafka meta timeout 
configurable (#36619)
179e650ef11 is described below

commit 179e650ef11f008e8e94c3abef551da1a987d846
Author: hui lai <1353307...@qq.com>
AuthorDate: Sat Jun 22 08:02:26 2024 +0800

    [chore](routine-load) make get Kafka meta timeout configurable (#36619)
    
    Sometimes, the delay from BE to Kafka is relatively high, which can
    cause the get-info RPC to time out.
    
    Making the get-Kafka-meta timeout configurable allows users to
    customize the timeout, which can solve this issue.
---
 be/src/service/internal_service.cpp                  |  2 +-
 .../main/java/org/apache/doris/common/Config.java    |  6 ++++++
 .../org/apache/doris/datasource/kafka/KafkaUtil.java | 20 ++++++++++----------
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index a547b02a88b..8bf04ead035 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1170,7 +1170,7 @@ void 
PInternalService::get_info(google::protobuf::RpcController* controller,
         // Currently it supports 2 kinds of requests:
         // 1. get all kafka partition ids for given topic
         // 2. get all kafka partition offsets for given topic and timestamp.
-        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() 
* 1000 : 5 * 1000;
+        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() 
* 1000 : 60 * 1000;
         if (request->has_kafka_meta_request()) {
             const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
             if (!kafka_request.offset_flags().empty()) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index eaebeafb0c5..8054063642e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1185,6 +1185,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_num_per_be = 1024;
 
+    /**
+     * the max timeout of get kafka meta.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int max_get_kafka_meta_timeout_second = 60;
+
     /**
      * The max number of files store in SmallFileMgr
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 656ebf65152..c0c932bb8ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.kafka;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
@@ -42,8 +43,6 @@ import java.util.stream.Collectors;
 
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
-    private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
-    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, 
String topic,
             Map<String, String> convertedCustomProperties) throws 
UserException {
@@ -59,7 +58,8 @@ public class KafkaUtil {
                                     )
                             )
             ).build();
-            return getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
+            return getInfoRequest(request, 
Config.max_get_kafka_meta_timeout_second)
+                    .getKafkaMetaResult().getPartitionIdsList();
         } catch (Exception e) {
             throw new LoadException(
                     "Failed to get all partitions of kafka topic: " + topic + 
" error: " + e.getMessage());
@@ -96,8 +96,8 @@ public class KafkaUtil {
             }
 
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, 
Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -141,8 +141,8 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, 
Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -201,8 +201,8 @@ public class KafkaUtil {
                         .setVal(pair.second).build());
             }
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, 
Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -236,7 +236,7 @@ public class KafkaUtil {
 
             try {
                 future = BackendServiceProxy.getInstance().getInfo(address, 
request);
-                result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+                result = future.get(Config.max_get_kafka_meta_timeout_second, 
TimeUnit.SECONDS);
             } catch (Exception e) {
                 LOG.warn("failed to get info request to " + address + " err " 
+ e.getMessage());
                 retryTimes++;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to