This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 0c4c31625b Enable user to specify specific server for Thrift client 
calls (#4880)
0c4c31625b is described below

commit 0c4c31625b2eb6680eef4818b61b324a1914a794
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Oct 3 16:12:00 2024 -0400

    Enable user to specify specific server for Thrift client calls (#4880)
    
    Allow the user to set a system property to the address of a server
    to use when making calls to the Client Thrift API. Example:
    org.apache.accumulo.client.rpc.debug.host="localhost:1234"
    
    Closes #4823
---
 .../accumulo/core/rpc/clients/TServerClient.java   | 77 ++++++++++++++++------
 .../test/functional/DebugClientConnectionIT.java   | 71 ++++++++++++++++++++
 2 files changed, 127 insertions(+), 21 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index 4027f4b0c9..c09f46ab00 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@ -23,6 +23,8 @@ import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -54,6 +56,8 @@ import com.google.common.net.HostAndPort;
 
 public interface TServerClient<C extends TServiceClient> {
 
+  static final String DEBUG_HOST = "org.apache.accumulo.client.rpc.debug.host";
+
   Pair<String,C> getThriftServerConnection(ClientContext context, boolean 
preferCachedConnections)
       throws TTransportException;
 
@@ -62,7 +66,9 @@ public interface TServerClient<C extends TServiceClient> {
       ThriftService service) throws TTransportException {
     checkArgument(context != null, "context is null");
 
-    if (preferCachedConnections) {
+    final String debugHost = System.getProperty(DEBUG_HOST, null);
+
+    if (preferCachedConnections && debugHost == null) {
       Pair<String,TTransport> cachedTransport =
           context.getTransportPool().getAnyCachedTransport(type);
       if (cachedTransport != null) {
@@ -79,28 +85,40 @@ public interface TServerClient<C extends TServiceClient> {
     final ZooCache zc = context.getZooCache();
 
     final List<String> serverPaths = new ArrayList<>();
-    zc.getChildren(tserverZooPath).forEach(tserverAddress -> {
-      serverPaths.add(tserverZooPath + "/" + tserverAddress);
-    });
-    if (type == ThriftClientTypes.CLIENT) {
-      zc.getChildren(sserverZooPath).forEach(sserverAddress -> {
-        serverPaths.add(sserverZooPath + "/" + sserverAddress);
-      });
+    if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+      // add all three paths to the set even though they may not be correct.
+      // The entire set will be checked in the code below to validate
+      // that the path is correct and the lock is held and will return the
+      // correct one.
+      serverPaths.add(tserverZooPath + "/" + debugHost);
+      serverPaths.add(sserverZooPath + "/" + debugHost);
       zc.getChildren(compactorZooPath).forEach(compactorGroup -> {
-        zc.getChildren(compactorZooPath + "/" + 
compactorGroup).forEach(compactorAddress -> {
-          serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + 
compactorAddress);
-        });
+        serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + 
debugHost);
       });
-    }
-
-    if (serverPaths.isEmpty()) {
-      if (warned.compareAndSet(false, true)) {
-        LOG.warn(
-            "There are no servers serving the {} api: check that zookeeper and 
accumulo are running.",
-            type);
+    } else {
+      zc.getChildren(tserverZooPath).forEach(tserverAddress -> {
+        serverPaths.add(tserverZooPath + "/" + tserverAddress);
+      });
+      if (type == ThriftClientTypes.CLIENT) {
+        zc.getChildren(sserverZooPath).forEach(sserverAddress -> {
+          serverPaths.add(sserverZooPath + "/" + sserverAddress);
+        });
+        zc.getChildren(compactorZooPath).forEach(compactorGroup -> {
+          zc.getChildren(compactorZooPath + "/" + 
compactorGroup).forEach(compactorAddress -> {
+            serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + 
compactorAddress);
+          });
+        });
+      }
+      if (serverPaths.isEmpty()) {
+        if (warned.compareAndSet(false, true)) {
+          LOG.warn(
+              "There are no servers serving the {} api: check that zookeeper 
and accumulo are running.",
+              type);
+        }
+        throw new TTransportException("There are no servers for type: " + 
type);
       }
-      throw new TTransportException("There are no servers for type: " + type);
     }
+
     Collections.shuffle(serverPaths, RANDOM.get());
 
     for (String serverPath : serverPaths) {
@@ -113,10 +131,19 @@ public interface TServerClient<C extends TServiceClient> {
             TTransport transport = 
context.getTransportPool().getTransport(type,
                 tserverClientAddress, rpcTimeout, context, 
preferCachedConnections);
             C client = ThriftUtil.createClient(type, transport);
+            if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+              LOG.info("Connecting to debug host: {}", debugHost);
+            }
             warned.set(false);
             return new Pair<String,C>(tserverClientAddress.toString(), client);
           } catch (TTransportException e) {
-            LOG.trace("Error creating transport to {}", tserverClientAddress);
+            if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+              LOG.error(
+                  "Error creating transport to debug host: {}. If this server 
is down, then you will need to remove or change the system property {}.",
+                  debugHost, DEBUG_HOST);
+            } else {
+              LOG.trace("Error creating transport to {}", 
tserverClientAddress);
+            }
             continue;
           }
         }
@@ -127,7 +154,15 @@ public interface TServerClient<C extends TServiceClient> {
       LOG.warn("Failed to find an available server in the list of servers: {} 
for API type: {}",
           serverPaths, type);
     }
-    throw new TTransportException("Failed to connect to any server for API 
type " + type);
+    // Need to throw a different exception, when a TTransportException is
+    // thrown below, then the operation will be retried endlessly.
+    if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+      throw new UncheckedIOException("Error creating transport to debug host: 
" + debugHost
+          + ". If this server is down, then you will need to remove or change 
the system property "
+          + DEBUG_HOST + ".", new IOException(""));
+    } else {
+      throw new TTransportException("Failed to connect to any server for API 
type " + type);
+    }
   }
 
   default <R> R execute(Logger LOG, ClientContext context, Exec<R,C> exec)
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
new file mode 100644
index 0000000000..9769f4c10e
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.UncheckedIOException;
+import java.util.List;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.rpc.clients.TServerClient;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class DebugClientConnectionIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setNumTservers(2);
+  }
+
+  private List<String> tservers = null;
+
+  @BeforeEach
+  public void getTServerAddresses() {
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      tservers = client.instanceOperations().getTabletServers();
+    }
+    assertNotNull(tservers);
+    assertEquals(2, tservers.size());
+  }
+
+  @Test
+  public void testPreferredConnection() throws Exception {
+    System.setProperty(TServerClient.DEBUG_HOST, tservers.get(0));
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      assertNotNull(client.instanceOperations().getSiteConfiguration());
+    }
+    System.setProperty(TServerClient.DEBUG_HOST, tservers.get(1));
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      assertNotNull(client.instanceOperations().getSiteConfiguration());
+    }
+    System.setProperty(TServerClient.DEBUG_HOST, "localhost:1");
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      assertThrows(UncheckedIOException.class,
+          () -> client.instanceOperations().getSiteConfiguration());
+    }
+  }
+}

Reply via email to