This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 0bc14b84e1 adds time outs to coordinator tserver RPCs (#5771)
0bc14b84e1 is described below
commit 0bc14b84e11e1bef319b469b4a0ce925417cd7ac
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 1 17:06:33 2025 -0400
adds time outs to coordinator tserver RPCs (#5771)
The coordinator was making RPCs to tservers with no timeout set on the TCP
connection. This could cause the coordinator to get stuck indefinitely on
problematic tservers, causing external compactions to stop. Added a
timeout to the connection. Also modified the thread name to include the
tserver when making an RPC. Also made the coordinator more aggressive
when it does see a tserver RPC error, so that it clears all summary info
related to that tserver instead of only a subset. This more aggressive
clearing is meant to minimize threads going to a problem tserver.
---
.../accumulo/coordinator/CompactionCoordinator.java | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 873ce2abd9..0c51fbd4a7 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -455,6 +455,9 @@ public class CompactionCoordinator extends AbstractServer
implements
}
private void updateSummaries(TServerInstance tsi, Set<String> queuesSeen) {
+
+ String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(originalThreadName + " [tserver=" +
tsi.getHostPort() + "]");
try {
TabletClientService.Client client = null;
try {
@@ -480,6 +483,8 @@ public class CompactionCoordinator extends AbstractServer
implements
LOG.warn("Error getting external compaction summaries from tablet
server: {}",
tsi.getHostAndPort(), e);
QUEUE_SUMMARIES.remove(Set.of(tsi));
+ } finally {
+ Thread.currentThread().setName(originalThreadName);
}
}
@@ -546,6 +551,9 @@ public class CompactionCoordinator extends AbstractServer
implements
LOG.trace("Getting compaction for queue {} from tserver {}", queue,
tserver.getHostAndPort());
// Get a compaction from the tserver
TabletClientService.Client client = null;
+ String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread()
+ .setName(originalThreadName + " [tserver=" + tserver.getHostPort() +
"]");
try {
client = getTabletServerConnection(tserver);
if (client == null) {
@@ -578,10 +586,11 @@ public class CompactionCoordinator extends AbstractServer
implements
} catch (TException e) {
LOG.warn("Error from tserver {} while trying to reserve compaction,
trying next tserver",
ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e);
- QUEUE_SUMMARIES.removeSummary(tserver, queue, prioTserver.prio);
+ QUEUE_SUMMARIES.remove(Set.of(tserver));
prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
} finally {
ThriftUtil.returnClient(client, getContext());
+ Thread.currentThread().setName(originalThreadName);
}
}
@@ -609,8 +618,9 @@ public class CompactionCoordinator extends AbstractServer
implements
return null;
}
ServerContext serverContext = getContext();
- TTransport transport = serverContext.getTransportPool().getTransport(
- ThriftClientTypes.TABLET_SERVER, connection.getAddress(), 0,
serverContext, true);
+ TTransport transport =
+
serverContext.getTransportPool().getTransport(ThriftClientTypes.TABLET_SERVER,
+ connection.getAddress(), getContext().getClientTimeoutInMillis(),
serverContext, true);
return ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, transport);
}