Apache9 commented on code in PR #8100:
URL: https://github.com/apache/hbase/pull/8100#discussion_r3104239438


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterIdFetcher.java:
##########
@@ -88,10 +91,9 @@ class ClusterIdFetcher {
   private void getClusterId(int index) {
     ServerName server = bootstrapServers.get(index);
     LOG.debug("Going to request {} for getting cluster id", server);
-    // user and rpcTimeout are both not important here, as we will not 
actually send any rpc calls
-    // out, only a preamble connection header, but if we pass null as user, 
there will be NPE in
-    // some code paths...
-    RpcChannel channel = rpcClient.createRpcChannel(server, user, 0);
+    // The preamble exchange still requires TCP connect and potentially TLS 
negotiation, so we use
+    // the configured rpc timeout to bound the connection attempt.
+    RpcChannel channel = rpcClient.createRpcChannel(server, user, 
rpcTimeoutMs);

Review Comment:
   We only send a preamble header here, so I wonder whether the rpc timeout 
actually works. And I can confirm that we do not have TLS negotiation when 
sending the preamble header.



##########
hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryRpcClientFailure.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
+
+/**
+ * Test that ConnectionRegistryRpcStubHolder properly completes its future 
when RPC client creation
+ * fails. Before the fix, an exception thrown by 
RpcClientFactory.createClient() inside the
+ * FutureUtils.addListener callback would be swallowed, leaving the 
CompletableFuture permanently
+ * incomplete and hanging all callers.
+ */
+@Tag(ClientTests.TAG)
+@Tag(SmallTests.TAG)
+public class TestConnectionRegistryRpcClientFailure {
+
+  private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = 
"hbase.test.hedged.reqs.fanout";
+  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
+    "hbase.test.refresh.initial.delay.secs";
+  private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
+    "hbase.test.refresh.interval.secs";
+  private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
+    "hbase.test.min.refresh.interval.secs";
+
+  private static final HBaseCommonTestingUtil UTIL = new 
HBaseCommonTestingUtil();
+
+  private static Set<ServerName> BOOTSTRAP_NODES;
+
+  /**
+   * RPC client that succeeds for the preamble cluster ID fetch (clusterId is 
null) but throws on
+   * the real client creation (clusterId is non-null). This simulates the 
production failure where
+   * TLS certificate provisioning fails during RPC client construction.
+   */
+  public static final class FailingRpcClientImpl implements RpcClient {
+
+    public FailingRpcClientImpl(Configuration configuration, String clusterId,
+      SocketAddress localAddress, MetricsConnection metrics, Map<String, 
byte[]> attributes) {
+      if (clusterId != null) {
+        throw new RuntimeException("Simulated RPC client creation failure");
+      }
+    }
+
+    @Override
+    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User 
user, int rpcTimeout) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RpcChannel createRpcChannel(ServerName sn, User user, int 
rpcTimeout) {
+      return new PreambleOnlyRpcChannel();
+    }
+
+    @Override
+    public void cancelConnections(ServerName sn) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean hasCellBlockSupport() {
+      return false;
+    }
+  }
+
+  /**
+   * RPC channel that only handles the preamble GetConnectionRegistry call, 
returning a valid
+   * cluster ID. All other RPCs are ignored.
+   */
+  public static final class PreambleOnlyRpcChannel implements RpcChannel {
+
+    @Override
+    public void callMethod(MethodDescriptor method, RpcController controller, 
Message request,
+      Message responsePrototype, RpcCallback<Message> done) {
+      if 
(method.getService().equals(ConnectionRegistryService.getDescriptor())) {
+        done.run(
+          
GetConnectionRegistryResponse.newBuilder().setClusterId("test-cluster-id").build());
+      } else {
+        controller.setFailed("unexpected call");
+        done.run(null);
+      }
+    }
+  }
+
+  @BeforeAll
+  public static void setUpBeforeClass() {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 
FailingRpcClientImpl.class,
+      RpcClient.class);
+    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
+    conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
+    conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
+    BOOTSTRAP_NODES = IntStream.range(0, 3)
+      .mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), 
ServerName.NON_STARTCODE))
+      .collect(Collectors.toSet());
+  }
+
+  private AbstractRpcBasedConnectionRegistry createRegistry() throws 
IOException {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, 1);
+    return new AbstractRpcBasedConnectionRegistry(conf, User.getCurrent(),
+      HEDGED_REQS_FANOUT_CONFIG_NAME, INITIAL_DELAY_SECS_CONFIG_NAME,
+      REFRESH_INTERVAL_SECS_CONFIG_NAME, 
MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
+
+      @Override
+      protected Set<ServerName> getBootstrapNodes(Configuration conf) throws 
IOException {
+        return BOOTSTRAP_NODES;
+      }
+
+      @Override
+      protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
+        return CompletableFuture.completedFuture(BOOTSTRAP_NODES);
+      }
+
+      @Override
+      public String getConnectionString() {
+        return "unimplemented";
+      }
+    };
+  }
+
+  /**
+   * Verify that when RPC client creation throws during stub initialization, 
the registry's future
+   * completes exceptionally rather than hanging indefinitely.
+   */
+  @Test
+  public void testRpcClientCreationFailureCompletesExceptionally() throws 
Exception {
+    try (AbstractRpcBasedConnectionRegistry registry = createRegistry()) {
+      CompletableFuture<String> future = registry.getClusterId();
+      // The future must complete within a few seconds. Before the fix, this 
would hang forever.
+      IOException e = assertThrows(IOException.class,
+        () -> FutureUtils.get(future, 5, TimeUnit.SECONDS));
+      assertTrue(e.getCause().getMessage().contains("Simulated RPC client 
creation failure"),
+        "Expected simulated failure in cause chain, got: " + e);
+    }
+  }
+
+  /**
+   * Verify that after a failed stub initialization, subsequent getClusterId() 
calls also fail
+   * promptly rather than returning a zombie future.
+   */
+  @Test
+  public void testSubsequentCallsAfterFailure() throws Exception {

Review Comment:
   Does this test fail before the patch? We create a new ClusterIdFetcher 
every time we fetch in RpcConnectionRegistry, so I do not see how we could 
return a zombie future...



##########
hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryRpcClientFailure.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
+
+/**
+ * Test that ConnectionRegistryRpcStubHolder properly completes its future 
when RPC client creation
+ * fails. Before the fix, an exception thrown by 
RpcClientFactory.createClient() inside the
+ * FutureUtils.addListener callback would be swallowed, leaving the 
CompletableFuture permanently
+ * incomplete and hanging all callers.
+ */
+@Tag(ClientTests.TAG)
+@Tag(SmallTests.TAG)
+public class TestConnectionRegistryRpcClientFailure {
+
+  private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = 
"hbase.test.hedged.reqs.fanout";
+  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
+    "hbase.test.refresh.initial.delay.secs";
+  private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
+    "hbase.test.refresh.interval.secs";
+  private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
+    "hbase.test.min.refresh.interval.secs";
+
+  private static final HBaseCommonTestingUtil UTIL = new 
HBaseCommonTestingUtil();
+
+  private static Set<ServerName> BOOTSTRAP_NODES;
+
+  /**
+   * RPC client that succeeds for the preamble cluster ID fetch (clusterId is 
null) but throws on
+   * the real client creation (clusterId is non-null). This simulates the 
production failure where
+   * TLS certificate provisioning fails during RPC client construction.
+   */
+  public static final class FailingRpcClientImpl implements RpcClient {
+
+    public FailingRpcClientImpl(Configuration configuration, String clusterId,
+      SocketAddress localAddress, MetricsConnection metrics, Map<String, 
byte[]> attributes) {
+      if (clusterId != null) {
+        throw new RuntimeException("Simulated RPC client creation failure");

Review Comment:
   So the actual problem is here: we may throw an exception when creating the 
RpcClient?
   
   Looking at the code, neither BlockingRpcClient nor NettyRpcClient throws an 
exception during construction.
   
   ```
     @SuppressWarnings("FutureReturnValueIgnored")
     public static <T> void addListener(CompletableFuture<T> future,
       BiConsumer<? super T, ? super Throwable> action) {
       future.whenComplete((resp, error) -> {
         try {
           // See this post on stack overflow(shorten since the url is too 
long),
           // https://s.apache.org/completionexception
           // For a chain of CompletableFuture, only the first child 
CompletableFuture can get the
           // original exception, others will get a CompletionException, which 
wraps the original
           // exception. So here we unwrap it before passing it to the callback 
action.
           action.accept(resp, unwrapCompletionException(error));
         } catch (Throwable t) {
           LOG.error("Unexpected error caught when processing 
CompletableFuture", t);
         }
       });
     }
   ```
   
   We log an error when the callback throws an exception; have you seen 
something like this?



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java:
##########
@@ -106,23 +106,35 @@ private ImmutableMap<ServerName, 
ClientMetaService.Interface> createStubs(RpcCli
     CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future =
       new CompletableFuture<>();
     addr2StubFuture = future;
-    FutureUtils.addListener(
-      new ClusterIdFetcher(noAuthConf, user, rpcControllerFactory, 
bootstrapNodes).fetchClusterId(),
-      (clusterId, error) -> {
-        synchronized (ConnectionRegistryRpcStubHolder.this) {
-          if (error != null) {
-            addr2StubFuture.completeExceptionally(error);
-          } else {
-            RpcClient c = RpcClientFactory.createClient(conf, clusterId);
-            ImmutableMap<ServerName, ClientMetaService.Interface> m =
-              createStubs(c, bootstrapNodes);
-            rpcClient = c;
-            addr2Stub = m;
-            addr2StubFuture.complete(m);
+    try {

Review Comment:
   Why do we need the try/catch here? FutureUtils.addListener does not throw 
any exceptions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to