liaoxin01 commented on code in PR #35376: URL: https://github.com/apache/doris/pull/35376#discussion_r1616526526
########## fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java: ########## @@ -270,27 +202,60 @@ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); + InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND); - // get info - Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request); - InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - throw new UserException("failed to get real offsets: " + result.getStatus().getErrorMsgsList()); - } else { - List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); - List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); - for (InternalService.PIntegerPair pair : pairs) { - partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); - } - realOffsets.addAll(partitionOffsets); - LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic); - return realOffsets; + List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); + List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); } + realOffsets.addAll(partitionOffsets); + LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic); + return realOffsets; } catch (Exception e) { LOG.warn("failed to get real offsets.", e); throw new LoadException( "Failed to get real offsets of kafka topic: " + topic + ". 
error: " + e.getMessage()); } } + + private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout) + throws LoadException { + int retryTimes = 0; + TNetworkAddress address = null; + while (true) { + List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (backendIds.isEmpty()) { + throw new LoadException("Failed to get info. No alive backends"); + } + Collections.shuffle(backendIds); + Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + + Future<InternalService.PProxyResult> future; + InternalService.PProxyResult result; + try { + future = BackendServiceProxy.getInstance().getInfo(address, request); + result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); + } catch (Exception e) { Review Comment: This may result in an infinite loop if exceptions are thrown constantly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org