This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 25141bd4d98 HDDS-14336. Remove OM Proxy duplication in
HadoopRpcOMFailoverProxyProvider (#9579)
25141bd4d98 is described below
commit 25141bd4d98d97c2a746a09003718011a8ca4b4b
Author: Ivan Andika <[email protected]>
AuthorDate: Tue Jan 13 04:31:12 2026 +0800
HDDS-14336. Remove OM Proxy duplication in HadoopRpcOMFailoverProxyProvider
(#9579)
---
.../ozone/om/ha/GrpcOMFailoverProxyProvider.java | 34 +++---
.../om/ha/HadoopRpcOMFailoverProxyProvider.java | 91 ++++++----------
.../ozone/om/ha/OMFailoverProxyProviderBase.java | 115 ++++++++++++---------
.../org/apache/hadoop/ozone/om/ha/OMProxyInfo.java | 38 ++++---
.../ozone/om/protocolPB/GrpcOmTransport.java | 17 ++-
.../ozone/om/ha/TestOMFailoverProxyProvider.java | 6 +-
.../ozone/client/rpc/OzoneRpcClientTests.java | 5 +-
.../ozone/om/TestOzoneManagerHAWithAllRunning.java | 10 +-
.../hadoop/ozone/om/failover/TestOMFailovers.java | 33 +++---
9 files changed, 167 insertions(+), 182 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
index 0688b66911a..3592367720e 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -26,6 +26,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,7 +35,6 @@
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ha.ConfUtils;
@@ -64,18 +64,17 @@ public GrpcOMFailoverProxyProvider(ConfigurationSource
configuration,
}
@Override
- protected void loadOMClientConfigs(ConfigurationSource config, String
omSvcId)
+ protected void initOmProxiesFromConfigs(ConfigurationSource config, String
omSvcId)
throws IOException {
Collection<String> omNodeIds =
OmUtils.getActiveNonListenerOMNodeIds(config, omSvcId);
- Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
+ Map<String, OMProxyInfo<T>> omProxies = new HashMap<>();
List<String> omNodeIDList = new ArrayList<>();
- Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
omSvcId, nodeId);
- Optional<String> hostaddr = getHostNameFromConfigKeys(config,
+ Optional<String> hostAddr = getHostNameFromConfigKeys(config,
rpcAddrKey);
OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
@@ -84,17 +83,13 @@ protected void loadOMClientConfigs(ConfigurationSource
config, String omSvcId)
if (nodeId == null) {
nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
}
- if (hostaddr.isPresent()) {
- int port = hostport.orElse(config
- .getObject(GrpcOmTransport
- .GrpcOmTransportConfig.class)
- .getPort());
- ProxyInfo<T> proxyInfo =
- new ProxyInfo<>(createOMProxy(),
- hostaddr.get() + ":" + port);
+ if (hostAddr.isPresent()) {
+ int port = hostport
+
.orElse(config.getObject(GrpcOmTransport.GrpcOmTransportConfig.class).getPort());
+ String rpcAddrStr = hostAddr.get() + ":" + port;
+ OMProxyInfo<T> proxyInfo =
+ new OMProxyInfo<>(createOMProxy(), omSvcId, nodeId, rpcAddrStr,
rpcAddrStr);
omProxies.put(nodeId, proxyInfo);
- omNodeAddressMap.put(nodeId,
- NetUtils.createSocketAddr(proxyInfo.proxyInfo));
} else {
LOG.error("expected host address not defined for: {}", rpcAddrKey);
throw new ConfigurationException(rpcAddrKey + "is not defined");
@@ -107,9 +102,8 @@ protected void loadOMClientConfigs(ConfigurationSource
config, String omSvcId)
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
- setOmProxies(omProxies);
- setOmNodeIDList(omNodeIDList);
- setOmNodeAddressMap(omNodeAddressMap);
+ Collections.shuffle(omNodeIDList);
+ initOmProxies(omProxies, omNodeIDList);
}
private T createOMProxy() throws IOException {
@@ -149,7 +143,7 @@ public synchronized void close() throws IOException { }
// need to throw if nodeID not in omAddresses
public String getGrpcProxyAddress(String nodeId) throws IOException {
- Map<String, ProxyInfo<T>> omProxies = getOMProxyMap();
+ Map<String, OMProxyInfo<T>> omProxies = getOMProxyMap();
if (omProxies.containsKey(nodeId)) {
return omProxies.get(nodeId).proxyInfo;
} else {
@@ -161,6 +155,6 @@ public String getGrpcProxyAddress(String nodeId) throws
IOException {
}
public List<String> getGrpcOmNodeIDList() {
- return getOmNodeIDList();
+ return getOmNodesInOrder();
}
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
index 78a07e23b3a..7d5e40e9f8d 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
@@ -19,9 +19,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.Closeable;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -50,7 +49,6 @@ public class HadoopRpcOMFailoverProxyProvider<T> extends
LoggerFactory.getLogger(HadoopRpcOMFailoverProxyProvider.class);
private final Text delegationTokenService;
- private Map<String, OMProxyInfo> omProxyInfos;
// HadoopRpcOMFailoverProxyProvider, on encountering certain exception,
// tries each OM once in a round robin fashion. After that it waits
@@ -67,12 +65,11 @@ public HadoopRpcOMFailoverProxyProvider(ConfigurationSource
configuration,
}
@Override
- protected void loadOMClientConfigs(ConfigurationSource config, String
omSvcId)
+ protected void initOmProxiesFromConfigs(ConfigurationSource config, String
omSvcId)
throws IOException {
- Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
- this.omProxyInfos = new HashMap<>();
+ Map<String, OMProxyInfo<T>> omProxies = new HashMap<>();
+
List<String> omNodeIDList = new ArrayList<>();
- Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
Collection<String> omNodeIds =
OmUtils.getActiveNonListenerOMNodeIds(config,
omSvcId);
@@ -86,8 +83,8 @@ protected void loadOMClientConfigs(ConfigurationSource
config, String omSvcId)
continue;
}
- OMProxyInfo omProxyInfo = new OMProxyInfo(omSvcId, nodeId,
- rpcAddrStr);
+ // ProxyInfo.proxy will be set during first time call to server.
+ OMProxyInfo<T> omProxyInfo = new OMProxyInfo<>(omSvcId, nodeId,
rpcAddrStr);
if (omProxyInfo.getAddress() != null) {
// For a non-HA OM setup, nodeId might be null. If so, we assign it
@@ -95,11 +92,8 @@ protected void loadOMClientConfigs(ConfigurationSource
config, String omSvcId)
if (nodeId == null) {
nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
}
- // ProxyInfo will be set during first time call to server.
- omProxies.put(nodeId, null);
- omProxyInfos.put(nodeId, omProxyInfo);
+ omProxies.put(nodeId, omProxyInfo);
omNodeIDList.add(nodeId);
- omNodeAddressMap.put(nodeId, omProxyInfo.getAddress());
} else {
LOG.error("Failed to create OM proxy for {} at address {}",
nodeId, rpcAddrStr);
@@ -111,9 +105,8 @@ protected void loadOMClientConfigs(ConfigurationSource
config, String omSvcId)
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
- setOmProxies(omProxies);
- setOmNodeIDList(omNodeIDList);
- setOmNodeAddressMap(omNodeAddressMap);
+ Collections.shuffle(omNodeIDList);
+ initOmProxies(omProxies, omNodeIDList);
}
/**
@@ -123,31 +116,24 @@ protected void loadOMClientConfigs(ConfigurationSource
config, String omSvcId)
*/
@Override
public synchronized ProxyInfo<T> getProxy() {
- ProxyInfo currentProxyInfo =
getOMProxyMap().get(getCurrentProxyOMNodeId());
- if (currentProxyInfo == null) {
- currentProxyInfo = createOMProxy(getCurrentProxyOMNodeId());
- }
- return currentProxyInfo;
+ OMProxyInfo<T> current = getOMProxyMap().get(getCurrentProxyOMNodeId());
+ return createOMProxyIfNeeded(current);
}
/**
* Creates proxy object.
*/
- protected ProxyInfo createOMProxy(String nodeId) {
- OMProxyInfo omProxyInfo = omProxyInfos.get(nodeId);
- InetSocketAddress address = omProxyInfo.getAddress();
- ProxyInfo proxyInfo;
- try {
- T proxy = createOMProxy(address);
- // Create proxyInfo here, to make it work with all Hadoop versions.
- proxyInfo = new ProxyInfo<>(proxy, omProxyInfo.toString());
- getOMProxyMap().put(nodeId, proxyInfo);
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to OM at {}",
- this.getClass().getSimpleName(), address, ioe);
- throw new RuntimeException(ioe);
+ protected ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) {
+ if (omProxyInfo.proxy == null) {
+ try {
+ omProxyInfo.proxy = createOMProxy(omProxyInfo.getAddress());
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to OM at {}",
+ this.getClass().getSimpleName(), omProxyInfo.getAddress(), ioe);
+ throw new RuntimeException(ioe);
+ }
}
- return proxyInfo;
+ return omProxyInfo;
}
public Text getCurrentProxyDelegationToken() {
@@ -158,8 +144,8 @@ protected Text computeDelegationTokenService() {
// For HA, this will return "," separated address of all OM's.
List<String> addresses = new ArrayList<>();
- for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
- omProxyInfos.entrySet()) {
+ for (Map.Entry<String, OMProxyInfo<T>> omProxyInfoSet :
+ getOMProxyMap().entrySet()) {
Text dtService = omProxyInfoSet.getValue().getDelegationTokenService();
// During client object creation when one of the OM configured address
@@ -186,33 +172,14 @@ protected Text computeDelegationTokenService() {
@Override
public synchronized void close() throws IOException {
for (ProxyInfo<T> proxyInfo : getOMProxies()) {
- if (proxyInfo != null) {
- RPC.stopProxy(proxyInfo.proxy);
+ if (proxyInfo.proxy != null) {
+ if (proxyInfo.proxy instanceof Closeable) {
+ ((Closeable)proxyInfo.proxy).close();
+ } else {
+ RPC.stopProxy(proxyInfo.proxy);
+ }
}
}
}
-
- @VisibleForTesting
- public List<OMProxyInfo> getOMProxyInfos() {
- return new ArrayList<OMProxyInfo>(omProxyInfos.values());
- }
-
- @VisibleForTesting
- public Map<String, OMProxyInfo> getOMProxyInfoMap() {
- return omProxyInfos;
- }
-
- @VisibleForTesting
- protected void setProxiesForTesting(
- Map<String, ProxyInfo<T>> setOMProxies,
- Map<String, OMProxyInfo> setOMProxyInfos,
- List<String> setOMNodeIDList,
- Map<String, InetSocketAddress> setOMNodeAddress) {
- setOmProxies(setOMProxies);
- this.omProxyInfos = setOMProxyInfos;
- setOmNodeIDList(setOMNodeIDList);
- setOmNodeAddressMap(setOMNodeAddress);
- }
-
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
index d58bbaf02e4..04cc40a6b0f 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,11 +70,25 @@ public abstract class OMFailoverProxyProviderBase<T>
implements
private final ConfigurationSource conf;
private final Class<T> protocolClass;
- // Map of OMNodeID to its proxy
- private Map<String, ProxyInfo<T>> omProxies;
- private List<String> omNodeIDList;
- private Map<String, InetSocketAddress> omNodeAddressMap;
-
+ // omProxies: Map of OMNodeID to its proxy
+ // omNodesInOrder: List that specifies the ordering of OM nodes that
+ // is used to determine the next OM node proxy to retrieve
+ // from omProxies.
+ // Invariants:
+ // 1. size(omProxies) == size(omNodesInOrder)
+ // 2. set(omProxies.keySet) == set(omNodesInOrder)
+ private Map<String, OMProxyInfo<T>> omProxies;
+ private List<String> omNodesInOrder;
+
+ // These are used to identify the current and next OM node
+ // OMNodeId is used to retrieve proxy from omProxies
+ // ProxyIndex is used to find the node ID from omNodesInOrder
+ // Invariants:
+ // 1. currentProxyOMNodeId == omNodesInOrder[currentProxyIndex] (and is a key of omProxies)
+ // 2. nextProxyOMNodeId == omNodesInOrder[nextProxyIndex] (and is a key of omProxies)
+ // Note that these fields need to be modified atomically (e.g. using
synchronized)
+ // Specifically (currentProxyOMNodeId, currentProxyIndex) and
(nextProxyOMNodeId, nextProxyIndex)
+ // should be atomically updated
private String currentProxyOMNodeId;
private int currentProxyIndex;
private String nextProxyOMNodeId;
@@ -84,11 +99,11 @@ public abstract class OMFailoverProxyProviderBase<T>
implements
// before attempting to contact all the OMs again. For other exceptions
// such as LeaderNotReadyException, the same OM is contacted again with a
// linearly increasing wait time.
- private Set<String> attemptedOMs = new HashSet<>();
+ private final Set<String> attemptedOMs = new HashSet<>();
private String lastAttemptedOM;
private int numAttemptsOnSameOM = 0;
private final long waitBetweenRetries;
- private Set<String> accessControlExceptionOMs = new HashSet<>();
+ private final Set<String> accessControlExceptionOMs = new HashSet<>();
private boolean performFailoverDone;
private final UserGroupInformation ugi;
@@ -106,19 +121,23 @@ public OMFailoverProxyProviderBase(ConfigurationSource
configuration,
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
- loadOMClientConfigs(conf, omServiceId);
- Objects.requireNonNull(omProxies, "omProxies == null");
- Objects.requireNonNull(omNodeIDList, "omNodeIDList == null");
- Objects.requireNonNull(omNodeAddressMap, "omNodeAddressMap == null");
+ initOmProxiesFromConfigs(conf, omServiceId);
nextProxyIndex = 0;
- nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
+ nextProxyOMNodeId = omNodesInOrder.get(nextProxyIndex);
currentProxyIndex = 0;
currentProxyOMNodeId = nextProxyOMNodeId;
}
- protected abstract void loadOMClientConfigs(ConfigurationSource config,
- String omSvcId)
+ /**
+ * Initialize the OM proxies from the configuration and the OM service ID.
+ * Implementations initialize the OM proxies and the ordered OM node ID list
+ * through {@link #initOmProxies(Map, List)}.
+ * @param config configuration containing OM node information
+ * @param omSvcId OM service ID
+ * @throws IOException if any exception occurs while trying to initialize
the proxy.
+ */
+ protected abstract void initOmProxiesFromConfigs(ConfigurationSource config,
String omSvcId)
throws IOException;
/**
@@ -137,7 +156,7 @@ protected T createOMProxy(InetSocketAddress omAddress)
throws IOException {
// Ensure we do not attempt retry on the same OM in case of exceptions
RetryPolicy connectionRetryPolicy =
RetryPolicies.failoverOnNetworkException(0);
- return (T) RPC.getProtocolProxy(
+ return RPC.getProtocolProxy(
getInterface(),
RPC.getProtocolVersion(protocolClass),
omAddress,
@@ -160,9 +179,7 @@ protected synchronized boolean shouldFailover(Exception ex)
{
return false;
} else {
accessControlExceptionOMs.add(nextProxyOMNodeId);
- if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
- return false;
- }
+ return !accessControlExceptionOMs.containsAll(omNodesInOrder);
}
} else if (HddsUtils.shouldNotFailoverOnRpcException(unwrappedException)) {
return false;
@@ -220,8 +237,8 @@ public RetryAction shouldRetry(Exception exception, int
retries,
notLeaderException.getSuggestedLeaderNodeId();
if (suggestedLeaderAddress != null &&
suggestedNodeId != null &&
- omNodeAddressMap.containsKey(suggestedNodeId) &&
- omNodeAddressMap.get(suggestedNodeId).toString()
+ omProxies.containsKey(suggestedNodeId) &&
+ omProxies.get(suggestedNodeId).getRpcAddr().toString()
.equals(suggestedLeaderAddress)) {
setNextOmProxy(suggestedNodeId);
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY,
@@ -259,7 +276,7 @@ private RetryAction getRetryAction(RetryDecision
fallbackAction,
return new RetryAction(fallbackAction, getWaitTime());
} else {
LOG.error("Failed to connect to OMs: {}. Attempted {} failovers.",
- omNodeIDList, maxFailovers);
+ omNodesInOrder, maxFailovers);
return RetryAction.FAIL;
}
}
@@ -318,7 +335,7 @@ public synchronized void selectNextOmProxy() {
int newProxyIndex = incrementNextProxyIndex();
if (LOG.isDebugEnabled()) {
LOG.debug("Incrementing OM proxy index to {}, nodeId: {}",
- newProxyIndex, omNodeIDList.get(newProxyIndex));
+ newProxyIndex, omNodesInOrder.get(newProxyIndex));
}
}
}
@@ -334,7 +351,7 @@ private synchronized int incrementNextProxyIndex() {
attemptedOMs.add(nextProxyOMNodeId);
nextProxyIndex = (nextProxyIndex + 1) % omProxies.size();
- nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
+ nextProxyOMNodeId = omNodesInOrder.get(nextProxyIndex);
return nextProxyIndex;
}
@@ -348,7 +365,7 @@ private synchronized boolean updateLeaderOMNodeId(String
newLeaderOMNodeId) {
if (omProxies.containsKey(newLeaderOMNodeId)) {
lastAttemptedOM = nextProxyOMNodeId;
nextProxyOMNodeId = newLeaderOMNodeId;
- nextProxyIndex = omNodeIDList.indexOf(nextProxyOMNodeId);
+ nextProxyIndex = omNodesInOrder.indexOf(nextProxyOMNodeId);
return true;
}
} else {
@@ -391,19 +408,20 @@ public synchronized long getWaitTime() {
return waitBetweenRetries;
}
- public List<ProxyInfo> getOMProxies() {
- return new ArrayList<ProxyInfo>(omProxies.values());
+ public List<OMProxyInfo<T>> getOMProxies() {
+ return new ArrayList<>(omProxies.values());
}
- public Map<String, ProxyInfo<T>> getOMProxyMap() {
+ public Map<String, OMProxyInfo<T>> getOMProxyMap() {
return omProxies;
}
/**
- * Check if exception is OMLeaderNotReadyException.
+ * Unwrap the exception and return the wrapped OMLeaderNotReadyException if
any.
*
- * @param exception
- * @return OMLeaderNotReadyException
+ * @param exception exception to unwrap.
+ * @return the unwrapped OMLeaderNotReadyException or null if the wrapped
+ * exception is not OMLeaderNotReadyException.
*/
public static OMLeaderNotReadyException getLeaderNotReadyException(
Exception exception) {
@@ -419,9 +437,11 @@ public static OMLeaderNotReadyException
getLeaderNotReadyException(
}
/**
- * Check if exception is a OMNotLeaderException.
+ * Unwrap the exception and return the wrapped OMNotLeaderException if any.
*
- * @return OMNotLeaderException.
+ * @param exception exception to unwrap.
+ * @return the unwrapped OMNotLeaderException or null if the wrapped
+ * exception is not OMNotLeaderException.
*/
public static OMNotLeaderException getNotLeaderException(
Exception exception) {
@@ -440,23 +460,24 @@ protected ConfigurationSource getConf() {
return conf;
}
- protected synchronized void setOmProxies(Map<String,
- ProxyInfo<T>> omProxies) {
- this.omProxies = omProxies;
- }
-
- protected synchronized void setOmNodeIDList(List<String> omNodeIDList) {
- Collections.shuffle(omNodeIDList);
- this.omNodeIDList = Collections.unmodifiableList(omNodeIDList);
- }
-
- protected synchronized List<String> getOmNodeIDList() {
- return omNodeIDList;
+ /**
+ * Initialize the OM proxy map and the OM nodes ordering.
+ * @param omProxyMap OM Node ID to OMProxyInfo map
+ * @param omNodesInOrderList Ordered list of OM Node ID to define failover
ordering.
+ */
+ protected synchronized void initOmProxies(Map<String, OMProxyInfo<T>>
omProxyMap, List<String> omNodesInOrderList) {
+ Objects.requireNonNull(omProxyMap, "omProxyMap == null");
+ Objects.requireNonNull(omNodesInOrderList, "omNodesInOrderList == null");
+ Preconditions.assertSame(omProxyMap.size(), omNodesInOrderList.size(),
+ "omProxyMap and omNodesInOrderList should have the same size");
+ Preconditions.assertEquals(omProxyMap.keySet(), new
HashSet<>(omNodesInOrderList),
+ "the OM node IDs of omProxies keys should be the same as
omNodesInOrder");
+ this.omProxies = omProxyMap;
+ this.omNodesInOrder = Collections.unmodifiableList(omNodesInOrderList);
+ }
- protected synchronized void setOmNodeAddressMap(
- Map<String, InetSocketAddress> map) {
- this.omNodeAddressMap = map;
+ protected synchronized List<String> getOmNodesInOrder() {
+ return omNodesInOrder;
}
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java
index 8ea1749db9a..e9c0883b4b9 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java
@@ -19,6 +19,7 @@
import java.net.InetSocketAddress;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.slf4j.Logger;
@@ -27,16 +28,17 @@
/**
* Class to store OM proxy information.
*/
-public class OMProxyInfo {
- private String nodeId;
- private String rpcAddrStr;
- private InetSocketAddress rpcAddr;
- private Text dtService;
+public class OMProxyInfo<T> extends ProxyInfo<T> {
+ private final String nodeId;
+ private final String rpcAddrStr;
+ private final InetSocketAddress rpcAddr;
+ private final Text dtService;
private static final Logger LOG =
LoggerFactory.getLogger(OMProxyInfo.class);
- OMProxyInfo(String serviceID, String nodeID, String rpcAddress) {
+ public OMProxyInfo(T proxy, String serviceID, String nodeID, String
rpcAddress, String proxyInfo) {
+ super(proxy, proxyInfo);
this.nodeId = nodeID;
this.rpcAddrStr = rpcAddress;
this.rpcAddr = NetUtils.createSocketAddr(rpcAddrStr);
@@ -47,24 +49,28 @@ public class OMProxyInfo {
rpcAddress, serviceID, nodeId);
this.dtService = null;
} else {
-
// This issue will be a problem with docker/kubernetes world where one of
// the container is killed, and that OM address will be unresolved.
// For now skip the unresolved OM address setting it to the token
// service field.
-
this.dtService = SecurityUtil.buildTokenService(rpcAddr);
}
}
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder()
- .append("nodeId=")
- .append(nodeId)
- .append(",nodeAddress=")
- .append(rpcAddrStr);
- return sb.toString();
+ public OMProxyInfo(String serviceID, String nodeID, String rpcAddress) {
+ this(null, serviceID, nodeID, rpcAddress, "nodeId=" + nodeID +
",nodeAddress=" + rpcAddress);
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getRpcAddrStr() {
+ return rpcAddrStr;
+ }
+
+ public InetSocketAddress getRpcAddr() {
+ return rpcAddr;
}
public InetSocketAddress getAddress() {
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index ef4ec32ddc2..a7f6953f01a 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -75,18 +75,18 @@ public class GrpcOmTransport implements OmTransport {
// gRPC specific
private static List<X509Certificate> caCerts = null;
- private Map<String,
+ private final Map<String,
OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients;
- private Map<String, ManagedChannel> channels;
- private ConfigurationSource conf;
+ private final Map<String, ManagedChannel> channels;
+ private final ConfigurationSource conf;
- private AtomicReference<String> host;
+ private final AtomicReference<String> host;
private AtomicInteger globalFailoverCount;
private final int maxSize;
- private SecurityConfig secConfig;
+ private final SecurityConfig secConfig;
private RetryPolicy retryPolicy;
- private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
+ private final GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
public static void setCaCerts(List<X509Certificate> x509Certificates) {
@@ -100,14 +100,14 @@ public GrpcOmTransport(ConfigurationSource conf,
this.channels = new HashMap<>();
this.clients = new HashMap<>();
this.conf = conf;
- this.host = new AtomicReference();
+ this.host = new AtomicReference<>();
this.globalFailoverCount = new AtomicInteger();
secConfig = new SecurityConfig(conf);
maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
- omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
+ omFailoverProxyProvider = new GrpcOMFailoverProxyProvider<>(
conf,
ugi,
omServiceId,
@@ -256,7 +256,6 @@ private boolean shouldRetry(Exception ex, int
expectedFailoverCount, int request
action = retryPolicy.shouldRetry(ex, 0, requestFailoverCount, true);
LOG.debug("grpc failover retry action {}", action.action);
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
- retry = false;
LOG.error("Retry request failed. Action : {}, {}",
action.action, ex.toString());
} else {
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index aa1c0cb7e2d..dc6b0a331ea 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -167,9 +167,9 @@ public void testExcludesListenerNodes() throws Exception {
UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
OzoneManagerProtocolPB.class)) {
// Verify listener node is not included in proxy map
-
assertTrue(providerWithListeners.getOMProxyInfoMap().containsKey(NODE_ID_BASE_STR
+ 1));
-
assertTrue(providerWithListeners.getOMProxyInfoMap().containsKey(NODE_ID_BASE_STR
+ 3));
-
assertFalse(providerWithListeners.getOMProxyInfoMap().containsKey(listenerNode));
+
assertTrue(providerWithListeners.getOMProxyMap().containsKey(NODE_ID_BASE_STR +
1));
+
assertTrue(providerWithListeners.getOMProxyMap().containsKey(NODE_ID_BASE_STR +
3));
+
assertFalse(providerWithListeners.getOMProxyMap().containsKey(listenerNode));
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
index 2832a281ebb..3bc30fa0c42 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
@@ -131,6 +131,7 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.ozone.ClientConfigForTesting;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -327,13 +328,13 @@ public void testOMClientProxyProvider() {
HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
- List<OMProxyInfo> omProxies = omFailoverProxyProvider.getOMProxyInfos();
+ List<ProxyInfo> omProxies = omFailoverProxyProvider.getOMProxies();
// For a non-HA OM service, there should be only one OM proxy.
assertEquals(1, omProxies.size());
// The address in OMProxyInfo object, which client will connect to,
// should match the OM's RPC address.
- assertEquals(omProxies.get(0).getAddress(),
+ assertEquals(((OMProxyInfo) omProxies.get(0)).getAddress(),
ozoneManager.getOmRpcServerAddr());
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 5ba971d1967..8c55eb49caa 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -52,6 +52,7 @@
import javax.management.ObjectName;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneTestUtils;
@@ -292,8 +293,8 @@ void testOMProxyProviderInitialization() {
OmFailoverProxyUtil.getFailoverProxyProvider(
rpcClient.getObjectStore().getClientProxy());
- List<OMProxyInfo> omProxies =
- omFailoverProxyProvider.getOMProxyInfos();
+ List<ProxyInfo> omProxies =
+ omFailoverProxyProvider.getOMProxies();
assertEquals(getNumOfOMs(), omProxies.size());
@@ -301,7 +302,8 @@ void testOMProxyProviderInitialization() {
OzoneManager om = getCluster().getOzoneManager(i);
InetSocketAddress omRpcServerAddr = om.getOmRpcServerAddr();
boolean omClientProxyExists = false;
- for (OMProxyInfo omProxyInfo : omProxies) {
+ for (ProxyInfo proxyInfo : omProxies) {
+ OMProxyInfo omProxyInfo = (OMProxyInfo) proxyInfo;
if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
omClientProxyExists = true;
break;
@@ -368,7 +370,7 @@ public void testFailoverWithSuggestedLeader() throws
Exception {
// The OMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
String leaderOMAddress = ((OMProxyInfo)
- omFailoverProxyProvider.getOMProxyInfoMap().get(leaderOMNodeId))
+ omFailoverProxyProvider.getOMProxyMap().get(leaderOMNodeId))
.getAddress().getAddress().toString();
OzoneManager followerOM = null;
for (OzoneManager om: getCluster().getOzoneManagersList()) {
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index 0de7052c4ed..c7463a96890 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -23,7 +23,6 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -53,7 +52,7 @@ public class TestOMFailovers {
private Exception testException;
@Test
- public void testAccessContorlExceptionFailovers() throws Exception {
+ public void testAccessControlExceptionFailovers() throws Exception {
testException = new AccessControlException();
@@ -105,8 +104,7 @@ public OMResponse submitRequest(RpcController controller,
}
}
- private final class MockFailoverProxyProvider
- extends HadoopRpcOMFailoverProxyProvider {
+ private final class MockFailoverProxyProvider extends
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> {
private MockFailoverProxyProvider(ConfigurationSource configuration)
throws IOException {
@@ -114,31 +112,28 @@ private MockFailoverProxyProvider(ConfigurationSource
configuration)
}
@Override
- protected ProxyInfo createOMProxy(String nodeId) {
- ProxyInfo proxyInfo = new ProxyInfo<>(new
MockOzoneManagerProtocol(nodeId,
- testException), nodeId);
- getOMProxyMap().put(nodeId, proxyInfo);
- return proxyInfo;
+ protected ProxyInfo<OzoneManagerProtocolPB> createOMProxyIfNeeded(
+ OMProxyInfo<OzoneManagerProtocolPB> omProxyInfo) {
+ if (omProxyInfo.proxy == null) {
+ omProxyInfo.proxy = new
MockOzoneManagerProtocol(omProxyInfo.getNodeId(), testException);
+ }
+ return omProxyInfo;
}
@Override
- protected void loadOMClientConfigs(ConfigurationSource config,
+ protected void initOmProxiesFromConfigs(ConfigurationSource config,
String omSvcId) {
- HashMap<String, ProxyInfo<OzoneManagerProtocolPB>> omProxies =
- new HashMap<>();
- HashMap<String, OMProxyInfo> omProxyInfos = new HashMap<>();
- HashMap<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
+ HashMap<String, OMProxyInfo<OzoneManagerProtocolPB>> omProxyInfos = new
HashMap<>();
ArrayList<String> omNodeIDList = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
String nodeId = "om" + i;
- omProxies.put(nodeId, null);
- omProxyInfos.put(nodeId, null);
+ OMProxyInfo<OzoneManagerProtocolPB> omProxyInfo = new
OMProxyInfo<>(omSvcId, nodeId,
+ "127.0.0.1:9862");
+ omProxyInfos.put(nodeId, omProxyInfo);
omNodeIDList.add(nodeId);
- omNodeAddressMap.put(nodeId, null);
}
- setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList,
- omNodeAddressMap);
+ initOmProxies(omProxyInfos, omNodeIDList);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]