This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a3f777c86fc HDDS-13537. Client skip send request to listener OM (#8989)
a3f777c86fc is described below

commit a3f777c86fc0c12827faddac4a73e46a3106c6c8
Author: Peter Lee <[email protected]>
AuthorDate: Mon Sep 8 12:55:37 2025 +0800

    HDDS-13537. Client skip send request to listener OM (#8989)
---
 .../main/java/org/apache/hadoop/ozone/OmUtils.java | 20 ++++++--
 .../ozone/om/ha/GrpcOMFailoverProxyProvider.java   |  2 +-
 .../om/ha/HadoopRpcOMFailoverProxyProvider.java    |  2 +-
 .../java/org/apache/hadoop/ozone/TestOmUtils.java  | 39 ++++++++++++++
 .../ozone/om/ha/TestOMFailoverProxyProvider.java   | 59 +++++++++++++++++-----
 5 files changed, 104 insertions(+), 18 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 5c6c020d74c..24fb8b3a5b1 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -381,6 +381,22 @@ public static Collection<String> getActiveOMNodeIds(ConfigurationSource conf,
     return nodeIds;
   }
 
+  /**
+   * Returns active OM node IDs that are not listener nodes for the given service
+   * ID.
+   *
+   * @param conf        Configuration source
+   * @param omServiceId OM service ID
+   * @return Collection of active non-listener node IDs
+   */
+  public static Collection<String> getActiveNonListenerOMNodeIds(
+      ConfigurationSource conf, String omServiceId) {
+    Collection<String> nodeIds = getActiveOMNodeIds(conf, omServiceId);
+    Collection<String> listenerNodeIds = getListenerOMNodeIds(conf, omServiceId);
+    nodeIds.removeAll(listenerNodeIds);
+    return nodeIds;
+  }
+
   /**
    * Returns a collection of configured nodeId's that are to be decommissioned.
    * Aggregate results from both config keys - with and without serviceId
@@ -410,10 +426,8 @@ public static Collection<String> getListenerOMNodeIds(ConfigurationSource conf,
       String omServiceId) {
     String listenerNodesKey = ConfUtils.addKeySuffixes(
         OZONE_OM_LISTENER_NODES_KEY, omServiceId);
-    Collection<String> listenerNodeIds = conf.getTrimmedStringCollection(
+    return conf.getTrimmedStringCollection(
         listenerNodesKey);
-
-    return listenerNodeIds;
   }
 
   /**
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
index f7bff7237dd..0688b66911a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -67,7 +67,7 @@ public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
   protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
       throws IOException {
 
-    Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+    Collection<String> omNodeIds = OmUtils.getActiveNonListenerOMNodeIds(config, omSvcId);
     Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
     List<String> omNodeIDList = new ArrayList<>();
     Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
index feef3b1e012..53db370d27c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
@@ -74,7 +74,7 @@ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
     List<String> omNodeIDList = new ArrayList<>();
     Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
 
-    Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
+    Collection<String> omNodeIds = OmUtils.getActiveNonListenerOMNodeIds(config,
         omSvcId);
 
     for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
index 5ed2efded20..b08c041fd56 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
@@ -218,5 +218,44 @@ void getOmSocketAddressEmpty() {
     assertEquals("0.0.0.0", addr.getHostString());
     assertEquals(OMConfigKeys.OZONE_OM_PORT_DEFAULT, addr.getPort());
   }
+
+  @Test
+  public void testGetListenerOMNodeIdsUnion() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    String serviceId = "om-service-test1";
+    conf.set(org.apache.hadoop.ozone.ha.ConfUtils.addKeySuffixes(
+        org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_LISTENER_NODES_KEY,
+        serviceId), "s1,s2");
+
+    java.util.Collection<String> result = OmUtils.getListenerOMNodeIds(conf, serviceId);
+    java.util.Set<String> expected = new java.util.HashSet<>();
+    expected.add("s1");
+    expected.add("s2");
+
+    assertEquals(expected.size(), result.size());
+    assertTrue(result.containsAll(expected));
+  }
+
+  @Test
+  public void testGetActiveNonListenerOMNodeIdsFiltering() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String serviceId = "om-service-test1";
+
+    conf.set(org.apache.hadoop.ozone.ha.ConfUtils.addKeySuffixes(
+        org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY, serviceId),
+        "n1,n2,n3");
+    conf.set(org.apache.hadoop.ozone.ha.ConfUtils.addKeySuffixes(
+        org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_LISTENER_NODES_KEY,
+        serviceId), "n2");
+
+    java.util.Collection<String> result = OmUtils.getActiveNonListenerOMNodeIds(conf, serviceId);
+    java.util.Set<String> expected = new java.util.HashSet<>();
+    expected.add("n1");
+    expected.add("n3");
+
+    assertEquals(expected.size(), result.size());
+    assertTrue(result.containsAll(expected));
+  }
 }
 
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index c06358cfbe8..aa1c0cb7e2d 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -22,6 +22,7 @@
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
@@ -45,7 +46,7 @@ public class TestOMFailoverProxyProvider {
   private static final String OM_SERVICE_ID = "om-service-test1";
   private static final String NODE_ID_BASE_STR = "omNode-";
   private static final String DUMMY_NODE_ADDR = "0.0.0.0:8080";
-  private HadoopRpcOMFailoverProxyProvider provider;
+  private HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> provider;
   private long waitBetweenRetries;
   private int numNodes = 3;
   private OzoneConfiguration config;
@@ -65,7 +66,7 @@ public void init() throws Exception {
     }
     config.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
         allNodeIds.toString());
-    provider = new HadoopRpcOMFailoverProxyProvider(config,
+    provider = new HadoopRpcOMFailoverProxyProvider<>(config,
         UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
         OzoneManagerProtocolPB.class);
   }
@@ -140,6 +141,38 @@ public void testWaitTimeResetWhenAllNodeFailoverAndSameNode() {
     failoverToNextNode(1, waitBetweenRetries);
   }
 
+  /**
+   * Ensure listener nodes are excluded from provider's proxy list.
+   */
+  @Test
+  public void testExcludesListenerNodes() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    StringJoiner allNodeIds = new StringJoiner(",");
+    for (int i = 1; i <= numNodes; i++) {
+      String nodeId = NODE_ID_BASE_STR + i;
+      conf.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID,
+          nodeId), DUMMY_NODE_ADDR);
+      allNodeIds.add(nodeId);
+    }
+    conf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
+        allNodeIds.toString());
+    // Mark one of the nodes as listener (omNode-2)
+    String listenerNode = NODE_ID_BASE_STR + 2;
+    conf.set(ConfUtils.addKeySuffixes(
+        org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_LISTENER_NODES_KEY,
+        OM_SERVICE_ID), listenerNode);
+
+    try (HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> providerWithListeners =
+             new HadoopRpcOMFailoverProxyProvider<>(conf,
+                 UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
+                 OzoneManagerProtocolPB.class)) {
+      // Verify listener node is not included in proxy map
+      assertTrue(providerWithListeners.getOMProxyInfoMap().containsKey(NODE_ID_BASE_STR + 1));
+      assertTrue(providerWithListeners.getOMProxyInfoMap().containsKey(NODE_ID_BASE_STR + 3));
+      assertFalse(providerWithListeners.getOMProxyInfoMap().containsKey(listenerNode));
+    }
+  }
+
   /**
    * Failover to next node and wait time should be same as waitTimeAfter.
    */
@@ -184,17 +217,17 @@ public void testCanonicalTokenServiceName() throws IOException {
     }
     ozoneConf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
         allNodeIds.toString());
-    HadoopRpcOMFailoverProxyProvider prov =
-        new HadoopRpcOMFailoverProxyProvider(ozoneConf,
-            UserGroupInformation.getCurrentUser(),
-            OM_SERVICE_ID,
-            OzoneManagerProtocolPB.class);
-
-    Text dtService = prov.getCurrentProxyDelegationToken();
-
-    Collections.sort(nodeAddrs);
-    String expectedDtService = String.join(",", nodeAddrs);
-    assertEquals(expectedDtService, dtService.toString());
+    try (HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> prov =
+             new HadoopRpcOMFailoverProxyProvider<>(ozoneConf,
+                 UserGroupInformation.getCurrentUser(),
+                 OM_SERVICE_ID,
+                 OzoneManagerProtocolPB.class)) {
+      Text dtService = prov.getCurrentProxyDelegationToken();
+
+      Collections.sort(nodeAddrs);
+      String expectedDtService = String.join(",", nodeAddrs);
+      assertEquals(expectedDtService, dtService.toString());
+    }
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to