This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit a514131192ddebf6345a61e98714e5f10033aaac
Merge: f9d8afebba 0c4c31625b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Oct 3 20:38:28 2024 +0000

    Merge branch '3.1'

 .../accumulo/core/rpc/clients/TServerClient.java   | 67 +++++++++++++++-----
 .../test/functional/DebugClientConnectionIT.java   | 71 ++++++++++++++++++++++
 2 files changed, 123 insertions(+), 15 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index 2c70796095,c09f46ab00..c99250d2b8
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@@ -72,26 -79,51 +78,40 @@@ public interface TServerClient<C extend
      }
  
      final long rpcTimeout = context.getClientTimeoutInMillis();
 -    final String tserverZooPath = context.getZooKeeperRoot() + Constants.ZTSERVERS;
 -    final String sserverZooPath = context.getZooKeeperRoot() + Constants.ZSSERVERS;
 -    final String compactorZooPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
      final ZooCache zc = context.getZooCache();
 -
 -    final List<String> serverPaths = new ArrayList<>();
 +    final List<ServiceLockPath> serverPaths = new ArrayList<>();
-     serverPaths
-         .addAll(context.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), true));
-     serverPaths
-         .addAll(context.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true));
-     serverPaths
-         .addAll(context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true));
-     if (serverPaths.isEmpty()) {
-       if (warned.compareAndSet(false, true)) {
-         LOG.warn(
-             "There are no servers serving the {} api: check that zookeeper and accumulo are running.",
-             type);
+     if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+       // add all three paths to the set even though they may not be correct.
+       // The entire set will be checked in the code below to validate
+       // that the path is correct and the lock is held and will return the
+       // correct one.
 -      serverPaths.add(tserverZooPath + "/" + debugHost);
 -      serverPaths.add(sserverZooPath + "/" + debugHost);
 -      zc.getChildren(compactorZooPath).forEach(compactorGroup -> {
 -        serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + debugHost);
 -      });
++      Optional<HostAndPort> hp = Optional.of(HostAndPort.fromString(debugHost));
++      serverPaths.addAll(context.getServerPaths().getCompactor(Optional.empty(), hp, true));
++      serverPaths.addAll(context.getServerPaths().getScanServer(Optional.empty(), hp, true));
++      serverPaths.addAll(context.getServerPaths().getTabletServer(Optional.empty(), hp, true));
+     } else {
 -      zc.getChildren(tserverZooPath).forEach(tserverAddress -> {
 -        serverPaths.add(tserverZooPath + "/" + tserverAddress);
 -      });
++      serverPaths.addAll(
++          context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true));
+       if (type == ThriftClientTypes.CLIENT) {
 -        zc.getChildren(sserverZooPath).forEach(sserverAddress -> {
 -          serverPaths.add(sserverZooPath + "/" + sserverAddress);
 -        });
 -        zc.getChildren(compactorZooPath).forEach(compactorGroup -> {
 -          zc.getChildren(compactorZooPath + "/" + compactorGroup).forEach(compactorAddress -> {
 -            serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + compactorAddress);
 -          });
 -        });
++        serverPaths.addAll(
++            context.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), true));
++        serverPaths.addAll(
++            context.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true));
+       }
+       if (serverPaths.isEmpty()) {
+         if (warned.compareAndSet(false, true)) {
+           LOG.warn(
+               "There are no servers serving the {} api: check that zookeeper and accumulo are running.",
+               type);
+         }
+         throw new TTransportException("There are no servers for type: " + type);
        }
-       throw new TTransportException("There are no servers for type: " + type);
      }
+ 
      Collections.shuffle(serverPaths, RANDOM.get());
  
 -    for (String serverPath : serverPaths) {
 -      var zLocPath = ServiceLock.path(serverPath);
 -      Optional<ServiceLockData> data = zc.getLockData(zLocPath);
 +    for (ServiceLockPath path : serverPaths) {
 +      Optional<ServiceLockData> data = zc.getLockData(path);
        if (data != null && data.isPresent()) {
          HostAndPort tserverClientAddress = data.orElseThrow().getAddress(service);
          if (tserverClientAddress != null) {
diff --cc test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
index 0000000000,9769f4c10e..5bf6004fe1
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
@@@ -1,0 -1,71 +1,71 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test.functional;
+ 
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
+ import static org.junit.jupiter.api.Assertions.assertThrows;
+ 
+ import java.io.UncheckedIOException;
+ import java.util.List;
+ 
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.rpc.clients.TServerClient;
+ import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.hadoop.conf.Configuration;
+ import org.junit.jupiter.api.BeforeEach;
+ import org.junit.jupiter.api.Test;
+ 
+ public class DebugClientConnectionIT extends AccumuloClusterHarness {
+ 
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 -    cfg.setNumTservers(2);
++    cfg.getClusterServerConfiguration().setNumDefaultTabletServers(2);
+   }
+ 
+   private List<String> tservers = null;
+ 
+   @BeforeEach
+   public void getTServerAddresses() {
+     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       tservers = client.instanceOperations().getTabletServers();
+     }
+     assertNotNull(tservers);
+     assertEquals(2, tservers.size());
+   }
+ 
+   @Test
+   public void testPreferredConnection() throws Exception {
+     System.setProperty(TServerClient.DEBUG_HOST, tservers.get(0));
+     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       assertNotNull(client.instanceOperations().getSiteConfiguration());
+     }
+     System.setProperty(TServerClient.DEBUG_HOST, tservers.get(1));
+     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       assertNotNull(client.instanceOperations().getSiteConfiguration());
+     }
+     System.setProperty(TServerClient.DEBUG_HOST, "localhost:1");
+     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       assertThrows(UncheckedIOException.class,
+           () -> client.instanceOperations().getSiteConfiguration());
+     }
+   }
+ }

Reply via email to