This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a3f777c86fc HDDS-13537. Client skip send request to listener OM (#8989)
a3f777c86fc is described below
commit a3f777c86fc0c12827faddac4a73e46a3106c6c8
Author: Peter Lee <[email protected]>
AuthorDate: Mon Sep 8 12:55:37 2025 +0800
HDDS-13537. Client skip send request to listener OM (#8989)
---
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 20 ++++++--
.../ozone/om/ha/GrpcOMFailoverProxyProvider.java | 2 +-
.../om/ha/HadoopRpcOMFailoverProxyProvider.java | 2 +-
.../java/org/apache/hadoop/ozone/TestOmUtils.java | 39 ++++++++++++++
.../ozone/om/ha/TestOMFailoverProxyProvider.java | 59 +++++++++++++++++-----
5 files changed, 104 insertions(+), 18 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 5c6c020d74c..24fb8b3a5b1 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -381,6 +381,22 @@ public static Collection<String>
getActiveOMNodeIds(ConfigurationSource conf,
return nodeIds;
}
+ /**
+ * Returns active OM node IDs that are not listener nodes for the given
service
+ * ID.
+ *
+ * @param conf Configuration source
+ * @param omServiceId OM service ID
+ * @return Collection of active non-listener node IDs
+ */
+ public static Collection<String> getActiveNonListenerOMNodeIds(
+ ConfigurationSource conf, String omServiceId) {
+ Collection<String> nodeIds = getActiveOMNodeIds(conf, omServiceId);
+ Collection<String> listenerNodeIds = getListenerOMNodeIds(conf,
omServiceId);
+ nodeIds.removeAll(listenerNodeIds);
+ return nodeIds;
+ }
+
/**
* Returns a collection of configured nodeId's that are to be decommissioned.
* Aggregate results from both config keys - with and without serviceId
@@ -410,10 +426,8 @@ public static Collection<String>
getListenerOMNodeIds(ConfigurationSource conf,
String omServiceId) {
String listenerNodesKey = ConfUtils.addKeySuffixes(
OZONE_OM_LISTENER_NODES_KEY, omServiceId);
- Collection<String> listenerNodeIds = conf.getTrimmedStringCollection(
+ return conf.getTrimmedStringCollection(
listenerNodesKey);
-
- return listenerNodeIds;
}
/**
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
index f7bff7237dd..0688b66911a 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -67,7 +67,7 @@ public GrpcOMFailoverProxyProvider(ConfigurationSource
configuration,
protected void loadOMClientConfigs(ConfigurationSource config, String
omSvcId)
throws IOException {
- Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+ Collection<String> omNodeIds =
OmUtils.getActiveNonListenerOMNodeIds(config, omSvcId);
Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
List<String> omNodeIDList = new ArrayList<>();
Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
index feef3b1e012..53db370d27c 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
@@ -74,7 +74,7 @@ protected void loadOMClientConfigs(ConfigurationSource
config, String omSvcId)
List<String> omNodeIDList = new ArrayList<>();
Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
- Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
+ Collection<String> omNodeIds =
OmUtils.getActiveNonListenerOMNodeIds(config,
omSvcId);
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
index 5ed2efded20..b08c041fd56 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
@@ -218,5 +218,44 @@ void getOmSocketAddressEmpty() {
assertEquals("0.0.0.0", addr.getHostString());
assertEquals(OMConfigKeys.OZONE_OM_PORT_DEFAULT, addr.getPort());
}
+
+ @Test
+ public void testGetListenerOMNodeIdsUnion() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+
+ String serviceId = "om-service-test1";
+ conf.set(org.apache.hadoop.ozone.ha.ConfUtils.addKeySuffixes(
+ org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_LISTENER_NODES_KEY,
+ serviceId), "s1,s2");
+
+ java.util.Collection<String> result = OmUtils.getListenerOMNodeIds(conf,
serviceId);
+ java.util.Set<String> expected = new java.util.HashSet<>();
+ expected.add("s1");
+ expected.add("s2");
+
+ assertEquals(expected.size(), result.size());
+ assertTrue(result.containsAll(expected));
+ }
+
+ @Test
+ public void testGetActiveNonListenerOMNodeIdsFiltering() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ String serviceId = "om-service-test1";
+
+ conf.set(org.apache.hadoop.ozone.ha.ConfUtils.addKeySuffixes(
+ org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY, serviceId),
+ "n1,n2,n3");
+ conf.set(org.apache.hadoop.ozone.ha.ConfUtils.addKeySuffixes(
+ org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_LISTENER_NODES_KEY,
+ serviceId), "n2");
+
+ java.util.Collection<String> result =
OmUtils.getActiveNonListenerOMNodeIds(conf, serviceId);
+ java.util.Set<String> expected = new java.util.HashSet<>();
+ expected.add("n1");
+ expected.add("n3");
+
+ assertEquals(expected.size(), result.size());
+ assertTrue(result.containsAll(expected));
+ }
}
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index c06358cfbe8..aa1c0cb7e2d 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -22,6 +22,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
@@ -45,7 +46,7 @@ public class TestOMFailoverProxyProvider {
private static final String OM_SERVICE_ID = "om-service-test1";
private static final String NODE_ID_BASE_STR = "omNode-";
private static final String DUMMY_NODE_ADDR = "0.0.0.0:8080";
- private HadoopRpcOMFailoverProxyProvider provider;
+ private HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> provider;
private long waitBetweenRetries;
private int numNodes = 3;
private OzoneConfiguration config;
@@ -65,7 +66,7 @@ public void init() throws Exception {
}
config.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
allNodeIds.toString());
- provider = new HadoopRpcOMFailoverProxyProvider(config,
+ provider = new HadoopRpcOMFailoverProxyProvider<>(config,
UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
OzoneManagerProtocolPB.class);
}
@@ -140,6 +141,38 @@ public void
testWaitTimeResetWhenAllNodeFailoverAndSameNode() {
failoverToNextNode(1, waitBetweenRetries);
}
+ /**
+ * Ensure listener nodes are excluded from provider's proxy list.
+ */
+ @Test
+ public void testExcludesListenerNodes() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ StringJoiner allNodeIds = new StringJoiner(",");
+ for (int i = 1; i <= numNodes; i++) {
+ String nodeId = NODE_ID_BASE_STR + i;
+ conf.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID,
+ nodeId), DUMMY_NODE_ADDR);
+ allNodeIds.add(nodeId);
+ }
+ conf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
+ allNodeIds.toString());
+ // Mark one of the nodes as listener (omNode-2)
+ String listenerNode = NODE_ID_BASE_STR + 2;
+ conf.set(ConfUtils.addKeySuffixes(
+ org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_LISTENER_NODES_KEY,
+ OM_SERVICE_ID), listenerNode);
+
+ try (HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
providerWithListeners =
+ new HadoopRpcOMFailoverProxyProvider<>(conf,
+ UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
+ OzoneManagerProtocolPB.class)) {
+ // Verify listener node is not included in proxy map
+
assertTrue(providerWithListeners.getOMProxyInfoMap().containsKey(NODE_ID_BASE_STR
+ 1));
+
assertTrue(providerWithListeners.getOMProxyInfoMap().containsKey(NODE_ID_BASE_STR
+ 3));
+
assertFalse(providerWithListeners.getOMProxyInfoMap().containsKey(listenerNode));
+ }
+ }
+
/**
* Failover to next node and wait time should be same as waitTimeAfter.
*/
@@ -184,17 +217,17 @@ public void testCanonicalTokenServiceName() throws
IOException {
}
ozoneConf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
allNodeIds.toString());
- HadoopRpcOMFailoverProxyProvider prov =
- new HadoopRpcOMFailoverProxyProvider(ozoneConf,
- UserGroupInformation.getCurrentUser(),
- OM_SERVICE_ID,
- OzoneManagerProtocolPB.class);
-
- Text dtService = prov.getCurrentProxyDelegationToken();
-
- Collections.sort(nodeAddrs);
- String expectedDtService = String.join(",", nodeAddrs);
- assertEquals(expectedDtService, dtService.toString());
+ try (HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> prov =
+ new HadoopRpcOMFailoverProxyProvider<>(ozoneConf,
+ UserGroupInformation.getCurrentUser(),
+ OM_SERVICE_ID,
+ OzoneManagerProtocolPB.class)) {
+ Text dtService = prov.getCurrentProxyDelegationToken();
+
+ Collections.sort(nodeAddrs);
+ String expectedDtService = String.join(",", nodeAddrs);
+ assertEquals(expectedDtService, dtService.toString());
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]