liaoxin01 commented on code in PR #35376:
URL: https://github.com/apache/doris/pull/35376#discussion_r1616526526


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java:
##########
@@ -270,27 +202,60 @@ public static List<Pair<Integer, Long>> 
getRealOffsets(String brokerList, String
             }
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+            InternalService.PProxyResult result = getInfoRequest(request, 
MAX_GET_OFFSET_TIMEOUT_SECOND);
 
-            // get info
-            Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = 
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get real offsets: " + 
result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = 
Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), 
pair.getVal()));
-                }
-                realOffsets.addAll(partitionOffsets);
-                LOG.info("finish to get real offsets for partitions {} in 
topic: {}", realOffsets, topic);
-                return realOffsets;
+            List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
             }
+            realOffsets.addAll(partitionOffsets);
+            LOG.info("finish to get real offsets for partitions {} in topic: 
{}", realOffsets, topic);
+            return realOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get real offsets.", e);
             throw new LoadException(
                     "Failed to get real offsets of kafka topic: " + topic + ". 
error: " + e.getMessage());
         }
     }
+
+    private static InternalService.PProxyResult 
getInfoRequest(InternalService.PProxyRequest request, int timeout)
+                                                        throws LoadException {
+        int retryTimes = 0;
+        TNetworkAddress address = null;
+        while (true) {
+            List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get info. No alive 
backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            Future<InternalService.PProxyResult> future;
+            InternalService.PProxyResult result;
+            try {
+                future = BackendServiceProxy.getInstance().getInfo(address, 
request);
+                result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+            } catch (Exception e) {

Review Comment:
   It may result in dead loop if throw exception constantly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to