This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new bf60fd18fa7 HDDS-14212. Make maxFailovers in GrpcOmTransport apply on a per-request basis. (#9546)
bf60fd18fa7 is described below

commit bf60fd18fa7deb47c2b3a64f5f4df0229ba3d2c6
Author: Aleksei Ieshin <[email protected]>
AuthorDate: Tue Jan 13 00:19:22 2026 +1100

    HDDS-14212. Make maxFailovers in GrpcOmTransport apply on a per-request basis. (#9546)
---
 .../common/src/main/resources/ozone-default.xml    |   6 +-
 .../ozone/om/protocolPB/GrpcOmTransport.java       |  20 +-
 .../TestGrpcOmTransportConcurrentFailover.java     | 305 +++++++++++++++++++++
 .../ozone/om/protocolPB/TestS3GrpcOmTransport.java |  42 ++-
 4 files changed, 358 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 7783b3988a3..3b169c331fe 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3116,9 +3116,9 @@
       Expert only. Ozone RpcClient attempts talking to each OzoneManager
       ipc.client.connect.max.retries (default = 10) number of times before
       failing over to another OzoneManager, if available. This parameter
-      represents the number of times the client will failover before giving
-      up. This value is kept high so that client does not give up trying to
-      connect to OMs easily.
+      represents the number of times per request the client will fail over
+      before giving up. This value is kept high so that the client does not
+      give up trying to connect to OMs easily.
     </description>
   </property>
   <property>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index 75ea99f77a8..ef4ec32ddc2 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -81,12 +81,11 @@ public class GrpcOmTransport implements OmTransport {
   private ConfigurationSource conf;
 
   private AtomicReference<String> host;
-  private AtomicInteger syncFailoverCount;
+  private AtomicInteger globalFailoverCount;
   private final int maxSize;
   private SecurityConfig secConfig;
 
   private RetryPolicy retryPolicy;
-  private int failoverCount = 0;
   private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
       omFailoverProxyProvider;
 
@@ -102,9 +101,7 @@ public GrpcOmTransport(ConfigurationSource conf,
     this.clients = new HashMap<>();
     this.conf = conf;
     this.host = new AtomicReference();
-    this.failoverCount = 0;
-    this.syncFailoverCount = new AtomicInteger();
-
+    this.globalFailoverCount = new AtomicInteger();
 
     secConfig =  new SecurityConfig(conf);
     maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
@@ -175,12 +172,13 @@ public void start() throws IOException {
   @Override
   public OMResponse submitRequest(OMRequest payload) throws IOException {
     AtomicReference<OMResponse> resp = new AtomicReference<>();
+    int requestFailoverCount = 0;
     boolean tryOtherHost = true;
     int expectedFailoverCount = 0;
     ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
     while (tryOtherHost) {
       tryOtherHost = false;
-      expectedFailoverCount = syncFailoverCount.get();
+      expectedFailoverCount = globalFailoverCount.get();
       try {
         InetAddress inetAddress = InetAddress.getLocalHost();
         Context.current()
@@ -201,7 +199,7 @@ public OMResponse submitRequest(OMRequest payload) throws IOException {
         }
         Exception exp = new Exception(e);
         tryOtherHost = shouldRetry(unwrapException(exp),
-            expectedFailoverCount);
+            expectedFailoverCount, ++requestFailoverCount);
         if (!tryOtherHost) {
           throw new OMException(resultCode);
         }
@@ -251,11 +249,11 @@ private Exception unwrapException(Exception ex) {
     return grpcException;
   }
 
-  private boolean shouldRetry(Exception ex, int expectedFailoverCount) {
+  private boolean shouldRetry(Exception ex, int expectedFailoverCount, int requestFailoverCount) {
     boolean retry = false;
     RetryPolicy.RetryAction action = null;
     try {
-      action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, true);
+      action = retryPolicy.shouldRetry(ex, 0, requestFailoverCount, true);
       LOG.debug("grpc failover retry action {}", action.action);
       if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
         retry = false;
@@ -273,9 +271,9 @@ private boolean shouldRetry(Exception ex, int expectedFailoverCount) {
             }
           }
           // switch om host to current proxy OMNodeId
-          if (syncFailoverCount.get() == expectedFailoverCount) {
+          if (globalFailoverCount.get() == expectedFailoverCount) {
             omFailoverProxyProvider.performFailover(null);
-            syncFailoverCount.getAndIncrement();
+            globalFailoverCount.getAndIncrement();
           } else {
             LOG.warn("A failover has occurred since the start of current" +
                 " thread retry, NOT failover using current proxy");
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java
new file mode 100644
index 00000000000..c8890eaa49c
--- /dev/null
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_PORT_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concurrent test for GrpcOmTransport client.
+ */
+public class TestGrpcOmTransportConcurrentFailover {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestGrpcOmTransportConcurrentFailover.class);
+
+  private static final String OM_SERVICE_ID = "om-service-test";
+  private static final String NODE_ID_BASE = "om";
+  private static final int NUM_OMS = 3;
+  private static final int BASE_PORT = 19860;
+
+  private Map<String, MockOMServer> mockServers;
+  private GrpcOmTransport transport;
+
+  /**
+   * Starts one mock OM server per node id, records each server's address and
+   * gRPC port in a fresh configuration, makes om0 the leader and creates the
+   * transport under test.
+   */
+  @BeforeEach
+  public void setUp() throws Exception {
+    mockServers = new HashMap<>();
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setLong(OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 250);
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);
+    conf.set(OZONE_OM_SERVICE_IDS_KEY, OM_SERVICE_ID);
+
+    StringJoiner nodeIds = new StringJoiner(",");
+    for (int index = 0; index < NUM_OMS; index++) {
+      final String omId = NODE_ID_BASE + index;
+      final int grpcPort = BASE_PORT + index;
+      nodeIds.add(omId);
+
+      // Bring the mock OM up before it is advertised in the configuration.
+      MockOMServer omServer = new MockOMServer(omId, grpcPort);
+      omServer.start();
+      mockServers.put(omId, omServer);
+
+      String addressKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID, omId);
+      conf.set(addressKey, "localhost");
+      String portKey = ConfUtils.addKeySuffixes(OZONE_OM_GRPC_PORT_KEY, OM_SERVICE_ID, omId);
+      conf.setInt(portKey, grpcPort);
+    }
+    conf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID), nodeIds.toString());
+
+    failover("om0", "om1", "om2");
+    transport = new GrpcOmTransport(conf, UserGroupInformation.getCurrentUser(), OM_SERVICE_ID);
+  }
+
+  /**
+   * Closes the transport and stops every mock OM server.
+   * The servers are stopped in a finally block so that a failure while
+   * closing the transport does not leave their fixed ports bound.
+   */
+  @AfterEach
+  public void tearDown() throws Exception {
+    try {
+      if (transport != null) {
+        transport.close();
+      }
+    } finally {
+      for (MockOMServer server : mockServers.values()) {
+        server.stop();
+      }
+    }
+  }
+
+  /**
+   * Moves leadership from om0 to om2 and then submits requests from many
+   * threads at once. Expects om0 to have rejected at least one request and
+   * om2 to have served every concurrent request.
+   */
+  @Test
+  public void testConcurrentFailoverTriesAllOMs() throws Exception {
+    final int threadCount = 500;
+    final int perThreadRequests = 10;
+    final int expectedSuccesses = threadCount * perThreadRequests;
+
+    sendInitialOmRequestsBeforeFailover();
+    failover("om2", "om0", "om1");
+    runConcurrentClientRequests(threadCount, perThreadRequests);
+
+    final int failedOmCount = omRequestFailoverDistributionReport();
+    final int om0Failures = mockServers.get("om0").getFailureCount();
+    final int om2Successes = mockServers.get("om2").getSuccessCount();
+
+    assertTrue(failedOmCount >= 1,
+        "At least 1 OMs should receive failed requests during failover. Got: " + failedOmCount);
+    assertTrue(om0Failures > 0, "om0 should receive failed requests");
+    assertEquals(expectedSuccesses, om2Successes,
+        "All requests should eventually succeed on om2 (leader)");
+  }
+
+  /**
+   * Logs how requests, failures and successes were distributed across the
+   * mock OM servers.
+   *
+   * @return the number of OMs that recorded at least one failed request
+   */
+  private int omRequestFailoverDistributionReport() {
+    int requestSum = 0;
+    int failureSum = 0;
+    int successSum = 0;
+    int failedOmCount = 0;
+
+    // First pass: aggregate the counters of every mock OM.
+    for (int idx = 0; idx < NUM_OMS; idx++) {
+      MockOMServer om = mockServers.get(NODE_ID_BASE + idx);
+      requestSum += om.getRequestCount();
+      failureSum += om.getFailureCount();
+      successSum += om.getSuccessCount();
+      if (om.getFailureCount() > 0) {
+        failedOmCount++;
+      }
+    }
+
+    LOG.info("Total requests: {} (failures: {}, successes: {})", requestSum, failureSum, successSum);
+    LOG.info("OMs that received failed requests: {}/{}", failedOmCount, NUM_OMS);
+
+    LOG.info("--- Failed Requests (Failover Attempts) ---");
+    for (int idx = 0; idx < NUM_OMS; idx++) {
+      String name = NODE_ID_BASE + idx;
+      int failed = mockServers.get(name).getFailureCount();
+      double share = 0;
+      if (failureSum > 0) {
+        share = failed * 100.0 / failureSum;
+      }
+      String marker = "";
+      if (failed == 0) {
+        marker = " NEVER TRIED!";
+      }
+      LOG.info("  {}: {} failures ({} %){}", name, failed, String.format("%.1f", share), marker);
+    }
+
+    LOG.info("--- Successful Requests ---");
+    for (int idx = 0; idx < NUM_OMS; idx++) {
+      String name = NODE_ID_BASE + idx;
+      int succeeded = mockServers.get(name).getSuccessCount();
+      double share = 0;
+      if (successSum > 0) {
+        share = succeeded * 100.0 / successSum;
+      }
+      String marker = "";
+      if (succeeded > 0) {
+        marker = " LEADER";
+      }
+      LOG.info("  {}: {} successes ({} %){}", name, succeeded, String.format("%.1f", share), marker);
+    }
+    return failedOmCount;
+  }
+
+  /**
+   * Marks {@code leader} as the leader and both followers as non-leaders.
+   *
+   * @param leader node id of the mock OM that should accept requests
+   * @param follower1 node id of a mock OM that should reject requests
+   * @param follower2 node id of a mock OM that should reject requests
+   */
+  private void failover(String leader, String follower1, String follower2) {
+    MockOMServer promoted = mockServers.get(leader);
+    promoted.setAsLeader(true);
+    for (String followerId : new String[] {follower1, follower2}) {
+      mockServers.get(followerId).setAsLeader(false);
+    }
+  }
+
+  /**
+   * Runs {@code numThreads} client threads that each submit
+   * {@code requestsPerThread} ListVolume requests through the shared transport.
+   * A barrier releases all threads together; a failed request is logged and
+   * does not stop its thread.
+   *
+   * <p>The test fails if the threads do not finish within 30 seconds, and the
+   * executor is always shut down so no worker outlives this call.
+   *
+   * @param numThreads number of concurrent client threads
+   * @param requestsPerThread number of requests each thread submits
+   * @throws InterruptedException if the calling thread is interrupted while
+   *     waiting for the client threads
+   */
+  private void runConcurrentClientRequests(int numThreads, int requestsPerThread) throws InterruptedException {
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CyclicBarrier startBarrier = new CyclicBarrier(numThreads);
+    CountDownLatch completionLatch = new CountDownLatch(numThreads);
+
+    try {
+      for (int threadId = 0; threadId < numThreads; threadId++) {
+        final int id = threadId;
+        executor.submit(() -> {
+          try {
+            startBarrier.await();
+
+            for (int attempt = 0; attempt < requestsPerThread; attempt++) {
+              OMRequest request = OMRequest.newBuilder()
+                  .setCmdType(Type.ListVolume)
+                  .setClientId("test-client")
+                  .build();
+
+              try {
+                transport.submitRequest(request);
+              } catch (Exception e) {
+                LOG.error("Thread: {}, Request {} failed: {}", id, attempt + 1, e.getMessage());
+              }
+
+              Thread.sleep(1);
+            }
+          } catch (InterruptedException e) {
+            // Keep the interrupt visible to the pool instead of swallowing it.
+            Thread.currentThread().interrupt();
+            LOG.error("Thread: {}, Failed: {}", id, e.getMessage());
+          } catch (Exception e) {
+            LOG.error("Thread: {}, Failed: {}", id, e.getMessage());
+          } finally {
+            completionLatch.countDown();
+          }
+        });
+      }
+
+      // A timeout means some requests never completed; fail here with a clear
+      // message rather than continuing with partial counters.
+      assertTrue(completionLatch.await(30, TimeUnit.SECONDS),
+          "Client threads did not finish within 30 seconds");
+    } finally {
+      // Interrupts any worker still running so it cannot leak into tearDown.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Submits five ListVolume requests through the transport before leadership
+   * is moved.
+   *
+   * @throws IOException if the transport fails to submit a request
+   */
+  private void sendInitialOmRequestsBeforeFailover() throws IOException {
+    final int warmUpRequests = 5;
+    // The built request is immutable, so one instance is reused for every call.
+    final OMRequest warmUp = OMRequest.newBuilder()
+        .setCmdType(Type.ListVolume)
+        .setClientId("test-client")
+        .build();
+
+    int sent = 0;
+    while (sent < warmUpRequests) {
+      transport.submitRequest(warmUp);
+      sent++;
+    }
+  }
+
+  /**
+   * In-test gRPC OM endpoint listening on a fixed port. While it is not the
+   * leader it fails every request with an INTERNAL status whose description
+   * names OMNotLeaderException; while it is the leader it answers with an OK
+   * response. It counts total, successful and failed requests.
+   */
+  private static class MockOMServer {
+    private final String nodeId;
+    private final int port;
+    private final AtomicInteger requestCount = new AtomicInteger(0);
+    private final AtomicInteger successCount = new AtomicInteger(0);
+    private final AtomicInteger failureCount = new AtomicInteger(0);
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+    // Mockito mock that forwards every call to the anonymous implementation below.
+    private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase serviceImpl =
+        mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
+            delegatesTo(new OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() {
+              @Override
+              public void submitRequest(OMRequest request, StreamObserver<OMResponse> responseObserver) {
+                requestCount.incrementAndGet();
+
+                if (!isLeader.get()) {
+                  failureCount.incrementAndGet();
+                  String errorMsg = "org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException: " +
+                      "OM:" + nodeId + " is not the leader. Suggested leader: om2";
+
+                  responseObserver.onError(new StatusRuntimeException(
+                      Status.INTERNAL.withDescription(errorMsg)));
+                } else {
+                  successCount.incrementAndGet();
+                  OMResponse response = OMResponse.newBuilder()
+                      .setCmdType(request.getCmdType())
+                      .setStatus(org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK)
+                      .setMessage("Success from " + nodeId)
+                      .build();
+
+                  responseObserver.onNext(response);
+                  responseObserver.onCompleted();
+                }
+              }
+            }));
+    private Server server;
+
+    /**
+     * @param nodeId OM node id reported in responses and error messages
+     * @param port TCP port the gRPC server listens on
+     */
+    MockOMServer(String nodeId, int port) {
+      this.nodeId = nodeId;
+      this.port = port;
+    }
+
+    /** Builds and starts the gRPC server on the configured port. */
+    public void start() throws Exception {
+      server = ServerBuilder.forPort(port)
+          .addService(serviceImpl)
+          .build()
+          .start();
+    }
+
+    /**
+     * Shuts the server down, waiting up to five seconds for it to terminate.
+     * If it has not terminated by then it is shut down forcibly, so the fixed
+     * port is not held by a server that is still draining.
+     */
+    public void stop() throws Exception {
+      if (server != null) {
+        server.shutdown();
+        if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
+          server.shutdownNow();
+        }
+      }
+    }
+
+    /** Sets whether this server answers requests successfully. */
+    public void setAsLeader(boolean leader) {
+      this.isLeader.set(leader);
+    }
+
+    /** @return the number of requests received, successful or not */
+    public int getRequestCount() {
+      return requestCount.get();
+    }
+
+    /** @return the number of requests answered with an OK response */
+    public int getSuccessCount() {
+      return successCount.get();
+    }
+
+    /** @return the number of requests rejected because this server was not the leader */
+    public int getFailureCount() {
+      return failureCount.get();
+    }
+  }
+}
+
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
index d5b8723e0ef..176d9b6d03b 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -66,6 +66,8 @@ public class TestS3GrpcOmTransport {
       .build();
 
   private boolean doFailover = false;
+  private boolean completeFailover = true;
+  private int failoverCount;
 
   private OzoneConfiguration conf;
 
@@ -91,7 +93,10 @@ public void submitRequest(org.apache.hadoop.ozone.protocol.proto
                                               responseObserver) {
                   try {
                     if (doFailover) {
-                      doFailover = false;
+                      if (completeFailover) {
+                        doFailover = false;
+                      }
+                      failoverCount++;
                       throw createNotLeaderException();
                     } else {
                       responseObserver.onNext(omResponse);
@@ -122,6 +127,7 @@ private ServiceException createNotLeaderException() {
 
   @BeforeEach
   public void setUp() throws Exception {
+    failoverCount = 0;
     // Generate a unique in-process server name.
     serverName = InProcessServerBuilder.generateName();
 
@@ -190,6 +196,7 @@ public void testGrpcFailoverProxy() throws Exception {
 
   @Test
   public void testGrpcFailoverProxyExhaustRetry() throws Exception {
+    final int expectedFailoverCount = 1;
     ServiceListRequest req = ServiceListRequest.newBuilder().build();
 
     final OMRequest omRequest = OMRequest.newBuilder()
@@ -210,6 +217,29 @@ public void testGrpcFailoverProxyExhaustRetry() throws Exception {
     // max failovers
 
     assertThrows(Exception.class, () -> client.submitRequest(omRequest));
+    assertEquals(expectedFailoverCount, failoverCount);
+  }
+
+  /**
+   * Checks that the failover limit is counted per request. With the server
+   * rejecting every call, the first request is expected to fail after exactly
+   * the configured number of attempts. Once the server is allowed to complete
+   * a failover, a second request on the same client is expected to succeed
+   * after a single failover.
+   */
+  @Test
+  public void testGrpcFailoverProxyCalculatesFailoverCountPerRequest() throws Exception {
+    final int failoverLimit = 2;
+    final int secondRequestFailovers = 1;
+
+    // The server keeps throwing the not-leader error until completeFailover is set.
+    doFailover = true;
+    completeFailover = false;
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, failoverLimit);
+    conf.setLong(OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 50);
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    client.startClient(channel);
+
+    // First request: every attempt is rejected, so the limit is exhausted.
+    assertThrows(Exception.class, () -> client.submitRequest(arbitraryOmRequest()));
+    assertEquals(failoverLimit, failoverCount);
+
+    // Second request: one rejection, then success, so no exception is expected.
+    failoverCount = 0;
+    completeFailover = true;
+    client.submitRequest(arbitraryOmRequest());
+
+    assertEquals(secondRequestFailovers, failoverCount);
   }
 
   @Test
@@ -241,4 +271,14 @@ public void testGrpcFailoverExceedMaxMesgLen() throws Exception {
     // rather to fail.
     assertThrows(Exception.class, () -> client.submitRequest(omRequest));
   }
+
+  /**
+   * Builds a ServiceList request carrying an empty service-list payload.
+   *
+   * @return a new request with client id "test" and the current version
+   */
+  private static OMRequest arbitraryOmRequest() {
+    OMRequest.Builder builder = OMRequest.newBuilder();
+    builder.setCmdType(Type.ServiceList);
+    builder.setVersion(CURRENT_VERSION);
+    builder.setClientId("test");
+    builder.setServiceListRequest(ServiceListRequest.newBuilder().build());
+    return builder.build();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to