This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new cd87901aa2 ThriftTransportPool checker thread change (#5613) cd87901aa2 is described below commit cd87901aa21c5634a5b9324729eabd958e9608aa Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Thu Jun 12 08:58:40 2025 -0400 ThriftTransportPool checker thread change (#5613) * ThriftTransportPool checker thread change: Now critical if running in a server process and non-critical if running in a client process. Was previously always non-critical * log warn on InterruptedException for same thread --- .../accumulo/core/clientImpl/ClientContext.java | 7 ++++++- .../core/clientImpl/ThriftTransportPool.java | 24 +++++++++++++++++----- .../org/apache/accumulo/server/ServerContext.java | 6 ++++++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 2b154c0519..c2bcabe01e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -1118,9 +1118,14 @@ public class ClientContext implements AccumuloClient { } public synchronized ThriftTransportPool getTransportPool() { + return getTransportPoolImpl(false); + } + + protected synchronized ThriftTransportPool getTransportPoolImpl(boolean shouldHalt) { ensureOpen(); if (thriftTransportPool == null) { - thriftTransportPool = ThriftTransportPool.startNew(this::getTransportPoolMaxAgeMillis); + thriftTransportPool = + ThriftTransportPool.startNew(this::getTransportPoolMaxAgeMillis, shouldHalt); } return thriftTransportPool; } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java index ceb35c5471..75db7bffed 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java @@ -71,9 +71,19 @@ public class ThriftTransportPool { private final LongSupplier maxAgeMillis; - private ThriftTransportPool(LongSupplier maxAgeMillis) { + private ThriftTransportPool(LongSupplier maxAgeMillis, boolean shouldHalt) { this.maxAgeMillis = maxAgeMillis; - this.checkThread = Threads.createNonCriticalThread("Thrift Connection Pool Checker", () -> { + final String threadName = "Thrift Connection Pool Checker"; + final Runnable checkRunner = thriftConnectionPoolChecker(); + if (shouldHalt) { + this.checkThread = Threads.createCriticalThread(threadName, checkRunner); + } else { + this.checkThread = Threads.createNonCriticalThread(threadName, checkRunner); + } + } + + private Runnable thriftConnectionPoolChecker() { + return () -> { try { final long minNanos = MILLISECONDS.toNanos(250); final long maxNanos = MINUTES.toNanos(1); @@ -90,21 +100,25 @@ public class ThriftTransportPool { } } } catch (InterruptedException e) { + log.warn("Thread " + Thread.currentThread().getName() + " was interrupted. Exiting."); Thread.currentThread().interrupt(); } catch (TransportPoolShutdownException e) { log.debug("Error closing expired connections", e); } - }); + }; } /** * Create a new instance and start its checker thread, returning the instance. * * @param maxAgeMillis the supplier for the max age of idle transports before they are cleaned up + * @param shouldHalt true if the death of the checker thread via a RuntimeException should halt + * the JVM; false otherwise. This should be true if the thread is running within a server + * and false if the thread is running within a client * @return a new instance with its checker thread started to clean up idle transports */ - static ThriftTransportPool startNew(LongSupplier maxAgeMillis) { - var pool = new ThriftTransportPool(maxAgeMillis); + static ThriftTransportPool startNew(LongSupplier maxAgeMillis, boolean shouldHalt) { + var pool = new ThriftTransportPool(maxAgeMillis, shouldHalt); log.debug("Set thrift transport pool idle time to {}ms", maxAgeMillis.getAsLong()); pool.checkThread.start(); return pool; diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java index 4366967c2e..a2a1f93fd4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java @@ -42,6 +42,7 @@ import java.util.function.Supplier; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.ThriftTransportPool; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; @@ -445,6 +446,11 @@ public class ServerContext extends ClientContext { return getClientTimeoutInMillis(); } + @Override + public synchronized ThriftTransportPool getTransportPool() { + return getTransportPoolImpl(true); + } + public AuditedSecurityOperation getSecurityOperation() { return securityOperation.get(); }