This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bf60fd18fa7 HDDS-14212. Make maxFailovers in GrpcOmTransport apply on
a per-request basis. (#9546)
bf60fd18fa7 is described below
commit bf60fd18fa7deb47c2b3a64f5f4df0229ba3d2c6
Author: Aleksei Ieshin <[email protected]>
AuthorDate: Tue Jan 13 00:19:22 2026 +1100
HDDS-14212. Make maxFailovers in GrpcOmTransport apply on a per-request
basis. (#9546)
---
.../common/src/main/resources/ozone-default.xml | 6 +-
.../ozone/om/protocolPB/GrpcOmTransport.java | 20 +-
.../TestGrpcOmTransportConcurrentFailover.java | 305 +++++++++++++++++++++
.../ozone/om/protocolPB/TestS3GrpcOmTransport.java | 42 ++-
4 files changed, 358 insertions(+), 15 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 7783b3988a3..3b169c331fe 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3116,9 +3116,9 @@
Expert only. Ozone RpcClient attempts talking to each OzoneManager
ipc.client.connect.max.retries (default = 10) number of times before
failing over to another OzoneManager, if available. This parameter
- represents the number of times the client will failover before giving
- up. This value is kept high so that client does not give up trying to
- connect to OMs easily.
+ represents the number of times per request the client will failover
+ before giving up. This value is kept high so that client does not
+ give up trying to connect to OMs easily.
</description>
</property>
<property>
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index 75ea99f77a8..ef4ec32ddc2 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -81,12 +81,11 @@ public class GrpcOmTransport implements OmTransport {
private ConfigurationSource conf;
private AtomicReference<String> host;
- private AtomicInteger syncFailoverCount;
+ private AtomicInteger globalFailoverCount;
private final int maxSize;
private SecurityConfig secConfig;
private RetryPolicy retryPolicy;
- private int failoverCount = 0;
private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
@@ -102,9 +101,7 @@ public GrpcOmTransport(ConfigurationSource conf,
this.clients = new HashMap<>();
this.conf = conf;
this.host = new AtomicReference();
- this.failoverCount = 0;
- this.syncFailoverCount = new AtomicInteger();
-
+ this.globalFailoverCount = new AtomicInteger();
secConfig = new SecurityConfig(conf);
maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
@@ -175,12 +172,13 @@ public void start() throws IOException {
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
AtomicReference<OMResponse> resp = new AtomicReference<>();
+ int requestFailoverCount = 0;
boolean tryOtherHost = true;
int expectedFailoverCount = 0;
ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
while (tryOtherHost) {
tryOtherHost = false;
- expectedFailoverCount = syncFailoverCount.get();
+ expectedFailoverCount = globalFailoverCount.get();
try {
InetAddress inetAddress = InetAddress.getLocalHost();
Context.current()
@@ -201,7 +199,7 @@ public OMResponse submitRequest(OMRequest payload) throws
IOException {
}
Exception exp = new Exception(e);
tryOtherHost = shouldRetry(unwrapException(exp),
- expectedFailoverCount);
+ expectedFailoverCount, ++requestFailoverCount);
if (!tryOtherHost) {
throw new OMException(resultCode);
}
@@ -251,11 +249,11 @@ private Exception unwrapException(Exception ex) {
return grpcException;
}
- private boolean shouldRetry(Exception ex, int expectedFailoverCount) {
+ private boolean shouldRetry(Exception ex, int expectedFailoverCount, int
requestFailoverCount) {
boolean retry = false;
RetryPolicy.RetryAction action = null;
try {
- action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++,
true);
+ action = retryPolicy.shouldRetry(ex, 0, requestFailoverCount, true);
LOG.debug("grpc failover retry action {}", action.action);
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
retry = false;
@@ -273,9 +271,9 @@ private boolean shouldRetry(Exception ex, int
expectedFailoverCount) {
}
}
// switch om host to current proxy OMNodeId
- if (syncFailoverCount.get() == expectedFailoverCount) {
+ if (globalFailoverCount.get() == expectedFailoverCount) {
omFailoverProxyProvider.performFailover(null);
- syncFailoverCount.getAndIncrement();
+ globalFailoverCount.getAndIncrement();
} else {
LOG.warn("A failover has occurred since the start of current" +
" thread retry, NOT failover using current proxy");
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java
new file mode 100644
index 00000000000..c8890eaa49c
--- /dev/null
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_PORT_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concurrent test for GrpcOmTransport client.
+ */
+public class TestGrpcOmTransportConcurrentFailover {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestGrpcOmTransportConcurrentFailover.class);
+
+ private static final String OM_SERVICE_ID = "om-service-test";
+ private static final String NODE_ID_BASE = "om";
+ private static final int NUM_OMS = 3;
+ private static final int BASE_PORT = 19860;
+
+ private Map<String, MockOMServer> mockServers;
+ private GrpcOmTransport transport;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ mockServers = new HashMap<>();
+ OzoneConfiguration conf = new OzoneConfiguration();
+
+ conf.setLong(OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
250);
+ conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);
+ conf.set(OZONE_OM_SERVICE_IDS_KEY, OM_SERVICE_ID);
+
+ StringJoiner omNodes = new StringJoiner(",");
+
+ for (int i = 0; i < NUM_OMS; i++) {
+ String nodeId = NODE_ID_BASE + i;
+ omNodes.add(nodeId);
+
+ int port = BASE_PORT + i;
+ MockOMServer server = new MockOMServer(nodeId, port);
+ server.start();
+ mockServers.put(nodeId, server);
+
+ conf.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID,
nodeId),
+ "localhost");
+ conf.setInt(ConfUtils.addKeySuffixes(OZONE_OM_GRPC_PORT_KEY,
OM_SERVICE_ID, nodeId),
+ port);
+ }
+
+ conf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
+ omNodes.toString());
+
+ failover("om0", "om1", "om2");
+
+ transport = new GrpcOmTransport(conf,
UserGroupInformation.getCurrentUser(), OM_SERVICE_ID);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (transport != null) {
+ transport.close();
+ }
+ for (MockOMServer server : mockServers.values()) {
+ server.stop();
+ }
+ }
+
+ @Test
+ public void testConcurrentFailoverTriesAllOMs() throws Exception {
+ final int numThreads = 500;
+ final int requestsPerThread = 10;
+
+ sendInitialOmRequestsBeforeFailover();
+ failover("om2", "om0", "om1");
+ runConcurrentClientRequests(numThreads, requestsPerThread);
+
+ int omsWithFailuresCount = omRequestFailoverDistributionReport();
+ int om0FailuresCount = mockServers.get("om0").getFailureCount();
+ int om2SuccessesCount = mockServers.get("om2").getSuccessCount();
+
+ assertTrue(omsWithFailuresCount >= 1,
+ "At least 1 OMs should receive failed requests during failover. Got: "
+ omsWithFailuresCount);
+ assertTrue(om0FailuresCount > 0, "om0 should receive failed requests");
+ assertEquals(numThreads * requestsPerThread, om2SuccessesCount,
+ "All requests should eventually succeed on om2 (leader)");
+ }
+
+ private int omRequestFailoverDistributionReport() {
+ int totalRequests = 0;
+ int totalFailures = 0;
+ int totalSuccesses = 0;
+ int omsWithFailures = 0;
+
+ for (int i = 0; i < NUM_OMS; i++) {
+ String omId = NODE_ID_BASE + i;
+ MockOMServer server = mockServers.get(omId);
+ totalRequests += server.getRequestCount();
+ totalFailures += server.getFailureCount();
+ totalSuccesses += server.getSuccessCount();
+ if (server.getFailureCount() > 0) {
+ omsWithFailures++;
+ }
+ }
+
+ LOG.info("Total requests: {} (failures: {}, successes: {})",
totalRequests, totalFailures, totalSuccesses);
+ LOG.info("OMs that received failed requests: {}/{}", omsWithFailures,
NUM_OMS);
+
+ LOG.info("--- Failed Requests (Failover Attempts) ---");
+ for (int i = 0; i < NUM_OMS; i++) {
+ String omId = NODE_ID_BASE + i;
+ int failures = mockServers.get(omId).getFailureCount();
+ double percentage = totalFailures > 0 ? (failures * 100.0 /
totalFailures) : 0;
+ String status = failures == 0 ? " NEVER TRIED!" : "";
+ LOG.info(" {}: {} failures ({} %){}", omId, failures,
String.format("%.1f", percentage), status);
+ }
+
+ LOG.info("--- Successful Requests ---");
+ for (int i = 0; i < NUM_OMS; i++) {
+ String omId = NODE_ID_BASE + i;
+ int successes = mockServers.get(omId).getSuccessCount();
+ double percentage = totalSuccesses > 0 ? (successes * 100.0 /
totalSuccesses) : 0;
+ String status = successes > 0 ? " LEADER" : "";
+ LOG.info(" {}: {} successes ({} %){}", omId, successes,
String.format("%.1f", percentage), status);
+ }
+ return omsWithFailures;
+ }
+
+ private void failover(String leader, String follower1, String follower2) {
+ mockServers.get(leader).setAsLeader(true);
+ mockServers.get(follower1).setAsLeader(false);
+ mockServers.get(follower2).setAsLeader(false);
+ }
+
+ private void runConcurrentClientRequests(int numThreads, int
requestsPerThread) throws InterruptedException {
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ CyclicBarrier startBarrier = new CyclicBarrier(numThreads);
+ CountDownLatch completionLatch = new CountDownLatch(numThreads);
+
+ for (int threadId = 0; threadId < numThreads; threadId++) {
+ final int id = threadId;
+ executor.submit(() -> {
+ try {
+ startBarrier.await();
+
+ for (int attempt = 0; attempt < requestsPerThread; attempt++) {
+ OMRequest request = OMRequest.newBuilder()
+ .setCmdType(Type.ListVolume)
+ .setClientId("test-client")
+ .build();
+
+ try {
+ transport.submitRequest(request);
+ } catch (Exception e) {
+ LOG.error("Thread: {}, Request {} failed: {}", id, attempt + 1,
e.getMessage());
+ }
+
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ LOG.error("Thread: {}, Failed: {}", id, e.getMessage());
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ if (!completionLatch.await(30, TimeUnit.SECONDS)) {
+ LOG.info("Latch didn't count down before timeout");
+ }
+ executor.shutdown();
+ }
+
+ private void sendInitialOmRequestsBeforeFailover() throws IOException {
+ for (int i = 0; i < 5; i++) {
+ OMRequest request = OMRequest.newBuilder()
+ .setCmdType(Type.ListVolume)
+ .setClientId("test-client")
+ .build();
+ transport.submitRequest(request);
+ }
+ }
+
+ private static class MockOMServer {
+ private final String nodeId;
+ private final int port;
+ private final AtomicInteger requestCount = new AtomicInteger(0);
+ private final AtomicInteger successCount = new AtomicInteger(0);
+ private final AtomicInteger failureCount = new AtomicInteger(0);
+ private final AtomicBoolean isLeader = new AtomicBoolean(false);
+ private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase
serviceImpl =
+ mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
+ delegatesTo(new
OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() {
+ @Override
+ public void submitRequest(OMRequest request,
StreamObserver<OMResponse> responseObserver) {
+ requestCount.incrementAndGet();
+
+ if (!isLeader.get()) {
+ failureCount.incrementAndGet();
+ String errorMsg =
"org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException: " +
+ "OM:" + nodeId + " is not the leader. Suggested leader:
om2";
+
+ responseObserver.onError(new StatusRuntimeException(
+ Status.INTERNAL.withDescription(errorMsg)));
+ } else {
+ successCount.incrementAndGet();
+ OMResponse response = OMResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+
.setStatus(org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK)
+ .setMessage("Success from " + nodeId)
+ .build();
+
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
+ }
+ }));
+ private Server server;
+
+ MockOMServer(String nodeId, int port) {
+ this.nodeId = nodeId;
+ this.port = port;
+ }
+
+ public void start() throws Exception {
+ server = ServerBuilder.forPort(port)
+ .addService(serviceImpl)
+ .build()
+ .start();
+ }
+
+ public void stop() throws Exception {
+ if (server != null) {
+ server.shutdown();
+ server.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+
+ public void setAsLeader(boolean leader) {
+ this.isLeader.set(leader);
+ }
+
+ public int getRequestCount() {
+ return requestCount.get();
+ }
+
+ public int getSuccessCount() {
+ return successCount.get();
+ }
+
+ public int getFailureCount() {
+ return failureCount.get();
+ }
+ }
+}
+
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
index d5b8723e0ef..176d9b6d03b 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -66,6 +66,8 @@ public class TestS3GrpcOmTransport {
.build();
private boolean doFailover = false;
+ private boolean completeFailover = true;
+ private int failoverCount;
private OzoneConfiguration conf;
@@ -91,7 +93,10 @@ public void
submitRequest(org.apache.hadoop.ozone.protocol.proto
responseObserver) {
try {
if (doFailover) {
- doFailover = false;
+ if (completeFailover) {
+ doFailover = false;
+ }
+ failoverCount++;
throw createNotLeaderException();
} else {
responseObserver.onNext(omResponse);
@@ -122,6 +127,7 @@ private ServiceException createNotLeaderException() {
@BeforeEach
public void setUp() throws Exception {
+ failoverCount = 0;
// Generate a unique in-process server name.
serverName = InProcessServerBuilder.generateName();
@@ -190,6 +196,7 @@ public void testGrpcFailoverProxy() throws Exception {
@Test
public void testGrpcFailoverProxyExhaustRetry() throws Exception {
+ final int expectedFailoverCount = 1;
ServiceListRequest req = ServiceListRequest.newBuilder().build();
final OMRequest omRequest = OMRequest.newBuilder()
@@ -210,6 +217,29 @@ public void testGrpcFailoverProxyExhaustRetry() throws
Exception {
// max failovers
assertThrows(Exception.class, () -> client.submitRequest(omRequest));
+ assertEquals(expectedFailoverCount, failoverCount);
+ }
+
+ @Test
+ public void testGrpcFailoverProxyCalculatesFailoverCountPerRequest() throws
Exception {
+ final int maxFailoverAttempts = 2;
+ final int expectedRequest2FailoverAttemptsCount = 1;
+ doFailover = true;
+ completeFailover = false;
+ conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
maxFailoverAttempts);
+ conf.setLong(OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
50);
+ client = new GrpcOmTransport(conf, ugi, omServiceId);
+ client.startClient(channel);
+
+ assertThrows(Exception.class, () ->
client.submitRequest(arbitraryOmRequest()));
+ assertEquals(maxFailoverAttempts, failoverCount);
+
+ failoverCount = 0;
+ completeFailover = true;
+ //No exception this time
+ client.submitRequest(arbitraryOmRequest());
+
+ assertEquals(expectedRequest2FailoverAttemptsCount, failoverCount);
}
@Test
@@ -241,4 +271,14 @@ public void testGrpcFailoverExceedMaxMesgLen() throws
Exception {
// rather to fail.
assertThrows(Exception.class, () -> client.submitRequest(omRequest));
}
+
+ private static OMRequest arbitraryOmRequest() {
+ ServiceListRequest req = ServiceListRequest.newBuilder().build();
+ return OMRequest.newBuilder()
+ .setCmdType(Type.ServiceList)
+ .setVersion(CURRENT_VERSION)
+ .setClientId("test")
+ .setServiceListRequest(req)
+ .build();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]