This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f73205231c0 [improve](routine-load) add retry when get Kafka meta info
(#35376)
f73205231c0 is described below
commit f73205231c07624217a0f250d2a942f036b335f0
Author: hui lai <[email protected]>
AuthorDate: Wed Jun 5 14:20:17 2024 +0800
[improve](routine-load) add retry when get Kafka meta info (#35376)
If a BE is down when the FE sends the `getInfo` RPC, or a network error
occurs when the BE sends an RPC to Kafka, the routine load job will pause.
To keep routine load stable, add a retry when getting Kafka meta info.
---
.../apache/doris/datasource/kafka/KafkaUtil.java | 171 ++++++++-------------
1 file changed, 66 insertions(+), 105 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 00169f4c8ea..656ebf65152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -43,24 +43,11 @@ import java.util.stream.Collectors;
public class KafkaUtil {
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
- private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
+ private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
public static List<Integer> getAllKafkaPartitions(String brokerList,
String topic,
Map<String, String> convertedCustomProperties) throws
UserException {
- TNetworkAddress address = null;
- Backend be = null;
- long beId = -1L;
try {
- List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
- if (backendIds.isEmpty()) {
- throw new LoadException("Failed to get all partitions. No
alive backends");
- }
- Collections.shuffle(backendIds);
- beId = backendIds.get(0);
- be = Env.getCurrentSystemInfo().getBackend(beId);
- address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
- // create request
InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -72,21 +59,10 @@ public class KafkaUtil {
)
)
).build();
-
- // get info
- Future<InternalService.PProxyResult> future =
BackendServiceProxy.getInstance().getInfo(address, request);
- InternalService.PProxyResult result =
future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
- TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
- if (code != TStatusCode.OK) {
- throw new UserException("failed to get kafka partition info: "
+ result.getStatus().getErrorMsgsList());
- } else {
- return result.getKafkaMetaResult().getPartitionIdsList();
- }
+ return getInfoRequest(request,
MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
} catch (Exception e) {
- LOG.warn("failed to get partitions from backend[{}].", beId, e);
throw new LoadException(
- "Failed to get all partitions of kafka topic: " + topic +
" from backend[" + beId
- + "]. error: " + e.getMessage());
+ "Failed to get all partitions of kafka topic: " + topic +
" error: " + e.getMessage());
}
}
@@ -96,20 +72,10 @@ public class KafkaUtil {
public static List<Pair<Integer, Long>> getOffsetsForTimes(String
brokerList, String topic,
Map<String, String> convertedCustomProperties, List<Pair<Integer,
Long>> timestampOffsets)
throws LoadException {
- TNetworkAddress address = null;
if (LOG.isDebugEnabled()) {
LOG.debug("begin to get offsets for times of topic: {}, {}",
topic, timestampOffsets);
}
try {
- List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
- if (backendIds.isEmpty()) {
- throw new LoadException("Failed to get offset for times. No
alive backends");
- }
- Collections.shuffle(backendIds);
- Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
- address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
- // create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -131,24 +97,17 @@ public class KafkaUtil {
InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+ InternalService.PProxyResult result = getInfoRequest(request,
MAX_GET_OFFSET_TIMEOUT_SECOND);
- // get info
- Future<InternalService.PProxyResult> future =
BackendServiceProxy.getInstance().getInfo(address, request);
- InternalService.PProxyResult result =
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
- TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
- if (code != TStatusCode.OK) {
- throw new UserException("failed to get offsets for times: " +
result.getStatus().getErrorMsgsList());
- } else {
- List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
- List<Pair<Integer, Long>> partitionOffsets =
Lists.newArrayList();
- for (InternalService.PIntegerPair pair : pairs) {
- partitionOffsets.add(Pair.of(pair.getKey(),
pair.getVal()));
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("finish to get offsets for times of topic: {},
{}", topic, partitionOffsets);
- }
- return partitionOffsets;
+ List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
+ List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+ for (InternalService.PIntegerPair pair : pairs) {
+ partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finish to get offsets for times of topic: {}, {}",
topic, partitionOffsets);
}
+ return partitionOffsets;
} catch (Exception e) {
LOG.warn("failed to get offsets for times.", e);
throw new LoadException(
@@ -159,21 +118,11 @@ public class KafkaUtil {
public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID
taskId, String brokerList, String topic,
Map<String,
String> convertedCustomProperties,
List<Integer>
partitionIds) throws LoadException {
- TNetworkAddress address = null;
if (LOG.isDebugEnabled()) {
LOG.debug("begin to get latest offsets for partitions {} in topic:
{}, task {}, job {}",
partitionIds, topic, taskId, jobId);
}
try {
- List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
- if (backendIds.isEmpty()) {
- throw new LoadException("Failed to get latest offsets. No
alive backends");
- }
- Collections.shuffle(backendIds);
- Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
- address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
- // create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -193,25 +142,18 @@ public class KafkaUtil {
}
InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+ InternalService.PProxyResult result = getInfoRequest(request,
MAX_GET_OFFSET_TIMEOUT_SECOND);
- // get info
- Future<InternalService.PProxyResult> future =
BackendServiceProxy.getInstance().getInfo(address, request);
- InternalService.PProxyResult result =
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
- TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
- if (code != TStatusCode.OK) {
- throw new UserException("failed to get latest offsets: " +
result.getStatus().getErrorMsgsList());
- } else {
- List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
- List<Pair<Integer, Long>> partitionOffsets =
Lists.newArrayList();
- for (InternalService.PIntegerPair pair : pairs) {
- partitionOffsets.add(Pair.of(pair.getKey(),
pair.getVal()));
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("finish to get latest offsets for partitions {}
in topic: {}, task {}, job {}",
- partitionOffsets, topic, taskId, jobId);
- }
- return partitionOffsets;
+ List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
+ List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+ for (InternalService.PIntegerPair pair : pairs) {
+ partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finish to get latest offsets for partitions {} in
topic: {}, task {}, job {}",
+ partitionOffsets, topic, taskId, jobId);
}
+ return partitionOffsets;
} catch (Exception e) {
LOG.warn("failed to get latest offsets.", e);
throw new LoadException(
@@ -239,17 +181,7 @@ public class KafkaUtil {
return offsets;
}
- TNetworkAddress address = null;
try {
- List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
- if (backendIds.isEmpty()) {
- throw new LoadException("Failed to get real offsets. No alive
backends");
- }
- Collections.shuffle(backendIds);
- Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
- address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
- // create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -270,27 +202,56 @@ public class KafkaUtil {
}
InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+ InternalService.PProxyResult result = getInfoRequest(request,
MAX_GET_OFFSET_TIMEOUT_SECOND);
- // get info
- Future<InternalService.PProxyResult> future =
BackendServiceProxy.getInstance().getInfo(address, request);
- InternalService.PProxyResult result =
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
- TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
- if (code != TStatusCode.OK) {
- throw new UserException("failed to get real offsets: " +
result.getStatus().getErrorMsgsList());
- } else {
- List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
- List<Pair<Integer, Long>> partitionOffsets =
Lists.newArrayList();
- for (InternalService.PIntegerPair pair : pairs) {
- partitionOffsets.add(Pair.of(pair.getKey(),
pair.getVal()));
- }
- realOffsets.addAll(partitionOffsets);
- LOG.info("finish to get real offsets for partitions {} in
topic: {}", realOffsets, topic);
- return realOffsets;
+ List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
+ List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+ for (InternalService.PIntegerPair pair : pairs) {
+ partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
+ realOffsets.addAll(partitionOffsets);
+ LOG.info("finish to get real offsets for partitions {} in topic:
{}", realOffsets, topic);
+ return realOffsets;
} catch (Exception e) {
LOG.warn("failed to get real offsets.", e);
throw new LoadException(
"Failed to get real offsets of kafka topic: " + topic + ".
error: " + e.getMessage());
}
}
+
+ private static InternalService.PProxyResult
getInfoRequest(InternalService.PProxyRequest request, int timeout)
+ throws LoadException {
+ int retryTimes = 0;
+ TNetworkAddress address = null;
+ Future<InternalService.PProxyResult> future = null;
+ InternalService.PProxyResult result = null;
+ while (retryTimes < 3) {
+ List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
+ if (backendIds.isEmpty()) {
+ throw new LoadException("Failed to get info. No alive
backends");
+ }
+ Collections.shuffle(backendIds);
+ Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+ address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+ try {
+ future = BackendServiceProxy.getInstance().getInfo(address,
request);
+ result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND,
TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.warn("failed to get info request to " + address + " err "
+ e.getMessage());
+ retryTimes++;
+ continue;
+ }
+ TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ LOG.warn("failed to get info request to "
+ + address + " err " +
result.getStatus().getErrorMsgsList());
+ retryTimes++;
+ } else {
+ return result;
+ }
+ }
+
+ throw new LoadException("Failed to get info");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]