This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new cd87901aa2 ThriftTransportPool checker thread change (#5613)
cd87901aa2 is described below
commit cd87901aa21c5634a5b9324729eabd958e9608aa
Author: Kevin Rathbun <[email protected]>
AuthorDate: Thu Jun 12 08:58:40 2025 -0400
ThriftTransportPool checker thread change (#5613)
* ThriftTransportPool checker thread change:
The checker thread is now critical when running in a server process and
non-critical when running in a client process. It was previously always
non-critical.
* Log a warning when the checker thread exits due to an InterruptedException
---
.../accumulo/core/clientImpl/ClientContext.java | 7 ++++++-
.../core/clientImpl/ThriftTransportPool.java | 24 +++++++++++++++++-----
.../org/apache/accumulo/server/ServerContext.java | 6 ++++++
3 files changed, 31 insertions(+), 6 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 2b154c0519..c2bcabe01e 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -1118,9 +1118,14 @@ public class ClientContext implements AccumuloClient {
}
public synchronized ThriftTransportPool getTransportPool() {
+ return getTransportPoolImpl(false);
+ }
+
+ protected synchronized ThriftTransportPool getTransportPoolImpl(boolean
shouldHalt) {
ensureOpen();
if (thriftTransportPool == null) {
- thriftTransportPool =
ThriftTransportPool.startNew(this::getTransportPoolMaxAgeMillis);
+ thriftTransportPool =
+ ThriftTransportPool.startNew(this::getTransportPoolMaxAgeMillis,
shouldHalt);
}
return thriftTransportPool;
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index ceb35c5471..75db7bffed 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@ -71,9 +71,19 @@ public class ThriftTransportPool {
private final LongSupplier maxAgeMillis;
- private ThriftTransportPool(LongSupplier maxAgeMillis) {
+ private ThriftTransportPool(LongSupplier maxAgeMillis, boolean shouldHalt) {
this.maxAgeMillis = maxAgeMillis;
- this.checkThread = Threads.createNonCriticalThread("Thrift Connection Pool
Checker", () -> {
+ final String threadName = "Thrift Connection Pool Checker";
+ final Runnable checkRunner = thriftConnectionPoolChecker();
+ if (shouldHalt) {
+ this.checkThread = Threads.createCriticalThread(threadName, checkRunner);
+ } else {
+ this.checkThread = Threads.createNonCriticalThread(threadName,
checkRunner);
+ }
+ }
+
+ private Runnable thriftConnectionPoolChecker() {
+ return () -> {
try {
final long minNanos = MILLISECONDS.toNanos(250);
final long maxNanos = MINUTES.toNanos(1);
@@ -90,21 +100,25 @@ public class ThriftTransportPool {
}
}
} catch (InterruptedException e) {
+ log.warn("Thread " + Thread.currentThread().getName() + " was
interrupted. Exiting.");
Thread.currentThread().interrupt();
} catch (TransportPoolShutdownException e) {
log.debug("Error closing expired connections", e);
}
- });
+ };
}
/**
* Create a new instance and start its checker thread, returning the
instance.
*
* @param maxAgeMillis the supplier for the max age of idle transports
before they are cleaned up
+ * @param shouldHalt true if the death of the checker thread via a
RuntimeException should halt
+ * the JVM; false otherwise. This should be true if the thread is
running within a server
+ * and false if the thread is running within a client
* @return a new instance with its checker thread started to clean up idle
transports
*/
- static ThriftTransportPool startNew(LongSupplier maxAgeMillis) {
- var pool = new ThriftTransportPool(maxAgeMillis);
+ static ThriftTransportPool startNew(LongSupplier maxAgeMillis, boolean
shouldHalt) {
+ var pool = new ThriftTransportPool(maxAgeMillis, shouldHalt);
log.debug("Set thrift transport pool idle time to {}ms",
maxAgeMillis.getAsLong());
pool.checkThread.start();
return pool;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index 4366967c2e..a2a1f93fd4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -42,6 +42,7 @@ import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -445,6 +446,11 @@ public class ServerContext extends ClientContext {
return getClientTimeoutInMillis();
}
+ @Override
+ public synchronized ThriftTransportPool getTransportPool() {
+ return getTransportPoolImpl(true);
+ }
+
public AuditedSecurityOperation getSecurityOperation() {
return securityOperation.get();
}