This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new e2efd04  Remove unnecessary TServerUtils stop method (#2455)
e2efd04 is described below

commit e2efd0400d4206de188d4960b273892d725f3a82
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Wed Feb 2 10:39:29 2022 -0500

    Remove unnecessary TServerUtils stop method (#2455)
    
    With newer versions of Thrift, the executor service used inside the
    server is reachable by calling `getInvoker()`. So, it is no longer
    necessary to use reflection to try to access the private field to shut
    it down. This change removes that reflection code and shuts down the
    executor service in the stop() method of our custom subclass. Since that
    is substantially simpler, it is no longer necessary to have a dedicated
    static utility method to stop a server. That method is removed and
    stop() is called directly.
---
 .../server/rpc/CustomNonBlockingServer.java        | 13 ++++
 .../apache/accumulo/server/rpc/TServerUtils.java   | 23 ------
 .../accumulo/server/util/TServerUtilsTest.java     | 82 ++++------------------
 .../org/apache/accumulo/compactor/Compactor.java   |  4 +-
 .../apache/accumulo/compactor/CompactorTest.java   |  4 --
 .../java/org/apache/accumulo/manager/Manager.java  |  5 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  8 ++-
 .../apache/accumulo/test/rpc/ThriftBehaviorIT.java |  7 +-
 8 files changed, 39 insertions(+), 107 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
index 57f7824..761b987 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
@@ -29,6 +29,8 @@ import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TNonblockingTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class implements a custom non-blocking thrift server that stores the client address in
@@ -36,6 +38,7 @@ import org.apache.thrift.transport.TTransportException;
  */
 public class CustomNonBlockingServer extends THsHaServer {
 
+  private static final Logger log = LoggerFactory.getLogger(CustomNonBlockingServer.class);
   private final Field selectAcceptThreadField;
 
   public CustomNonBlockingServer(Args args) {
@@ -50,6 +53,16 @@ public class CustomNonBlockingServer extends THsHaServer {
   }
 
   @Override
+  public void stop() {
+    super.stop();
+    try {
+      getInvoker().shutdownNow();
+    } catch (Exception e) {
+      log.error("Unable to call shutdownNow", e);
+    }
+  }
+
+  @Override
   protected boolean startThreads() {
     // Yet another dirty/gross hack to get access to the client's address.
 
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 429746c..f4bc078 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.server.rpc;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -693,28 +692,6 @@ public class TServerUtils {
   }
 
   /**
-   * Stop a Thrift TServer. Existing connections will keep our thread running; use reflection to
-   * forcibly shut down the threadpool.
-   *
-   * @param s
-   *          The TServer to stop
-   */
-  public static void stopTServer(TServer s) {
-    if (s == null) {
-      return;
-    }
-    s.stop();
-    try {
-      Field f = s.getClass().getDeclaredField("executorService_");
-      f.setAccessible(true);
-      ExecutorService es = (ExecutorService) f.get(s);
-      es.shutdownNow();
-    } catch (Exception e) {
-      log.error("Unable to call shutdownNow", e);
-    }
-  }
-
-  /**
    * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication
    * works. Requires the <code>serverType</code> to be {@link ThriftServerType#SASL} and throws an
    * exception when it is not.
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index ad32d6a..c75c7d2 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.accumulo.server.util;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
@@ -35,7 +33,6 @@ import java.net.ServerSocket;
 import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.accumulo.core.clientImpl.thrift.ClientService.Iface;
 import org.apache.accumulo.core.clientImpl.thrift.ClientService.Processor;
@@ -49,7 +46,6 @@ import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftServerType;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TServerSocket;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
@@ -59,60 +55,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 public class TServerUtilsTest {
 
-  private static class TServerWithoutES extends TServer {
-    boolean stopCalled;
-
-    TServerWithoutES(TServerSocket socket) {
-      super(new TServer.Args(socket));
-      stopCalled = false;
-    }
-
-    @Override
-    public void serve() {}
-
-    @Override
-    public void stop() {
-      stopCalled = true;
-    }
-  }
-
-  private static class TServerWithES extends TServerWithoutES {
-    final ExecutorService executorService_;
-
-    TServerWithES(TServerSocket socket) {
-      super(socket);
-      executorService_ = createMock(ExecutorService.class);
-      expect(executorService_.shutdownNow()).andReturn(null);
-      replay(executorService_);
-    }
-  }
-
-  @Test
-  public void testStopTServer_ES() {
-    TServerSocket socket = createNiceMock(TServerSocket.class);
-    replay(socket);
-    TServerWithES s = new TServerWithES(socket);
-    TServerUtils.stopTServer(s);
-    assertTrue(s.stopCalled);
-    verify(socket, s.executorService_);
-  }
-
-  @Test
-  public void testStopTServer_NoES() {
-    TServerSocket socket = createNiceMock(TServerSocket.class);
-    replay(socket);
-    TServerWithoutES s = new TServerWithoutES(socket);
-    TServerUtils.stopTServer(s);
-    assertTrue(s.stopCalled);
-    verify(socket);
-  }
-
-  @Test
-  public void testStopTServer_Null() {
-    TServerUtils.stopTServer(null);
-    // not dying is enough
-  }
-
   private ServerContext context;
   private final ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
 
@@ -149,8 +91,8 @@ public class TServerUtilsTest {
       assertNotNull(server);
       assertTrue(address.getAddress().getPort() > 1024);
     } finally {
-      if (null != server) {
-        TServerUtils.stopTServer(server);
+      if (server != null) {
+        server.stop();
       }
     }
   }
@@ -167,8 +109,8 @@ public class TServerUtilsTest {
       assertNotNull(server);
       assertEquals(port, address.getAddress().getPort());
     } finally {
-      if (null != server) {
-        TServerUtils.stopTServer(server);
+      if (server != null) {
+        server.stop();
       }
     }
   }
@@ -203,8 +145,8 @@ public class TServerUtilsTest {
       assertNotNull(server);
       assertEquals(port[1], address.getAddress().getPort());
     } finally {
-      if (null != server) {
-        TServerUtils.stopTServer(server);
+      if (server != null) {
+        server.stop();
       }
 
     }
@@ -266,8 +208,8 @@ public class TServerUtilsTest {
       // Finally ensure that the TServer is using the last port (i.e. port search worked)
       assertTrue(address.getAddress().getPort() == tserverFinalPort);
     } finally {
-      if (null != server) {
-        TServerUtils.stopTServer(server);
+      if (server != null) {
+        server.stop();
       }
 
     }
@@ -287,8 +229,8 @@ public class TServerUtilsTest {
       assertTrue(
           port[0] == address.getAddress().getPort() || port[1] == address.getAddress().getPort());
     } finally {
-      if (null != server) {
-        TServerUtils.stopTServer(server);
+      if (server != null) {
+        server.stop();
       }
     }
   }
@@ -310,8 +252,8 @@ public class TServerUtilsTest {
       assertNotNull(server);
       assertEquals(port[1], address.getAddress().getPort());
     } finally {
-      if (null != server) {
-        TServerUtils.stopTServer(server);
+      if (server != null) {
+        server.stop();
       }
     }
   }
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 3717572..5838777 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -831,7 +831,9 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     } finally {
       // Shutdown local thrift server
       LOG.info("Stopping Thrift Servers");
-      TServerUtils.stopTServer(compactorAddress.server);
+      if (compactorAddress.server != null) {
+        compactorAddress.server.stop();
+      }
 
       try {
         LOG.debug("Closing filesystems");
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index f1132d2..6e1bbdc 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -49,7 +49,6 @@ import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.rpc.ServerAddress;
-import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.easymock.EasyMock;
@@ -322,7 +321,6 @@ public class CompactorTest {
 
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
-    PowerMock.suppress(PowerMock.methods(TServerUtils.class, "stopTServer"));
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
 
     ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
@@ -375,7 +373,6 @@ public class CompactorTest {
 
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
-    PowerMock.suppress(PowerMock.methods(TServerUtils.class, "stopTServer"));
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
 
     ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
@@ -430,7 +427,6 @@ public class CompactorTest {
 
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
-    PowerMock.suppress(PowerMock.methods(TServerUtils.class, "stopTServer"));
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
 
     ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 6e6b5ae..c0b8894 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1232,7 +1232,10 @@ public class Manager extends AbstractServer
     } catch (InterruptedException e) {
       throw new IllegalStateException("Exception stopping replication workers", e);
     }
-    TServerUtils.stopTServer(replServer.get());
+    var nullableReplServer = replServer.get();
+    if (nullableReplServer != null) {
+      nullableReplServer.stop();
+    }
 
     // Signal that we want it to stop, and wait for it to do so.
     if (authenticationTokenKeyManager != null) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index d335278f..e875360 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -906,10 +906,14 @@ public class TabletServer extends AbstractServer {
       }
     }
     log.debug("Stopping Replication Server");
-    TServerUtils.stopTServer(this.replServer);
+    if (this.replServer != null) {
+      this.replServer.stop();
+    }
 
     log.debug("Stopping Thrift Servers");
-    TServerUtils.stopTServer(server);
+    if (server != null) {
+      server.stop();
+    }
 
     try {
       log.debug("Closing filesystems");
diff --git a/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java b/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java
index 2515aa5..d43cfa2 100644
--- a/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java
@@ -88,12 +88,7 @@ public class ThriftBehaviorIT {
 
   @Test
   public void echoFail() throws TException {
-    try {
-      client.echoFail(KITTY_MSG);
-      fail("Thrift client did not throw an expected exception");
-    } catch (Exception e) {
-      assertEquals(TApplicationException.class.getName(), e.getClass().getName());
-    }
+    assertThrows(TApplicationException.class, () -> client.echoFail(KITTY_MSG));
     // verify normal two-way method still passes using same client
     echoPass();
   }

Reply via email to