This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 0bc14b84e1 adds time outs to coordinator tserver RPCs (#5771)
0bc14b84e1 is described below

commit 0bc14b84e11e1bef319b469b4a0ce925417cd7ac
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 1 17:06:33 2025 -0400

    adds time outs to coordinator tserver RPCs (#5771)
    
    The coordinator was making RPCs to tservers with no timeout set on the TCP
    connection. This could cause the coordinator to get stuck indefinitely on
    problematic tservers, causing external compactions to stop. Added a
    timeout to the connection. Also modified the thread name to include the
    tserver when making an RPC. Also made the coordinator more aggressive
    when it does see a tserver RPC error: it now clears all summary info
    related to that tserver instead of a subset. This more aggressive
    clearing is meant to minimize the number of threads going to a problem tserver.
---
 .../accumulo/coordinator/CompactionCoordinator.java      | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 873ce2abd9..0c51fbd4a7 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -455,6 +455,9 @@ public class CompactionCoordinator extends AbstractServer 
implements
   }
 
   private void updateSummaries(TServerInstance tsi, Set<String> queuesSeen) {
+
+    String originalThreadName = Thread.currentThread().getName();
+    Thread.currentThread().setName(originalThreadName + " [tserver=" + 
tsi.getHostPort() + "]");
     try {
       TabletClientService.Client client = null;
       try {
@@ -480,6 +483,8 @@ public class CompactionCoordinator extends AbstractServer 
implements
       LOG.warn("Error getting external compaction summaries from tablet 
server: {}",
           tsi.getHostAndPort(), e);
       QUEUE_SUMMARIES.remove(Set.of(tsi));
+    } finally {
+      Thread.currentThread().setName(originalThreadName);
     }
   }
 
@@ -546,6 +551,9 @@ public class CompactionCoordinator extends AbstractServer 
implements
       LOG.trace("Getting compaction for queue {} from tserver {}", queue, 
tserver.getHostAndPort());
       // Get a compaction from the tserver
       TabletClientService.Client client = null;
+      String originalThreadName = Thread.currentThread().getName();
+      Thread.currentThread()
+          .setName(originalThreadName + " [tserver=" + tserver.getHostPort() + 
"]");
       try {
         client = getTabletServerConnection(tserver);
         if (client == null) {
@@ -578,10 +586,11 @@ public class CompactionCoordinator extends AbstractServer 
implements
       } catch (TException e) {
         LOG.warn("Error from tserver {} while trying to reserve compaction, 
trying next tserver",
             
ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e);
-        QUEUE_SUMMARIES.removeSummary(tserver, queue, prioTserver.prio);
+        QUEUE_SUMMARIES.remove(Set.of(tserver));
         prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
       } finally {
         ThriftUtil.returnClient(client, getContext());
+        Thread.currentThread().setName(originalThreadName);
       }
     }
 
@@ -609,8 +618,9 @@ public class CompactionCoordinator extends AbstractServer 
implements
       return null;
     }
     ServerContext serverContext = getContext();
-    TTransport transport = serverContext.getTransportPool().getTransport(
-        ThriftClientTypes.TABLET_SERVER, connection.getAddress(), 0, 
serverContext, true);
+    TTransport transport =
+        
serverContext.getTransportPool().getTransport(ThriftClientTypes.TABLET_SERVER,
+            connection.getAddress(), getContext().getClientTimeoutInMillis(), 
serverContext, true);
     return ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, transport);
   }
 

Reply via email to