This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8829f964d6e HDDS-14455. Fix HadoopRpcOMFailoverProxyProvider generic
warnings. (#9657)
8829f964d6e is described below
commit 8829f964d6e7b1ab94f34db6323dab20ca140e77
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Jan 21 22:06:05 2026 -0800
HDDS-14455. Fix HadoopRpcOMFailoverProxyProvider generic warnings. (#9657)
---
.../ozone/om/protocolPB/Hadoop3OmTransport.java | 26 ++----------
.../protocolPB/OMAdminProtocolClientSideImpl.java | 5 +--
.../OMInterServiceProtocolClientSideImpl.java | 4 +-
.../om/protocolPB/OzoneManagerProtocolPB.java | 7 ++++
.../ozone/client/rpc/OzoneRpcClientTests.java | 14 +++----
.../{OmFailoverProxyUtil.java => OmTestUtil.java} | 28 +++++--------
.../hadoop/ozone/om/TestOMRatisSnapshots.java | 28 ++++---------
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 9 +---
.../ozone/om/TestOzoneManagerHAWithAllRunning.java | 48 ++++++++--------------
.../om/TestOzoneManagerHAWithStoppedNodes.java | 24 ++++-------
.../snapshot/TestOzoneManagerSnapshotProvider.java | 6 +--
.../snapshot/TestSnapshotBackgroundServices.java | 6 +--
.../hadoop/fs/ozone/Hadoop27RpcTransport.java | 25 ++---------
13 files changed, 70 insertions(+), 160 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 60dd34ead2a..f774f884554 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -24,7 +24,6 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtobufHelper;
import org.apache.hadoop.ipc_.ProtobufRpcEngine;
import org.apache.hadoop.ipc_.RPC;
@@ -45,7 +44,7 @@ public class Hadoop3OmTransport implements OmTransport {
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
- private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
+ private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
private final OzoneManagerProtocolPB rpcProxy;
@@ -56,14 +55,13 @@ public Hadoop3OmTransport(ConfigurationSource conf,
OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+ this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
-
- this.rpcProxy = createRetryProxy(omFailoverProxyProvider, maxFailovers);
+ this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider,
maxFailovers);
}
@Override
@@ -96,24 +94,8 @@ public Text getDelegationTokenService() {
return omFailoverProxyProvider.getCurrentProxyDelegationToken();
}
- /**
- * Creates a {@link RetryProxy} encapsulating the
- * {@link HadoopRpcOMFailoverProxyProvider}. The retry proxy
- * fails over on network exception or if the current proxy
- * is not the leader OM.
- */
- private OzoneManagerProtocolPB createRetryProxy(
- HadoopRpcOMFailoverProxyProvider failoverProxyProvider,
- int maxFailovers) {
-
- OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
- OzoneManagerProtocolPB.class, failoverProxyProvider,
- failoverProxyProvider.getRetryPolicy(maxFailovers));
- return proxy;
- }
-
@VisibleForTesting
- public HadoopRpcOMFailoverProxyProvider getOmFailoverProxyProvider() {
+ public HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
getOmFailoverProxyProvider() {
return omFailoverProxyProvider;
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
index 126818fb237..8919a2479ef 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
@@ -132,9 +132,8 @@ public static OMAdminProtocolClientSideImpl
createProxyForOMHA(
RPC.setProtocolEngine(OzoneConfiguration.of(conf),
OMAdminProtocolPB.class, ProtobufRpcEngine.class);
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- new HadoopRpcOMFailoverProxyProvider(conf, ugi, omServiceId,
- OMAdminProtocolPB.class);
+ final HadoopRpcOMFailoverProxyProvider<OMAdminProtocolPB>
omFailoverProxyProvider
+ = new HadoopRpcOMFailoverProxyProvider<>(conf, ugi, omServiceId,
OMAdminProtocolPB.class);
// Multiple the max number of retries with number of OMs to calculate the
// max number of failovers.
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
index f227e37463d..28924f02d17 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
@@ -48,7 +48,7 @@ public class OMInterServiceProtocolClientSideImpl implements
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
- private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
+ private final HadoopRpcOMFailoverProxyProvider<OMInterServiceProtocolPB>
omFailoverProxyProvider;
private final OMInterServiceProtocolPB rpcProxy;
@@ -58,7 +58,7 @@ public
OMInterServiceProtocolClientSideImpl(ConfigurationSource conf,
RPC.setProtocolEngine(OzoneConfiguration.of(conf),
OMInterServiceProtocolPB.class, ProtobufRpcEngine.class);
- this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+ this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
conf, ugi, omServiceId, OMInterServiceProtocolPB.class);
int maxFailovers = conf.getInt(
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
index f2d248e3b95..f4f7c54403a 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.ozone.om.protocolPB;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtocolInfo;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService;
import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
import org.apache.hadoop.security.KerberosInfo;
@@ -37,4 +39,9 @@
@InterfaceAudience.Private
public interface OzoneManagerProtocolPB
extends OzoneManagerService.BlockingInterface {
+ static OzoneManagerProtocolPB
newProxy(OMFailoverProxyProviderBase<OzoneManagerProtocolPB>
failoverProxyProvider,
+ int maxFailovers) {
+ return (OzoneManagerProtocolPB)
RetryProxy.create(OzoneManagerProtocolPB.class, failoverProxyProvider,
+ failoverProxyProvider.getRetryPolicy(maxFailovers));
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
index dedc67fece3..31c826fabb0 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
@@ -131,7 +131,6 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.ozone.ClientConfigForTesting;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -167,13 +166,12 @@
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmConfig;
-import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
+import org.apache.hadoop.ozone.om.OmTestUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ResolvedBucket;
import org.apache.hadoop.ozone.om.S3SecretManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -189,6 +187,7 @@
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -324,17 +323,14 @@ public static MiniOzoneCluster getCluster() {
*/
@Test
public void testOMClientProxyProvider() {
-
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
-
- List<ProxyInfo> omProxies = omFailoverProxyProvider.getOMProxies();
+ final List<OMProxyInfo<OzoneManagerProtocolPB>> omProxies
+ = OmTestUtil.getFailoverProxyProvider(store).getOMProxies();
// For a non-HA OM service, there should be only one OM proxy.
assertEquals(1, omProxies.size());
// The address in OMProxyInfo object, which client will connect to,
// should match the OM's RPC address.
- assertEquals(((OMProxyInfo) omProxies.get(0)).getAddress(),
+ assertEquals(omProxies.get(0).getAddress(),
ozoneManager.getOmRpcServerAddr());
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
similarity index 69%
rename from
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
rename to
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
index d494d82c885..a90aacd4b19 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
@@ -17,32 +17,24 @@
package org.apache.hadoop.ozone.om;
-import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransport;
import
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
-/**
- * Test utility to get the FailoverProxyProvider with cast.
- */
-public final class OmFailoverProxyUtil {
-
- private OmFailoverProxyUtil() {
- }
-
- /**
- * Get FailoverProxyProvider from RpcClient / ClientProtocol.
- */
- public static HadoopRpcOMFailoverProxyProvider getFailoverProxyProvider(
- ClientProtocol clientProtocol) {
-
+/** Utilities for testing {@link OzoneManager}. */
+public interface OmTestUtil {
+ static HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
getFailoverProxyProvider(ObjectStore store) {
OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient =
- (OzoneManagerProtocolClientSideTranslatorPB)
- ((RpcClient) clientProtocol).getOzoneManagerClient();
+ (OzoneManagerProtocolClientSideTranslatorPB)
store.getClientProxy().getOzoneManagerClient();
Hadoop3OmTransport transport =
(Hadoop3OmTransport) ozoneManagerClient.getTransport();
return transport.getOmFailoverProxyProvider();
}
+
+ static String getCurrentOmProxyNodeId(ObjectStore store) {
+ return getFailoverProxyProvider(store).getCurrentProxyOMNodeId();
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index c52aa5c91e4..7134463db3a 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -199,9 +199,7 @@ public void shutdown() {
@Test
public void testInstallSnapshot(@TempDir Path tempDir) throws Exception {
// Get the leader OM
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
@@ -394,9 +392,7 @@ private void checkSnapshot(OzoneManager leaderOM,
OzoneManager followerOM,
public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
throws Exception {
// Get the leader OM
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
@@ -605,9 +601,7 @@ private IncrementData getNextIncrementalTarball(
@Unhealthy("HDDS-13300")
public void testInstallIncrementalSnapshotWithFailure() throws Exception {
// Get the leader OM
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
@@ -747,9 +741,7 @@ public void testInstallIncrementalSnapshotWithFailure()
throws Exception {
@Test
public void testInstallSnapshotWithClientWrite() throws Exception {
// Get the leader OM
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
@@ -846,9 +838,7 @@ public void testInstallSnapshotWithClientWrite() throws
Exception {
@Test
public void testInstallSnapshotWithClientRead() throws Exception {
// Get the leader OM
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
@@ -933,9 +923,7 @@ public void testInstallSnapshotWithClientRead() throws
Exception {
@Test
public void testInstallOldCheckpointFailure() throws Exception {
// Get the leader OM
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
@@ -996,9 +984,7 @@ public void testInstallOldCheckpointFailure() throws
Exception {
@Test
public void testInstallCorruptedCheckpointFailure() throws Exception {
// Get the leader OM
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index d3b338c6dd6..8b5edc177d4 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -62,8 +62,6 @@
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.client.rpc.RpcClient;
-import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.junit.jupiter.api.AfterAll;
@@ -274,13 +272,8 @@ protected OzoneBucket linkBucket(OzoneBucket srcBuk)
throws Exception {
* Stop the current leader OM.
*/
protected void stopLeaderOM() {
- //Stop the leader OM.
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil.getFailoverProxyProvider(
- (RpcClient) objectStore.getClientProxy());
-
// The omFailoverProxyProvider will point to the current leader OM node.
- String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(getObjectStore());
// Stop one of the ozone manager, to see when the OM leader changes
// multipart upload is happening successfully or not.
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 8c55eb49caa..9e002899d55 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -52,14 +52,12 @@
import javax.management.ObjectName;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.protocol.StorageType;
-import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -67,6 +65,7 @@
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -287,23 +286,15 @@ public void testAllBucketOperations() throws Exception {
*/
@Test
void testOMProxyProviderInitialization() {
- OzoneClient rpcClient = getClient();
-
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil.getFailoverProxyProvider(
- rpcClient.getObjectStore().getClientProxy());
-
- List<ProxyInfo> omProxies =
- omFailoverProxyProvider.getOMProxies();
-
+ final List<OMProxyInfo<OzoneManagerProtocolPB>> omProxies
+ =
OmTestUtil.getFailoverProxyProvider(getClient().getObjectStore()).getOMProxies();
assertEquals(getNumOfOMs(), omProxies.size());
for (int i = 0; i < getNumOfOMs(); i++) {
OzoneManager om = getCluster().getOzoneManager(i);
InetSocketAddress omRpcServerAddr = om.getOmRpcServerAddr();
boolean omClientProxyExists = false;
- for (ProxyInfo proxyInfo : omProxies) {
- OMProxyInfo omProxyInfo = (OMProxyInfo) proxyInfo;
+ for (OMProxyInfo<OzoneManagerProtocolPB> omProxyInfo : omProxies) {
if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
omClientProxyExists = true;
break;
@@ -321,9 +312,8 @@ void testOMProxyProviderInitialization() {
@Test
public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
ObjectStore objectStore = getObjectStore();
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy());
+ final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider
+ = OmTestUtil.getFailoverProxyProvider(objectStore);
// Run couple of createVolume tests to discover the current Leader OM
createVolumeTest(true);
@@ -360,17 +350,15 @@ public void testOMProxyProviderFailoverToCurrentLeader()
throws Exception {
*/
@Test
public void testFailoverWithSuggestedLeader() throws Exception {
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil
- .getFailoverProxyProvider(getObjectStore().getClientProxy());
+ final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider
+ = OmTestUtil.getFailoverProxyProvider(getObjectStore());
// Make sure All OMs are ready.
createVolumeTest(true);
// The OMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
- String leaderOMAddress = ((OMProxyInfo)
- omFailoverProxyProvider.getOMProxyMap().get(leaderOMNodeId))
+ String leaderOMAddress =
omFailoverProxyProvider.getOMProxyMap().get(leaderOMNodeId)
.getAddress().getAddress().toString();
OzoneManager followerOM = null;
for (OzoneManager om: getCluster().getOzoneManagersList()) {
@@ -404,9 +392,8 @@ public void testReadRequest() throws Exception {
ObjectStore objectStore = getObjectStore();
objectStore.createVolume(volumeName);
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy());
+ final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider
+ = OmTestUtil.getFailoverProxyProvider(objectStore);
String leaderId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
@@ -417,8 +404,8 @@ public void testReadRequest() throws Exception {
// Get the ObjectStore and FailoverProxyProvider for OM at index i
final ObjectStore store = getClient().getObjectStore();
- final HadoopRpcOMFailoverProxyProvider proxyProvider =
- OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
+ final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
proxyProvider
+ = OmTestUtil.getFailoverProxyProvider(store);
// Failover to the OM node that the objectStore points to
omFailoverProxyProvider.setNextOmProxy(
@@ -455,9 +442,8 @@ public void testOMRetryCache() throws Exception {
objectStore.createVolume(randomUUID().toString());
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy());
+ final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider
+ = OmTestUtil.getFailoverProxyProvider(objectStore);
String currentLeaderNodeId = omFailoverProxyProvider
.getCurrentProxyOMNodeId();
@@ -1097,9 +1083,7 @@ void testOMRatisSnapshot() throws Exception {
retVolumeinfo.createBucket(bucketName);
OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager ozoneManager = getCluster().getOzoneManager(leaderOMNodeId);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java
index eed911223dc..ceda27f5f24 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java
@@ -65,6 +65,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.UserGroupInformation;
@@ -166,9 +167,8 @@ private void testMultipartUploadWithOneOmNodeDown() throws
Exception {
// Stop leader OM, to see when the OM leader changes
// multipart upload is happening successfully or not.
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil
- .getFailoverProxyProvider(getObjectStore().getClientProxy());
+ final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider
+ = OmTestUtil.getFailoverProxyProvider(getObjectStore());
// The omFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
@@ -234,9 +234,8 @@ private void createMultipartKeyAndReadKey(OzoneBucket
ozoneBucket,
public void testOMProxyProviderFailoverOnConnectionFailure()
throws Exception {
ObjectStore objectStore = getObjectStore();
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy());
+ final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider
+ = OmTestUtil.getFailoverProxyProvider(objectStore);
String firstProxyNodeId =
omFailoverProxyProvider.getCurrentProxyOMNodeId();
createVolumeTest(true);
@@ -267,10 +266,7 @@ void testOMRestart() throws Exception {
ObjectStore objectStore = getObjectStore();
// Get the leader OM
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
-
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = getCluster().getOzoneManager(leaderOMNodeId);
// Get follower OM
@@ -439,8 +435,7 @@ public void testKeyDeletion() throws Exception {
OzoneManager ozoneManager = getCluster().getOMLeader();
- KeyDeletingService keyDeletingService =
- (KeyDeletingService) ozoneManager.getKeyManager().getDeletingService();
+ final KeyDeletingService keyDeletingService =
ozoneManager.getKeyManager().getDeletingService();
// Check on leader OM Count.
GenericTestUtils.waitFor(() ->
@@ -482,9 +477,8 @@ void testIncrementalWaitTimeWithSameNodeFailover() throws
Exception {
long waitBetweenRetries = getConf().getLong(
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
- HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
- OmFailoverProxyUtil
- .getFailoverProxyProvider(getObjectStore().getClientProxy());
+ final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider
+ = OmTestUtil.getFailoverProxyProvider(getObjectStore());
// The omFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
index 53f9a7a7d13..0b89eb1b67c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
@@ -37,7 +37,7 @@
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
+import org.apache.hadoop.ozone.om.OmTestUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.junit.jupiter.api.AfterEach;
@@ -96,9 +96,7 @@ public void testDownloadCheckpoint() throws Exception {
retVolumeinfo.createBucket(bucketName);
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java
index 5e2fdfdc1f4..e5da202421a 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java
@@ -61,8 +61,8 @@
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.conf.OMClientConfig;
import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmTestUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SstFilteringService;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
@@ -366,9 +366,7 @@ private OzoneManager getInactiveFollowerOM(OzoneManager
leaderOM) {
}
private OzoneManager getLeaderOM() {
- String leaderOMNodeId = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy())
- .getCurrentProxyOMNodeId();
+ final String leaderOMNodeId =
OmTestUtil.getCurrentOmProxyNodeId(objectStore);
return cluster.getOzoneManager(leaderOMNodeId);
}
diff --git
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index 61a25abda26..0d38d357b8a 100644
---
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -23,7 +23,6 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtobufHelper;
import org.apache.hadoop.ipc_.ProtobufRpcEngine;
import org.apache.hadoop.ipc_.RPC;
@@ -45,7 +44,7 @@ public class Hadoop27RpcTransport implements OmTransport {
private final OzoneManagerProtocolPB rpcProxy;
- private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
+ private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
public Hadoop27RpcTransport(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -54,15 +53,13 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+ this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
-
- rpcProxy = createRetryProxy(omFailoverProxyProvider, maxFailovers);
-
+ this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider,
maxFailovers);
}
@Override
@@ -95,24 +92,8 @@ public Text getDelegationTokenService() {
return null;
}
- /**
- * Creates a {@link RetryProxy} encapsulating the
- * {@link HadoopRpcOMFailoverProxyProvider}. The retry proxy fails over on
- * network exception or if the current proxy is not the leader OM.
- */
- private OzoneManagerProtocolPB createRetryProxy(
- HadoopRpcOMFailoverProxyProvider failoverProxyProvider,
- int maxFailovers) {
-
- OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
- OzoneManagerProtocolPB.class, failoverProxyProvider,
- failoverProxyProvider.getRetryPolicy(maxFailovers));
- return proxy;
- }
-
@Override
public void close() throws IOException {
omFailoverProxyProvider.close();
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]