This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new e7056c52ac4 [branch-2.0](routine-load) add retry when getting Kafka meta info and make the Kafka meta timeout configurable (#37458)
e7056c52ac4 is described below

commit e7056c52ac4d7c58b159810a7d94e4ba3bf2efae
Author: hui lai <1353307...@qq.com>
AuthorDate: Mon Jul 8 16:59:32 2024 +0800

    [branch-2.0](routine-load) add retry when getting Kafka meta info and make the Kafka meta timeout configurable (#37458)
---
 be/src/service/internal_service.cpp                |   2 +-
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../org/apache/doris/common/util/KafkaUtil.java    | 144 ++++++++++-----------
 3 files changed, 72 insertions(+), 80 deletions(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 3677c1210a5..9591c1928ee 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -933,7 +933,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
         // Currently it supports 2 kinds of requests:
         // 1. get all kafka partition ids for given topic
         // 2. get all kafka partition offsets for given topic and timestamp.
-        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
+        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000;
         if (request->has_kafka_meta_request()) {
             const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
             if (!kafka_request.partition_id_for_latest_offsets().empty()) {
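
Context for the hunk above: the BE falls back to this default only when the FE request carries no explicit timeout_secs. A minimal FE-side sketch, using the protobuf builders that appear later in this patch, of how an explicit timeout overrides the 60s fallback:

    // Sketch based on this patch's FE code: once setTimeoutSecs() is called,
    // the BE uses that value (in seconds) and the 60s default above is unused.
    InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder()
            .setKafkaMetaRequest(metaRequestBuilder)  // Kafka meta sub-request
            .setTimeoutSecs(Config.max_get_kafka_meta_timeout_second)
            .build();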
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index dcdcc7dd035..0b4aa1cfd3a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1141,6 +1141,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_num_per_be = 5;
 
+    /**
+     * The max timeout, in seconds, for getting Kafka meta info.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int max_get_kafka_meta_timeout_second = 60;
+
     /**
      * The max number of files store in SmallFileMgr
      */
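
Because the new field is declared mutable (and masterOnly), it can be changed at runtime on the master FE without a restart, e.g. via ADMIN SET FRONTEND CONFIG. A minimal sketch of how a change takes effect, assuming call sites read the field per request as this patch does:

    // Read at request time, not cached at startup: a runtime update to the
    // mutable config applies from the next Kafka meta request onward.
    int timeoutSecond = Config.max_get_kafka_meta_timeout_second;
    // ... build the PProxyRequest with setTimeoutSecs(timeoutSecond) ...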
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index 60f423773e7..f6342e1a6fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common.util;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
@@ -41,25 +42,10 @@ import java.util.stream.Collectors;
 
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
-    private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
-    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
-        TNetworkAddress address = null;
-        Backend be = null;
-        long beId = -1L;
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get all partitions. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            beId = backendIds.get(0);
-            be = Env.getCurrentSystemInfo().getBackend(beId);
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -71,21 +57,11 @@ public class KafkaUtil {
                                     )
                             )
             ).build();
-
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
-            } else {
-                return result.getKafkaMetaResult().getPartitionIdsList();
-            }
+            return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second)
+                    .getKafkaMetaResult().getPartitionIdsList();
         } catch (Exception e) {
-            LOG.warn("failed to get partitions from backend[{}].", beId, e);
             throw new LoadException(
-                    "Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId
-                        + "]. error: " + e.getMessage());
+                    "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
         }
     }
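
A hedged usage sketch of the refactored method; the broker list, topic, and properties below are placeholders, not values from this patch:

    // Hypothetical call site. Backend selection and the three retries now
    // live inside getInfoRequest; an exhausted retry surfaces here as a
    // LoadException naming the topic.
    Map<String, String> props = new HashMap<>();  // converted custom properties
    List<Integer> partitionIds = KafkaUtil.getAllKafkaPartitions(
            "broker1:9092,broker2:9092", "example_topic", props);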
 
@@ -95,18 +71,10 @@ public class KafkaUtil {
     public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
             Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets)
             throws LoadException {
-        TNetworkAddress address = null;
-        LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
+        }
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get offset for times. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -127,23 +95,18 @@ public class KafkaUtil {
             }
 
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+            }
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
-                return partitionOffsets;
             }
+            return partitionOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get offsets for times.", e);
             throw new LoadException(
@@ -154,19 +117,11 @@ public class KafkaUtil {
     public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
                                                              Map<String, String> convertedCustomProperties,
                                                              List<Integer> partitionIds) throws LoadException {
-        TNetworkAddress address = null;
-        LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
-                partitionIds, topic, taskId, jobId);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
+                    partitionIds, topic, taskId, jobId);
+        }
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get latest offsets. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -185,28 +140,59 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+            }
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
                         partitionOffsets, topic, taskId, jobId);
-                return partitionOffsets;
             }
+            return partitionOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get latest offsets.", e);
             throw new LoadException(
                     "Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage());
         }
     }
+
+    private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
+                                                        throws LoadException {
+        int retryTimes = 0;
+        TNetworkAddress address = null;
+        Future<InternalService.PProxyResult> future = null;
+        InternalService.PProxyResult result = null;
+        while (retryTimes < 3) {
+            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get info. No alive backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            try {
+                future = BackendServiceProxy.getInstance().getInfo(address, request);
+                result = future.get(timeout, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
+                retryTimes++;
+                continue;
+            }
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                LOG.warn("failed to get info request to "
+                        + address + " err " + result.getStatus().getErrorMsgsList());
+                retryTimes++;
+            } else {
+                return result;
+            }
+        }
+
+        throw new LoadException("Failed to get info");
+    }
 }
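
The getInfoRequest helper above is the core of this change: pick a random alive BE per attempt, bound each attempt with the configured timeout, and give up only after three failed attempts. A self-contained sketch of the same pattern with Doris types replaced by placeholders; all names below are illustrative, not from the patch:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class RetryDemo {
        // Generic form of the pattern used by getInfoRequest: retry up to
        // maxRetries times, bounding each attempt with a timeout, and fail
        // only after every attempt has failed.
        static <T> T callWithRetry(Callable<T> call, int maxRetries, int timeoutSec)
                throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            try {
                for (int attempt = 0; attempt < maxRetries; attempt++) {
                    Future<T> future = pool.submit(call);
                    try {
                        return future.get(timeoutSec, TimeUnit.SECONDS); // bounded wait
                    } catch (Exception e) {
                        future.cancel(true); // abandon this attempt and retry
                    }
                }
                throw new Exception("all " + maxRetries + " attempts failed");
            } finally {
                pool.shutdownNow();
            }
        }

        public static void main(String[] args) throws Exception {
            // Toy call that succeeds immediately; the real helper picks a
            // random backend per attempt, as getInfoRequest does.
            System.out.println(callWithRetry(() -> "ok", 3, 60));
        }
    }

One design note: because a fresh backend is shuffled in on every iteration, a single slow or dead BE cannot fail the request by itself; the worst case is roughly 3 x timeout before the LoadException surfaces.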

