This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 14bc7bee48 Expand types of servers that respond to ping command (#4958) 14bc7bee48 is described below commit 14bc7bee48515d335042cea1cdfb95b1840e161b Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Oct 15 10:24:54 2024 -0400 Expand types of servers that respond to ping command (#4958) Closes #4952 --- .../core/client/admin/InstanceOperations.java | 15 +++++++-- .../core/clientImpl/InstanceOperationsImpl.java | 14 +++++--- .../server/client/ClientServiceHandler.java | 2 +- .../accumulo/shell/commands/PingCommand.java | 27 ++++++++++++---- .../accumulo/shell/commands/PingIterator.java | 12 +++---- .../apache/accumulo/test/InstanceOperationsIT.java | 36 +++++++++++++++++++++ .../apache/accumulo/test/shell/ShellServerIT.java | 37 ++++++++++++++++++---- 7 files changed, 115 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java index 110812390a..783b353430 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java @@ -314,14 +314,23 @@ public interface InstanceOperations { List<ActiveCompaction> getActiveCompactions() throws AccumuloException, AccumuloSecurityException; /** - * Throws an exception if a tablet server can not be contacted. + * Check to see if a server process at the host and port is up and responding to RPC requests. * - * @param tserver The tablet server address. This should be of the form - * {@code <ip address>:<port>} + * @param tserver The server address. This should be of the form {@code <ip address>:<port>} + * @throws AccumuloException if the server cannot be contacted * @since 1.5.0 */ void ping(String tserver) throws AccumuloException; + /** + * Check to see if a server process at the host and port is up and responding to RPC requests. + * + * @param server ServerId object for the server to be pinged, only the host and port is used. + * @throws AccumuloException if the server cannot be contacted + * @since 4.0.0 + */ + void ping(ServerId server) throws AccumuloException; + /** * Test to see if the instance can load the given class as the given type. This check does not * consider per table classpaths, see diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index b5646ce294..1ba125ab0f 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -50,6 +50,7 @@ import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.ActiveScan; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.clientImpl.thrift.ClientService; import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType; import org.apache.accumulo.core.clientImpl.thrift.TVersionedProperties; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; @@ -427,15 +428,20 @@ public class InstanceOperationsImpl implements InstanceOperations { } @Override - public void ping(String tserver) throws AccumuloException { - try (TTransport transport = createTransport(AddressUtil.parseAddress(tserver), context)) { - Client client = createClient(ThriftClientTypes.TABLET_SERVER, transport); - client.getTabletServerStatus(TraceUtil.traceInfo(), context.rpcCreds()); + public void ping(String server) throws AccumuloException { + try (TTransport transport = createTransport(AddressUtil.parseAddress(server), context)) { + ClientService.Client client = createClient(ThriftClientTypes.CLIENT, transport); + client.ping(context.rpcCreds()); } catch (TException e) { throw new AccumuloException(e); } } + @Override + public void ping(ServerId server) throws AccumuloException { + ping(server.toHostPortString()); + } + @Override public void waitForBalance() throws AccumuloException { try { diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java index 61040fc54a..05b62ebe75 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java @@ -110,7 +110,7 @@ public class ClientServiceHandler implements ClientService.Iface { @Override public void ping(TCredentials credentials) { // anybody can call this; no authentication check - log.info("Manager reports: I just got pinged!"); + log.info("I just got pinged!"); } @Override diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java index 006a7e3b12..fb73bd34d7 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java @@ -31,31 +31,37 @@ import org.apache.commons.cli.Options; public class PingCommand extends Command { - private Option tserverOption, disablePaginationOpt; + private Option serverOption, tserverOption, disablePaginationOpt; @Override public String description() { - return "ping tablet servers"; + return "ping compactors, scan servers, or tablet servers"; } @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { - final List<String> tservers = new ArrayList<>(); + final List<String> servers = new ArrayList<>(); final InstanceOperations instanceOps = shellState.getAccumuloClient().instanceOperations(); final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt()); if (cl.hasOption(tserverOption.getOpt())) { - tservers.add(cl.getOptionValue(tserverOption.getOpt())); + servers.add(cl.getOptionValue(tserverOption.getOpt())); + } else if (cl.hasOption(serverOption.getOpt())) { + servers.add(cl.getOptionValue(serverOption.getOpt())); } else { + instanceOps.getServers(ServerId.Type.COMPACTOR) + .forEach(s -> servers.add(s.toHostPortString())); + instanceOps.getServers(ServerId.Type.SCAN_SERVER) + .forEach(s -> servers.add(s.toHostPortString())); instanceOps.getServers(ServerId.Type.TABLET_SERVER) - .forEach(s -> tservers.add(s.toHostPortString())); + .forEach(s -> servers.add(s.toHostPortString())); } - shellState.printLines(new PingIterator(tservers, instanceOps), paginate); + shellState.printLines(new PingIterator(servers, instanceOps), paginate); return 0; } @@ -69,7 +75,14 @@ public class PingCommand extends Command { public Options getOptions() { final Options opts = new Options(); - tserverOption = new Option("ts", "tabletServer", true, "tablet server to ping"); + serverOption = + new Option("s", "server", true, "compactor, scan server, or tablet server address to ping"); + serverOption.setArgName("server address"); + opts.addOption(serverOption); + + // Leaving here for backwards compatibility + tserverOption = new Option("ts", "tabletServer", true, + "compactor, scan server, or tablet server address to ping"); tserverOption.setArgName("tablet server"); opts.addOption(tserverOption); diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/PingIterator.java b/shell/src/main/java/org/apache/accumulo/shell/commands/PingIterator.java index bcd3a8a811..c3d81e7f99 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/PingIterator.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/PingIterator.java @@ -29,8 +29,8 @@ class PingIterator implements Iterator<String> { private Iterator<String> iter; private InstanceOperations instanceOps; - PingIterator(List<String> tservers, InstanceOperations instanceOps) { - iter = tservers.iterator(); + PingIterator(List<String> servers, InstanceOperations instanceOps) { + iter = servers.iterator(); this.instanceOps = instanceOps; } @@ -41,15 +41,15 @@ class PingIterator implements Iterator<String> { @Override public String next() { - String tserver = iter.next(); + String server = iter.next(); try { - instanceOps.ping(tserver); + instanceOps.ping(server); } catch (AccumuloException e) { - return tserver + " ERROR " + e.getMessage(); + return server + " ERROR " + e.getMessage(); } - return tserver + " OK"; + return server + " OK"; } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java index 5372bad803..3fd18e353d 100644 --- a/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -37,6 +38,7 @@ import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; @@ -90,6 +92,40 @@ public class InstanceOperationsIT extends AccumuloClusterHarness { } } + @Test + public void testPing() throws Exception { + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.COMPACTOR).size() == 3); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).size() == 2); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1); + + final InstanceOperations io = client.instanceOperations(); + Set<ServerId> servers = io.getServers(ServerId.Type.COMPACTOR); + for (ServerId sid : servers) { + io.ping(sid); + } + + servers = io.getServers(ServerId.Type.SCAN_SERVER); + for (ServerId sid : servers) { + io.ping(sid); + } + + servers = io.getServers(ServerId.Type.TABLET_SERVER); + for (ServerId sid : servers) { + io.ping(sid); + } + + ServerId fake = new ServerId(ServerId.Type.COMPACTOR, Constants.DEFAULT_RESOURCE_GROUP_NAME, + "localhost", 1024); + assertThrows(AccumuloException.class, () -> io.ping(fake)); + } + + } + private void validateAddresses(Collection<String> e, Set<ServerId> addresses) { List<String> actual = new ArrayList<>(addresses.size()); addresses.forEach(a -> actual.add(a.toHostPortString())); diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java index 8ced6d6128..3bd99c5015 100644 --- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java @@ -48,6 +48,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.SortedSet; import java.util.regex.Pattern; @@ -1571,16 +1572,38 @@ public class ShellServerIT extends SharedMiniClusterBase { @Test public void ping() throws Exception { - for (int i = 0; i < 10; i++) { - ts.exec("ping", true, "OK", true); - // wait for both tservers to start up - if (ts.output.get().split("\n").length == 3) { - break; + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.COMPACTOR).size() == 1); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).size() == 1); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1); + + Set<ServerId> servers = client.instanceOperations().getServers(ServerId.Type.COMPACTOR); + for (ServerId sid : servers) { + ts.exec("ping -s " + sid.toHostPortString(), true, "OK", true); + ts.exec("ping -ts " + sid.toHostPortString(), true, "OK", true); + } + + servers = client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER); + for (ServerId sid : servers) { + ts.exec("ping -s " + sid.toHostPortString(), true, "OK", true); + ts.exec("ping -ts " + sid.toHostPortString(), true, "OK", true); + } + + servers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); + for (ServerId sid : servers) { + ts.exec("ping -s " + sid.toHostPortString(), true, "OK", true); + ts.exec("ping -ts " + sid.toHostPortString(), true, "OK", true); } - Thread.sleep(SECONDS.toMillis(1)); + ts.exec("ping", true, "OK", true); + // Should be 3, but there is an extra line with a single apostrophe trailing + assertEquals(4, ts.output.get().split("\n").length); } - assertEquals(2, ts.output.get().split("\n").length); + } @Test