This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new cd87901aa2 ThriftTransportPool checker thread change (#5613)
cd87901aa2 is described below

commit cd87901aa21c5634a5b9324729eabd958e9608aa
Author: Kevin Rathbun <krath...@apache.org>
AuthorDate: Thu Jun 12 08:58:40 2025 -0400

    ThriftTransportPool checker thread change (#5613)
    
    * ThriftTransportPool checker thread change:
    
    Now critical if running in a server process and non-critical if running
    in a client process. Was previously always non-critical.
    
    * log warn on InterruptedException for same thread
---
 .../accumulo/core/clientImpl/ClientContext.java    |  7 ++++++-
 .../core/clientImpl/ThriftTransportPool.java       | 24 +++++++++++++++++-----
 .../org/apache/accumulo/server/ServerContext.java  |  6 ++++++
 3 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 2b154c0519..c2bcabe01e 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -1118,9 +1118,14 @@ public class ClientContext implements AccumuloClient {
   }
 
   public synchronized ThriftTransportPool getTransportPool() {
+    return getTransportPoolImpl(false);
+  }
+
+  protected synchronized ThriftTransportPool getTransportPoolImpl(boolean shouldHalt) {
     ensureOpen();
     if (thriftTransportPool == null) {
-      thriftTransportPool = ThriftTransportPool.startNew(this::getTransportPoolMaxAgeMillis);
+      thriftTransportPool =
+          ThriftTransportPool.startNew(this::getTransportPoolMaxAgeMillis, shouldHalt);
     }
     return thriftTransportPool;
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index ceb35c5471..75db7bffed 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@ -71,9 +71,19 @@ public class ThriftTransportPool {
 
   private final LongSupplier maxAgeMillis;
 
-  private ThriftTransportPool(LongSupplier maxAgeMillis) {
+  private ThriftTransportPool(LongSupplier maxAgeMillis, boolean shouldHalt) {
     this.maxAgeMillis = maxAgeMillis;
-    this.checkThread = Threads.createNonCriticalThread("Thrift Connection Pool Checker", () -> {
+    final String threadName = "Thrift Connection Pool Checker";
+    final Runnable checkRunner = thriftConnectionPoolChecker();
+    if (shouldHalt) {
+      this.checkThread = Threads.createCriticalThread(threadName, checkRunner);
+    } else {
+      this.checkThread = Threads.createNonCriticalThread(threadName, checkRunner);
+    }
+  }
+
+  private Runnable thriftConnectionPoolChecker() {
+    return () -> {
       try {
         final long minNanos = MILLISECONDS.toNanos(250);
         final long maxNanos = MINUTES.toNanos(1);
@@ -90,21 +100,25 @@ public class ThriftTransportPool {
           }
         }
       } catch (InterruptedException e) {
+        log.warn("Thread " + Thread.currentThread().getName() + " was interrupted. Exiting.");
         Thread.currentThread().interrupt();
       } catch (TransportPoolShutdownException e) {
         log.debug("Error closing expired connections", e);
       }
-    });
+    };
   }
 
   /**
    * Create a new instance and start its checker thread, returning the instance.
    *
    * @param maxAgeMillis the supplier for the max age of idle transports before they are cleaned up
+   * @param shouldHalt true if the death of the checker thread via a RuntimeException should halt
+   *        the JVM; false otherwise. This should be true if the thread is running within a server
+   *        and false if the thread is running within a client
    * @return a new instance with its checker thread started to clean up idle transports
    */
-  static ThriftTransportPool startNew(LongSupplier maxAgeMillis) {
-    var pool = new ThriftTransportPool(maxAgeMillis);
+  static ThriftTransportPool startNew(LongSupplier maxAgeMillis, boolean shouldHalt) {
+    var pool = new ThriftTransportPool(maxAgeMillis, shouldHalt);
     log.debug("Set thrift transport pool idle time to {}ms", maxAgeMillis.getAsLong());
     pool.checkThread.start();
     return pool;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index 4366967c2e..a2a1f93fd4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -42,6 +42,7 @@ import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -445,6 +446,11 @@ public class ServerContext extends ClientContext {
     return getClientTimeoutInMillis();
   }
 
+  @Override
+  public synchronized ThriftTransportPool getTransportPool() {
+    return getTransportPoolImpl(true);
+  }
+
   public AuditedSecurityOperation getSecurityOperation() {
     return securityOperation.get();
   }

Reply via email to