This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new b26d59d0bf removes CoordinatorSummaryLogger (#6237)
b26d59d0bf is described below
commit b26d59d0bf76e81c12d03c38e5204b8ee7bbc1e5
Author: Keith Turner <[email protected]>
AuthorDate: Wed Mar 25 09:53:22 2026 -0700
removes CoordinatorSummaryLogger (#6237)
This logger was useful in 2.1. However, in 4.0 it's redundant with
functionality in the monitor and is just extra code to maintain.
Removing it also removes a usage of the running cache, which may be
helpful for #6217.
---
.../coordinator/CompactionCoordinator.java | 10 ---
.../coordinator/CoordinatorSummaryLogger.java | 88 ----------------------
.../compaction/CompactionCoordinatorTest.java | 3 -
.../accumulo/monitor/next/SystemInformation.java | 12 +++
4 files changed, 12 insertions(+), 101 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 37300e0b7d..b53ccdf6e6 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -362,7 +362,6 @@ public class CompactionCoordinator
}
startDeadCompactionDetector();
- startQueueRunningSummaryLogging();
startFailureSummaryLogging();
startInternalStateCleaner(ctx.getScheduledExecutor());
@@ -768,15 +767,6 @@ public class CompactionCoordinator
failingTables.compute(extent.tableId(), FailureCounts::incrementFailure);
}
- protected void startQueueRunningSummaryLogging() {
- CoordinatorSummaryLogger summaryLogger =
- new CoordinatorSummaryLogger(ctx, this.jobQueues, this.RUNNING_CACHE,
compactorCounts);
-
- ScheduledFuture<?> future = ctx.getScheduledExecutor()
- .scheduleWithFixedDelay(summaryLogger::logSummary, 0, 1,
TimeUnit.MINUTES);
- ThreadPools.watchNonCriticalScheduledTask(future);
- }
-
protected void startFailureSummaryLogging() {
ScheduledFuture<?> future =
ctx.getScheduledExecutor().scheduleWithFixedDelay(this::printStats, 0,
5, TimeUnit.MINUTES);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
deleted file mode 100644
index 2d71c2d796..0000000000
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.manager.compaction.coordinator;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
-import org.apache.accumulo.core.data.ResourceGroupId;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
-import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
-import org.apache.accumulo.server.ServerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.benmanes.caffeine.cache.Cache;
-
-public class CoordinatorSummaryLogger {
- private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorSummaryLogger.class);
-
- private final ServerContext ctx;
- private final CompactionJobQueues jobQueues;
- private final Map<ExternalCompactionId,TExternalCompaction> running;
- private final Cache<ResourceGroupId,Integer> compactorCounts;
-
- public CoordinatorSummaryLogger(ServerContext ctx, CompactionJobQueues
jobQueues,
- Map<ExternalCompactionId,TExternalCompaction> running,
- Cache<ResourceGroupId,Integer> compactorCounts) {
- this.ctx = ctx;
- this.jobQueues = jobQueues;
- this.running = running;
- this.compactorCounts = compactorCounts;
- }
-
- public void logSummary() {
-
- final Map<ResourceGroupId,AtomicLong> perQueueRunningCount = new
HashMap<>();
- final Map<String,AtomicLong> perTableRunningCount = new HashMap<>();
-
- running.values().forEach(rc -> {
- TableId tid = KeyExtent.fromThrift(rc.getJob().getExtent()).tableId();
- String tableName = null;
- try {
- tableName = ctx.getQualifiedTableName(tid);
- } catch (TableNotFoundException e) {
- tableName = "Unmapped table id: " + tid.canonical();
- }
- perQueueRunningCount
- .computeIfAbsent(ResourceGroupId.of(rc.getGroupName()), q -> new
AtomicLong(0))
- .incrementAndGet();
- perTableRunningCount.computeIfAbsent(tableName, t -> new
AtomicLong(0)).incrementAndGet();
- });
-
- perQueueRunningCount.forEach((groupId, count) -> {
- LOG.info(
- "Queue {}: compactors: {}, queued majc (minimum, possibly higher):
{}, running majc: {}",
- groupId, compactorCounts.asMap().getOrDefault(groupId, 0),
- // This map only contains the highest priority for each tserver. So
when tservers have
- // other priorities that need to compact or have more than one
compaction for a
- // priority level this count will be lower than the actual number of
queued.
- jobQueues.getQueuedJobs(groupId), count.get());
-
- });
- perTableRunningCount
- .forEach((t, count) -> LOG.info("Running compactions for table {}:
{}", t, count));
- }
-
-}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index e22ef9b7b4..3aa3f848d6 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -117,9 +117,6 @@ public class CompactionCoordinatorTest {
return 3;
}
- @Override
- protected void startQueueRunningSummaryLogging() {}
-
@Override
protected void startFailureSummaryLogging() {}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
index 27b22d1700..e7291a877e 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
import org.apache.accumulo.core.Constants;
@@ -400,6 +401,9 @@ public class SystemInformation {
protected final Map<String,TimeOrderedRunningCompactionSet>
longRunningCompactionsByRg =
new ConcurrentHashMap<>();
+ protected final Map<TableId,LongAdder> runningCompactionsPerTable = new
ConcurrentHashMap<>();
+ protected final Map<String,LongAdder> runningCompactionsPerGroup = new
ConcurrentHashMap<>();
+
// Table Information
private final Map<TableId,TableSummary> tables = new ConcurrentHashMap<>();
private final Map<TableId,List<TabletInformation>> tablets = new
ConcurrentHashMap<>();
@@ -441,6 +445,8 @@ public class SystemInformation {
tablets.clear();
deployment.clear();
suggestions.clear();
+ runningCompactionsPerGroup.clear();
+ runningCompactionsPerTable.clear();
scanServerView = null;
}
@@ -538,6 +544,12 @@ public class SystemInformation {
}
public void processExternalCompaction(TExternalCompaction tec) {
+
+ var tableId = KeyExtent.fromThrift(tec.getJob().extent).tableId();
+ runningCompactionsPerTable.computeIfAbsent(tableId, t -> new
LongAdder()).increment();
+ runningCompactionsPerGroup.computeIfAbsent(tec.getGroupName(), t -> new
LongAdder())
+ .increment();
+
this.longRunningCompactionsByRg.computeIfAbsent(tec.getGroupName(),
k -> new
TimeOrderedRunningCompactionSet(rgLongRunningCompactionSize)).add(tec);
}