This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new dd18652861b [branch-2.1](routine-load) make get Kafka meta timeout configurable (#37399)
dd18652861b is described below

commit dd18652861bc2879f230404d0285788d75a82b41
Author: hui lai <1353307...@qq.com>
AuthorDate: Mon Jul 8 10:39:17 2024 +0800

    [branch-2.1](routine-load) make get Kafka meta timeout configurable (#37399)
    
    pick #36619
---
 be/src/service/internal_service.cpp                  |  2 +-
 .../main/java/org/apache/doris/common/Config.java    |  6 ++++++
 .../org/apache/doris/datasource/kafka/KafkaUtil.java | 20 ++++++++++----------
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index c6bedc630e8..f4831d08d29 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1190,7 +1190,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
         // Currently it supports 2 kinds of requests:
         // 1. get all kafka partition ids for given topic
         // 2. get all kafka partition offsets for given topic and timestamp.
-        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
+        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000;
         if (request->has_kafka_meta_request()) {
             const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
             if (!kafka_request.offset_flags().empty()) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 94d5725c38a..1be7b871d68 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1181,6 +1181,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_num_per_be = 1024;
 
+    /**
+     * the max timeout of get kafka meta.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int max_get_kafka_meta_timeout_second = 60;
+
     /**
      * The max number of files store in SmallFileMgr
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 656ebf65152..c0c932bb8ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.kafka;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
@@ -42,8 +43,6 @@ import java.util.stream.Collectors;
 
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
-    private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
-    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
@@ -59,7 +58,8 @@ public class KafkaUtil {
                                     )
                             )
             ).build();
-            return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
+            return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second)
+                    .getKafkaMetaResult().getPartitionIdsList();
         } catch (Exception e) {
             throw new LoadException(
                     "Failed to get all partitions of kafka topic: " + topic + 
" error: " + e.getMessage());
@@ -96,8 +96,8 @@ public class KafkaUtil {
             }
 
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -141,8 +141,8 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -201,8 +201,8 @@ public class KafkaUtil {
                         .setVal(pair.second).build());
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
-            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
             List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
             List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -236,7 +236,7 @@ public class KafkaUtil {
 
             try {
                 future = BackendServiceProxy.getInstance().getInfo(address, request);
-                result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+                result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
             } catch (Exception e) {
                 LOG.warn("failed to get info request to " + address + " err " 
+ e.getMessage());
                 retryTimes++;

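Usage note: since the new FE config max_get_kafka_meta_timeout_second is declared mutable and masterOnly, it should be adjustable at runtime on the master FE through the usual Doris admin statement; the value 120 below is only an illustrative choice, not something prescribed by this commit:

    ADMIN SET FRONTEND CONFIG ("max_get_kafka_meta_timeout_second" = "120");

The setting can also be placed in fe.conf to persist across restarts. On the BE side, the fallback used when a request carries no timeout_secs is raised from 5 seconds to 60 seconds, as shown in the internal_service.cpp hunk above.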

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
