This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new e2efd04 Remove unnecessary TServerUtils stop method (#2455) e2efd04 is described below commit e2efd0400d4206de188d4960b273892d725f3a82 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Wed Feb 2 10:39:29 2022 -0500 Remove unnecessary TServerUtils stop method (#2455) With newer versions of Thrift, the executor service used inside the server is reachable by calling `getInvoker()`. So, it is no longer necessary to use reflection to try to access the private field to shut it down. This change removes that reflection code and shuts down the executor service in the stop() method of our custom subclass. Since that is substantially simpler, it is no longer necessary to have a dedicated static utility method to stop a server. That method is removed and stop() is called directly. --- .../server/rpc/CustomNonBlockingServer.java | 13 ++++ .../apache/accumulo/server/rpc/TServerUtils.java | 23 ------ .../accumulo/server/util/TServerUtilsTest.java | 82 ++++------------------ .../org/apache/accumulo/compactor/Compactor.java | 4 +- .../apache/accumulo/compactor/CompactorTest.java | 4 -- .../java/org/apache/accumulo/manager/Manager.java | 5 +- .../org/apache/accumulo/tserver/TabletServer.java | 8 ++- .../apache/accumulo/test/rpc/ThriftBehaviorIT.java | 7 +- 8 files changed, 39 insertions(+), 107 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java index 57f7824..761b987 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java @@ -29,6 +29,8 @@ import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingSocket; import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class implements a custom non-blocking thrift server that stores the client address in @@ -36,6 +38,7 @@ import org.apache.thrift.transport.TTransportException; */ public class CustomNonBlockingServer extends THsHaServer { + private static final Logger log = LoggerFactory.getLogger(CustomNonBlockingServer.class); private final Field selectAcceptThreadField; public CustomNonBlockingServer(Args args) { @@ -50,6 +53,16 @@ public class CustomNonBlockingServer extends THsHaServer { } @Override + public void stop() { + super.stop(); + try { + getInvoker().shutdownNow(); + } catch (Exception e) { + log.error("Unable to call shutdownNow", e); + } + } + + @Override protected boolean startThreads() { // Yet another dirty/gross hack to get access to the client's address. diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 429746c..f4bc078 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -21,7 +21,6 @@ package org.apache.accumulo.server.rpc; import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; -import java.lang.reflect.Field; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -693,28 +692,6 @@ public class TServerUtils { } /** - * Stop a Thrift TServer. Existing connections will keep our thread running; use reflection to - * forcibly shut down the threadpool. - * - * @param s - * The TServer to stop - */ - public static void stopTServer(TServer s) { - if (s == null) { - return; - } - s.stop(); - try { - Field f = s.getClass().getDeclaredField("executorService_"); - f.setAccessible(true); - ExecutorService es = (ExecutorService) f.get(s); - es.shutdownNow(); - } catch (Exception e) { - log.error("Unable to call shutdownNow", e); - } - } - - /** * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication * works. Requires the <code>serverType</code> to be {@link ThriftServerType#SASL} and throws an * exception when it is not. diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java index ad32d6a..c75c7d2 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java @@ -18,8 +18,6 @@ */ package org.apache.accumulo.server.util; -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; @@ -35,7 +33,6 @@ import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutorService; import org.apache.accumulo.core.clientImpl.thrift.ClientService.Iface; import org.apache.accumulo.core.clientImpl.thrift.ClientService.Processor; @@ -49,7 +46,6 @@ import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.thrift.server.TServer; -import org.apache.thrift.transport.TServerSocket; import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; @@ -59,60 +55,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class TServerUtilsTest { - private static class TServerWithoutES extends TServer { - boolean stopCalled; - - TServerWithoutES(TServerSocket socket) { - super(new TServer.Args(socket)); - stopCalled = false; - } - - @Override - public void serve() {} - - @Override - public void stop() { - stopCalled = true; - } - } - - private static class TServerWithES extends TServerWithoutES { - final ExecutorService executorService_; - - TServerWithES(TServerSocket socket) { - super(socket); - executorService_ = createMock(ExecutorService.class); - expect(executorService_.shutdownNow()).andReturn(null); - replay(executorService_); - } - } - - @Test - public void testStopTServer_ES() { - TServerSocket socket = createNiceMock(TServerSocket.class); - replay(socket); - TServerWithES s = new TServerWithES(socket); - TServerUtils.stopTServer(s); - assertTrue(s.stopCalled); - verify(socket, s.executorService_); - } - - @Test - public void testStopTServer_NoES() { - TServerSocket socket = createNiceMock(TServerSocket.class); - replay(socket); - TServerWithoutES s = new TServerWithoutES(socket); - TServerUtils.stopTServer(s); - assertTrue(s.stopCalled); - verify(socket); - } - - @Test - public void testStopTServer_Null() { - TServerUtils.stopTServer(null); - // not dying is enough - } - private ServerContext context; private final ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance()); @@ -149,8 +91,8 @@ public class TServerUtilsTest { assertNotNull(server); assertTrue(address.getAddress().getPort() > 1024); } finally { - if (null != server) { - TServerUtils.stopTServer(server); + if (server != null) { + server.stop(); } } } @@ -167,8 +109,8 @@ public class TServerUtilsTest { assertNotNull(server); assertEquals(port, address.getAddress().getPort()); } finally { - if (null != server) { - TServerUtils.stopTServer(server); + if (server != null) { + server.stop(); } } } @@ -203,8 +145,8 @@ public class TServerUtilsTest { assertNotNull(server); assertEquals(port[1], address.getAddress().getPort()); } finally { - if (null != server) { - TServerUtils.stopTServer(server); + if (server != null) { + server.stop(); } } @@ -266,8 +208,8 @@ public class TServerUtilsTest { // Finally ensure that the TServer is using the last port (i.e. port search worked) assertTrue(address.getAddress().getPort() == tserverFinalPort); } finally { - if (null != server) { - TServerUtils.stopTServer(server); + if (server != null) { + server.stop(); } } @@ -287,8 +229,8 @@ public class TServerUtilsTest { assertTrue( port[0] == address.getAddress().getPort() || port[1] == address.getAddress().getPort()); } finally { - if (null != server) { - TServerUtils.stopTServer(server); + if (server != null) { + server.stop(); } } } @@ -310,8 +252,8 @@ public class TServerUtilsTest { assertNotNull(server); assertEquals(port[1], address.getAddress().getPort()); } finally { - if (null != server) { - TServerUtils.stopTServer(server); + if (server != null) { + server.stop(); } } } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 3717572..5838777 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -831,7 +831,9 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac } finally { // Shutdown local thrift server LOG.info("Stopping Thrift Servers"); - TServerUtils.stopTServer(compactorAddress.server); + if (compactorAddress.server != null) { + compactorAddress.server.stop(); + } try { LOG.debug("Closing filesystems"); diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index f1132d2..6e1bbdc 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -49,7 +49,6 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.rpc.ServerAddress; -import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.easymock.EasyMock; @@ -322,7 +321,6 @@ public class CompactorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.methods(Halt.class, "halt")); - PowerMock.suppress(PowerMock.methods(TServerUtils.class, "stopTServer")); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); @@ -375,7 +373,6 @@ public class CompactorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.methods(Halt.class, "halt")); - PowerMock.suppress(PowerMock.methods(TServerUtils.class, "stopTServer")); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); @@ -430,7 +427,6 @@ public class CompactorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.methods(Halt.class, "halt")); - PowerMock.suppress(PowerMock.methods(TServerUtils.class, "stopTServer")); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 6e6b5ae..c0b8894 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1232,7 +1232,10 @@ public class Manager extends AbstractServer } catch (InterruptedException e) { throw new IllegalStateException("Exception stopping replication workers", e); } - TServerUtils.stopTServer(replServer.get()); + var nullableReplServer = replServer.get(); + if (nullableReplServer != null) { + nullableReplServer.stop(); + } // Signal that we want it to stop, and wait for it to do so. if (authenticationTokenKeyManager != null) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index d335278f..e875360 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -906,10 +906,14 @@ public class TabletServer extends AbstractServer { } } log.debug("Stopping Replication Server"); - TServerUtils.stopTServer(this.replServer); + if (this.replServer != null) { + this.replServer.stop(); + } log.debug("Stopping Thrift Servers"); - TServerUtils.stopTServer(server); + if (server != null) { + server.stop(); + } try { log.debug("Closing filesystems"); diff --git a/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java b/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java index 2515aa5..d43cfa2 100644 --- a/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java @@ -88,12 +88,7 @@ public class ThriftBehaviorIT { @Test public void echoFail() throws TException { - try { - client.echoFail(KITTY_MSG); - fail("Thrift client did not throw an expected exception"); - } catch (Exception e) { - assertEquals(TApplicationException.class.getName(), e.getClass().getName()); - } + assertThrows(TApplicationException.class, () -> client.echoFail(KITTY_MSG)); // verify normal two-way method still passes using same client echoPass(); }