This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 05c2f45042 Reduced warning logs under normal conditions in compaction
coordinator (#4362)
05c2f45042 is described below
commit 05c2f45042ee91a5fe04702caa77ab19f78c0f9a
Author: Dave Marion <[email protected]>
AuthorDate: Wed Mar 13 11:52:47 2024 -0400
Reduced warning logs under normal conditions in compaction coordinator
(#4362)
Fixes #4219
---
.../coordinator/CompactionCoordinator.java | 32 ++++++++++++++++++++--
.../accumulo/coordinator/QueueSummaries.java | 8 ++++++
.../coordinator/CompactionCoordinatorTest.java | 6 ++++
3 files changed, 43 insertions(+), 3 deletions(-)
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index f4819ebefb..b0ec498a9e 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -23,6 +23,7 @@ import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -303,9 +304,12 @@ public class CompactionCoordinator extends AbstractServer
updateSummaries();
long now = System.currentTimeMillis();
- TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> {
- if ((now - v) > getMissingCompactorWarningTime()) {
- LOG.warn("No compactors have checked in with coordinator for queue
{} in {}ms", k,
+
+ Map<String,List<HostAndPort>> idleCompactors = getIdleCompactors();
+ TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> {
+ if ((now - lastCheckTime) > getMissingCompactorWarningTime()
+ && QUEUE_SUMMARIES.isCompactionsQueued(queue) &&
idleCompactors.containsKey(queue)) {
+ LOG.warn("No compactors have checked in with coordinator for queue
{} in {}ms", queue,
getMissingCompactorWarningTime());
}
});
@@ -321,6 +325,28 @@ public class CompactionCoordinator extends AbstractServer
LOG.info("Shutting down");
}
+ private Map<String,List<HostAndPort>> getIdleCompactors() { // per queue: compactors left after removing those referenced by RUNNING_CACHE entries
+
+ Map<String,List<HostAndPort>> allCompactors =
+ ExternalCompactionUtil.getCompactorAddrs(getContext()); // NOTE(review): mutated in place below - assumes a mutable map and mutable lists; confirm
+
+ Set<String> emptyQueues = new HashSet<>(); // queues whose list became empty through a removal below
+
+ // Remove all of the compactors that are running a compaction
+ RUNNING_CACHE.values().forEach(rc -> {
+ List<HostAndPort> busyCompactors = allCompactors.get(rc.getQueueName()); // the queue's list from allCompactors; busy entries are removed from it
+ if (busyCompactors != null // null here means allCompactors has no entry for this queue
+ &&
busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) {
+ if (busyCompactors.isEmpty()) {
+ emptyQueues.add(rc.getQueueName());
+ }
+ }
+ });
+ // Remove entries with empty queues
+ emptyQueues.forEach(e -> allCompactors.remove(e)); // done after the loop, using the names collected above
+ return allCompactors; // a list that was already empty before the loop is never added to emptyQueues, so its key remains
+ }
+
private void updateSummaries() {
ExecutorService executor =
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
"Compaction Summary Gatherer", false);
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
index 6edb2c0f36..1d89cd0321 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
@@ -100,6 +100,14 @@ public class QueueSummaries {
}
}
+ synchronized boolean isCompactionsQueued(String queue) { // true only when QUEUES has a non-empty entry for this queue
+ var q = QUEUES.get(queue);
+ if (q == null) { // no entry for this queue is reported as nothing queued
+ return false;
+ }
+ return !q.isEmpty();
+ }
+
synchronized PrioTserver getNextTserver(String queue) {
Entry<Short,TreeSet<TServerInstance>> entry = getNextTserverEntry(queue);
diff --git
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 117d50108a..87e7471bef 100644
---
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -214,6 +214,7 @@ public class CompactionCoordinatorTest {
var coordinator = new TestCoordinator(null, null, null, null, context,
null);
// Should be equal to 3 * 15_000 milliseconds
assertEquals(45_000, coordinator.getMissingCompactorWarningTime());
+ coordinator.close();
}
@Test
@@ -231,6 +232,7 @@ public class CompactionCoordinatorTest {
List<RunningCompaction> runningCompactions = new ArrayList<>();
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
+
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
CompactionFinalizer finalizer =
PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
@@ -284,6 +286,7 @@ public class CompactionCoordinatorTest {
List<RunningCompaction> runningCompactions = new ArrayList<>();
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
+
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
CompactionFinalizer finalizer =
PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
@@ -363,6 +366,7 @@ public class CompactionCoordinatorTest {
List<RunningCompaction> runningCompactions = new ArrayList<>();
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
+
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
@@ -443,6 +447,7 @@ public class CompactionCoordinatorTest {
runningCompactions.add(new RunningCompaction(job,
tserverAddress.toString(), "queue"));
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
+
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
@@ -508,6 +513,7 @@ public class CompactionCoordinatorTest {
List<RunningCompaction> runningCompactions = new ArrayList<>();
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
+
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
CompactionFinalizer finalizer =
PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);