This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new e7056c52ac4 [branch-2.0](routine-load) add retry when get Kafka meta info and make get Kafka meta timeout configurable (#37458)
e7056c52ac4 is described below

commit e7056c52ac4d7c58b159810a7d94e4ba3bf2efae
Author: hui lai <1353307...@qq.com>
AuthorDate: Mon Jul 8 16:59:32 2024 +0800

    [branch-2.0](routine-load) add retry when get Kafka meta info and make get Kafka meta timeout configurable (#37458)
---
 be/src/service/internal_service.cpp                |   2 +-
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../org/apache/doris/common/util/KafkaUtil.java    | 144 ++++++++++-----------
 3 files changed, 72 insertions(+), 80 deletions(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 3677c1210a5..9591c1928ee 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -933,7 +933,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
     // Currently it supports 2 kinds of requests:
     // 1. get all kafka partition ids for given topic
     // 2. get all kafka partition offsets for given topic and timestamp.
-    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
+    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000;
     if (request->has_kafka_meta_request()) {
         const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
         if (!kafka_request.partition_id_for_latest_offsets().empty()) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index dcdcc7dd035..0b4aa1cfd3a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1141,6 +1141,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_num_per_be = 5;
 
+    /**
+     * the max timeout of get kafka meta.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int max_get_kafka_meta_timeout_second = 60;
+
     /**
      * The max number of files store in SmallFileMgr
      */
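A note on the new knob: since max_get_kafka_meta_timeout_second is declared @ConfField(mutable = true, masterOnly = true), it should be adjustable at runtime on the master FE rather than requiring a restart, presumably via ADMIN SET FRONTEND CONFIG ("max_get_kafka_meta_timeout_second" = "120"). The value 120 here is only an illustration, not something this patch sets.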
No alive backends"); - } - Collections.shuffle(backendIds); - beId = backendIds.get(0); - be = Env.getCurrentSystemInfo().getBackend(beId); - address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); - - // create request InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( InternalService.PKafkaMetaProxyRequest.newBuilder() .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() @@ -71,21 +57,11 @@ public class KafkaUtil { ) ) ).build(); - - // get info - Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request); - InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList()); - } else { - return result.getKafkaMetaResult().getPartitionIdsList(); - } + return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second) + .getKafkaMetaResult().getPartitionIdsList(); } catch (Exception e) { - LOG.warn("failed to get partitions from backend[{}].", beId, e); throw new LoadException( - "Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId - + "]. error: " + e.getMessage()); + "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage()); } } @@ -95,18 +71,10 @@ public class KafkaUtil { public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic, Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets) throws LoadException { - TNetworkAddress address = null; - LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets); + if (LOG.isDebugEnabled()) { + LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets); + } try { - List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (backendIds.isEmpty()) { - throw new LoadException("Failed to get offset for times. 
No alive backends"); - } - Collections.shuffle(backendIds); - Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); - address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); - - // create request InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = InternalService.PKafkaMetaProxyRequest.newBuilder() .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() @@ -127,23 +95,18 @@ public class KafkaUtil { } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); + metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); + InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); - // get info - Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request); - InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList()); - } else { - List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); - List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); - for (InternalService.PIntegerPair pair : pairs) { - partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); - } + List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); + List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); + } + if (LOG.isDebugEnabled()) { LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets); - return partitionOffsets; } + return partitionOffsets; } catch (Exception e) { LOG.warn("failed to get offsets for times.", e); throw new LoadException( @@ -154,19 +117,11 @@ public class KafkaUtil { public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic, Map<String, String> convertedCustomProperties, List<Integer> partitionIds) throws LoadException { - TNetworkAddress address = null; - LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}", - partitionIds, topic, taskId, jobId); + if (LOG.isDebugEnabled()) { + LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}", + partitionIds, topic, taskId, jobId); + } try { - List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (backendIds.isEmpty()) { - throw new LoadException("Failed to get latest offsets. 
No alive backends"); - } - Collections.shuffle(backendIds); - Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); - address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); - - // create request InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = InternalService.PKafkaMetaProxyRequest.newBuilder() .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() @@ -185,28 +140,59 @@ public class KafkaUtil { metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId); } InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( - metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); + metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build(); + InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second); - // get info - Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request); - InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList()); - } else { - List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); - List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); - for (InternalService.PIntegerPair pair : pairs) { - partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); - } + List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); + List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); + } + if (LOG.isDebugEnabled()) { LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}", partitionOffsets, topic, taskId, jobId); - return partitionOffsets; } + return partitionOffsets; } catch (Exception e) { LOG.warn("failed to get latest offsets.", e); throw new LoadException( "Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage()); } } + + private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout) + throws LoadException { + int retryTimes = 0; + TNetworkAddress address = null; + Future<InternalService.PProxyResult> future = null; + InternalService.PProxyResult result = null; + while (retryTimes < 3) { + List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (backendIds.isEmpty()) { + throw new LoadException("Failed to get info. 
No alive backends"); + } + Collections.shuffle(backendIds); + Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + + try { + future = BackendServiceProxy.getInstance().getInfo(address, request); + result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("failed to get info request to " + address + " err " + e.getMessage()); + retryTimes++; + continue; + } + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + LOG.warn("failed to get info request to " + + address + " err " + result.getStatus().getErrorMsgsList()); + retryTimes++; + } else { + return result; + } + } + + throw new LoadException("Failed to get info"); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org