This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f73205231c0 [improve](routine-load) add retry when get Kafka meta info 
(#35376)
f73205231c0 is described below

commit f73205231c07624217a0f250d2a942f036b335f0
Author: hui lai <1353307...@qq.com>
AuthorDate: Wed Jun 5 14:20:17 2024 +0800

    [improve](routine-load) add retry when get Kafka meta info (#35376)
    
    If a BE is down when the FE sends the `getInfo` RPC, or a network error
    occurs when the BE sends an RPC to Kafka, the routine load job will pause.
    To keep routine load stable, add a retry when getting Kafka meta info.
---
 .../apache/doris/datasource/kafka/KafkaUtil.java   | 171 ++++++++-------------
 1 file changed, 66 insertions(+), 105 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 00169f4c8ea..656ebf65152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -43,24 +43,11 @@ import java.util.stream.Collectors;
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
     private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
-    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
+    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, 
String topic,
             Map<String, String> convertedCustomProperties) throws 
UserException {
-        TNetworkAddress address = null;
-        Backend be = null;
-        long beId = -1L;
         try {
-            List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get all partitions. No 
alive backends");
-            }
-            Collections.shuffle(backendIds);
-            beId = backendIds.get(0);
-            be = Env.getCurrentSystemInfo().getBackend(beId);
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -72,21 +59,10 @@ public class KafkaUtil {
                                     )
                             )
             ).build();
-
-            // get info
-            Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = 
future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get kafka partition info: " 
+ result.getStatus().getErrorMsgsList());
-            } else {
-                return result.getKafkaMetaResult().getPartitionIdsList();
-            }
+            return getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
         } catch (Exception e) {
-            LOG.warn("failed to get partitions from backend[{}].", beId, e);
             throw new LoadException(
-                    "Failed to get all partitions of kafka topic: " + topic + 
" from backend[" + beId
-                        + "]. error: " + e.getMessage());
+                    "Failed to get all partitions of kafka topic: " + topic + 
" error: " + e.getMessage());
         }
     }
 
@@ -96,20 +72,10 @@ public class KafkaUtil {
     public static List<Pair<Integer, Long>> getOffsetsForTimes(String 
brokerList, String topic,
             Map<String, String> convertedCustomProperties, List<Pair<Integer, 
Long>> timestampOffsets)
             throws LoadException {
-        TNetworkAddress address = null;
         if (LOG.isDebugEnabled()) {
             LOG.debug("begin to get offsets for times of topic: {}, {}", 
topic, timestampOffsets);
         }
         try {
-            List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get offset for times. No 
alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -131,24 +97,17 @@ public class KafkaUtil {
 
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+            InternalService.PProxyResult result = getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND);
 
-            // get info
-            Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = 
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get offsets for times: " + 
result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = 
Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), 
pair.getVal()));
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("finish to get offsets for times of topic: {}, 
{}", topic, partitionOffsets);
-                }
-                return partitionOffsets;
+            List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("finish to get offsets for times of topic: {}, {}", 
topic, partitionOffsets);
             }
+            return partitionOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get offsets for times.", e);
             throw new LoadException(
@@ -159,21 +118,11 @@ public class KafkaUtil {
     public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID 
taskId, String brokerList, String topic,
                                                              Map<String, 
String> convertedCustomProperties,
                                                              List<Integer> 
partitionIds) throws LoadException {
-        TNetworkAddress address = null;
         if (LOG.isDebugEnabled()) {
             LOG.debug("begin to get latest offsets for partitions {} in topic: 
{}, task {}, job {}",
                     partitionIds, topic, taskId, jobId);
         }
         try {
-            List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get latest offsets. No 
alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -193,25 +142,18 @@ public class KafkaUtil {
             }
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+            InternalService.PProxyResult result = getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND);
 
-            // get info
-            Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = 
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get latest offsets: " + 
result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = 
Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), 
pair.getVal()));
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("finish to get latest offsets for partitions {} 
in topic: {}, task {}, job {}",
-                            partitionOffsets, topic, taskId, jobId);
-                }
-                return partitionOffsets;
+            List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("finish to get latest offsets for partitions {} in 
topic: {}, task {}, job {}",
+                        partitionOffsets, topic, taskId, jobId);
             }
+            return partitionOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get latest offsets.", e);
             throw new LoadException(
@@ -239,17 +181,7 @@ public class KafkaUtil {
             return offsets;
         }
 
-        TNetworkAddress address = null;
         try {
-            List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get real offsets. No alive 
backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -270,27 +202,56 @@ public class KafkaUtil {
             }
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+            InternalService.PProxyResult result = getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND);
 
-            // get info
-            Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = 
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get real offsets: " + 
result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = 
Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), 
pair.getVal()));
-                }
-                realOffsets.addAll(partitionOffsets);
-                LOG.info("finish to get real offsets for partitions {} in 
topic: {}", realOffsets, topic);
-                return realOffsets;
+            List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
             }
+            realOffsets.addAll(partitionOffsets);
+            LOG.info("finish to get real offsets for partitions {} in topic: 
{}", realOffsets, topic);
+            return realOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get real offsets.", e);
             throw new LoadException(
                     "Failed to get real offsets of kafka topic: " + topic + ". 
error: " + e.getMessage());
         }
     }
+
+    private static InternalService.PProxyResult 
getInfoRequest(InternalService.PProxyRequest request, int timeout)
+                                                        throws LoadException {
+        int retryTimes = 0;
+        TNetworkAddress address = null;
+        Future<InternalService.PProxyResult> future = null;
+        InternalService.PProxyResult result = null;
+        while (retryTimes < 3) {
+            List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get info. No alive 
backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            try {
+                future = BackendServiceProxy.getInstance().getInfo(address, 
request);
+                result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+            } catch (Exception e) {
+                LOG.warn("failed to get info request to " + address + " err " 
+ e.getMessage());
+                retryTimes++;
+                continue;
+            }
+            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                LOG.warn("failed to get info request to "
+                        + address + " err " + 
result.getStatus().getErrorMsgsList());
+                retryTimes++;
+            } else {
+                return result;
+            }
+        }
+
+        throw new LoadException("Failed to get info");
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to