This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push: new 0c4c31625b Enable user to specify specific server for Thrift client calls (#4880) 0c4c31625b is described below commit 0c4c31625b2eb6680eef4818b61b324a1914a794 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Oct 3 16:12:00 2024 -0400 Enable user to specify specific server for Thrift client calls (#4880) Allow the user to set a system property to the address of a server to use when making calls to the Client Thrift API. Example: org.apache.accumulo.client.rpc.debug.host="localhost:1234" Closes #4823 --- .../accumulo/core/rpc/clients/TServerClient.java | 77 ++++++++++++++++------ .../test/functional/DebugClientConnectionIT.java | 71 ++++++++++++++++++++ 2 files changed, 127 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java index 4027f4b0c9..c09f46ab00 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java @@ -23,6 +23,8 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -54,6 +56,8 @@ import com.google.common.net.HostAndPort; public interface TServerClient<C extends TServiceClient> { + static final String DEBUG_HOST = "org.apache.accumulo.client.rpc.debug.host"; + Pair<String,C> getThriftServerConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException; @@ -62,7 +66,9 @@ public interface TServerClient<C extends TServiceClient> { ThriftService service) throws TTransportException { checkArgument(context != null, "context is null"); - if (preferCachedConnections) { + final String debugHost = System.getProperty(DEBUG_HOST, null); + + if (preferCachedConnections && debugHost == null) { Pair<String,TTransport> cachedTransport = context.getTransportPool().getAnyCachedTransport(type); if (cachedTransport != null) { @@ -79,28 +85,40 @@ public interface TServerClient<C extends TServiceClient> { final ZooCache zc = context.getZooCache(); final List<String> serverPaths = new ArrayList<>(); - zc.getChildren(tserverZooPath).forEach(tserverAddress -> { - serverPaths.add(tserverZooPath + "/" + tserverAddress); - }); - if (type == ThriftClientTypes.CLIENT) { - zc.getChildren(sserverZooPath).forEach(sserverAddress -> { - serverPaths.add(sserverZooPath + "/" + sserverAddress); - }); + if (type == ThriftClientTypes.CLIENT && debugHost != null) { + // add all three paths to the set even though they may not be correct. + // The entire set will be checked in the code below to validate + // that the path is correct and the lock is held and will return the + // correct one. + serverPaths.add(tserverZooPath + "/" + debugHost); + serverPaths.add(sserverZooPath + "/" + debugHost); zc.getChildren(compactorZooPath).forEach(compactorGroup -> { - zc.getChildren(compactorZooPath + "/" + compactorGroup).forEach(compactorAddress -> { - serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + compactorAddress); - }); + serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + debugHost); }); - } - - if (serverPaths.isEmpty()) { - if (warned.compareAndSet(false, true)) { - LOG.warn( - "There are no servers serving the {} api: check that zookeeper and accumulo are running.", - type); + } else { + zc.getChildren(tserverZooPath).forEach(tserverAddress -> { + serverPaths.add(tserverZooPath + "/" + tserverAddress); + }); + if (type == ThriftClientTypes.CLIENT) { + zc.getChildren(sserverZooPath).forEach(sserverAddress -> { + serverPaths.add(sserverZooPath + "/" + sserverAddress); + }); + zc.getChildren(compactorZooPath).forEach(compactorGroup -> { + zc.getChildren(compactorZooPath + "/" + compactorGroup).forEach(compactorAddress -> { + serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + compactorAddress); + }); + }); + } + if (serverPaths.isEmpty()) { + if (warned.compareAndSet(false, true)) { + LOG.warn( + "There are no servers serving the {} api: check that zookeeper and accumulo are running.", + type); + } + throw new TTransportException("There are no servers for type: " + type); } - throw new TTransportException("There are no servers for type: " + type); } + Collections.shuffle(serverPaths, RANDOM.get()); for (String serverPath : serverPaths) { @@ -113,10 +131,19 @@ public interface TServerClient<C extends TServiceClient> { TTransport transport = context.getTransportPool().getTransport(type, tserverClientAddress, rpcTimeout, context, preferCachedConnections); C client = ThriftUtil.createClient(type, transport); + if (type == ThriftClientTypes.CLIENT && debugHost != null) { + LOG.info("Connecting to debug host: {}", debugHost); + } warned.set(false); return new Pair<String,C>(tserverClientAddress.toString(), client); } catch (TTransportException e) { - LOG.trace("Error creating transport to {}", tserverClientAddress); + if (type == ThriftClientTypes.CLIENT && debugHost != null) { + LOG.error( + "Error creating transport to debug host: {}. If this server is down, then you will need to remove or change the system property {}.", + debugHost, DEBUG_HOST); + } else { + LOG.trace("Error creating transport to {}", tserverClientAddress); + } continue; } } @@ -127,7 +154,15 @@ public interface TServerClient<C extends TServiceClient> { LOG.warn("Failed to find an available server in the list of servers: {} for API type: {}", serverPaths, type); } - throw new TTransportException("Failed to connect to any server for API type " + type); + // Need to throw a different exception, when a TTransportException is + // thrown below, then the operation will be retried endlessly. + if (type == ThriftClientTypes.CLIENT && debugHost != null) { + throw new UncheckedIOException("Error creating transport to debug host: " + debugHost + + ". If this server is down, then you will need to remove or change the system property " + + DEBUG_HOST + ".", new IOException("")); + } else { + throw new TTransportException("Failed to connect to any server for API type " + type); + } } default <R> R execute(Logger LOG, ClientContext context, Exec<R,C> exec) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java new file mode 100644 index 0000000000..9769f4c10e --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.UncheckedIOException; +import java.util.List; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.rpc.clients.TServerClient; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class DebugClientConnectionIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(2); + } + + private List<String> tservers = null; + + @BeforeEach + public void getTServerAddresses() { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + tservers = client.instanceOperations().getTabletServers(); + } + assertNotNull(tservers); + assertEquals(2, tservers.size()); + } + + @Test + public void testPreferredConnection() throws Exception { + System.setProperty(TServerClient.DEBUG_HOST, tservers.get(0)); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertNotNull(client.instanceOperations().getSiteConfiguration()); + } + System.setProperty(TServerClient.DEBUG_HOST, tservers.get(1)); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertNotNull(client.instanceOperations().getSiteConfiguration()); + } + System.setProperty(TServerClient.DEBUG_HOST, "localhost:1"); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertThrows(UncheckedIOException.class, + () -> client.instanceOperations().getSiteConfiguration()); + } + } +}