This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new b26d59d0bf removes CoordinatorSummaryLogger (#6237)
b26d59d0bf is described below

commit b26d59d0bf76e81c12d03c38e5204b8ee7bbc1e5
Author: Keith Turner <[email protected]>
AuthorDate: Wed Mar 25 09:53:22 2026 -0700

    removes CoordinatorSummaryLogger (#6237)
    
    This logger was useful in 2.1.  However in 4.0 its redundant with
    functionality in the monitor and is just extra code to maintain.
    Removing it also removes a usage of the running cache which may be
    helpful for #6217
---
 .../coordinator/CompactionCoordinator.java         | 10 ---
 .../coordinator/CoordinatorSummaryLogger.java      | 88 ----------------------
 .../compaction/CompactionCoordinatorTest.java      |  3 -
 .../accumulo/monitor/next/SystemInformation.java   | 12 +++
 4 files changed, 12 insertions(+), 101 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 37300e0b7d..b53ccdf6e6 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -362,7 +362,6 @@ public class CompactionCoordinator
     }
 
     startDeadCompactionDetector();
-    startQueueRunningSummaryLogging();
     startFailureSummaryLogging();
     startInternalStateCleaner(ctx.getScheduledExecutor());
 
@@ -768,15 +767,6 @@ public class CompactionCoordinator
     failingTables.compute(extent.tableId(), FailureCounts::incrementFailure);
   }
 
-  protected void startQueueRunningSummaryLogging() {
-    CoordinatorSummaryLogger summaryLogger =
-        new CoordinatorSummaryLogger(ctx, this.jobQueues, this.RUNNING_CACHE, 
compactorCounts);
-
-    ScheduledFuture<?> future = ctx.getScheduledExecutor()
-        .scheduleWithFixedDelay(summaryLogger::logSummary, 0, 1, 
TimeUnit.MINUTES);
-    ThreadPools.watchNonCriticalScheduledTask(future);
-  }
-
   protected void startFailureSummaryLogging() {
     ScheduledFuture<?> future =
         ctx.getScheduledExecutor().scheduleWithFixedDelay(this::printStats, 0, 
5, TimeUnit.MINUTES);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
deleted file mode 100644
index 2d71c2d796..0000000000
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.manager.compaction.coordinator;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
-import org.apache.accumulo.core.data.ResourceGroupId;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
-import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
-import org.apache.accumulo.server.ServerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.benmanes.caffeine.cache.Cache;
-
-public class CoordinatorSummaryLogger {
-  private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorSummaryLogger.class);
-
-  private final ServerContext ctx;
-  private final CompactionJobQueues jobQueues;
-  private final Map<ExternalCompactionId,TExternalCompaction> running;
-  private final Cache<ResourceGroupId,Integer> compactorCounts;
-
-  public CoordinatorSummaryLogger(ServerContext ctx, CompactionJobQueues 
jobQueues,
-      Map<ExternalCompactionId,TExternalCompaction> running,
-      Cache<ResourceGroupId,Integer> compactorCounts) {
-    this.ctx = ctx;
-    this.jobQueues = jobQueues;
-    this.running = running;
-    this.compactorCounts = compactorCounts;
-  }
-
-  public void logSummary() {
-
-    final Map<ResourceGroupId,AtomicLong> perQueueRunningCount = new 
HashMap<>();
-    final Map<String,AtomicLong> perTableRunningCount = new HashMap<>();
-
-    running.values().forEach(rc -> {
-      TableId tid = KeyExtent.fromThrift(rc.getJob().getExtent()).tableId();
-      String tableName = null;
-      try {
-        tableName = ctx.getQualifiedTableName(tid);
-      } catch (TableNotFoundException e) {
-        tableName = "Unmapped table id: " + tid.canonical();
-      }
-      perQueueRunningCount
-          .computeIfAbsent(ResourceGroupId.of(rc.getGroupName()), q -> new 
AtomicLong(0))
-          .incrementAndGet();
-      perTableRunningCount.computeIfAbsent(tableName, t -> new 
AtomicLong(0)).incrementAndGet();
-    });
-
-    perQueueRunningCount.forEach((groupId, count) -> {
-      LOG.info(
-          "Queue {}: compactors: {}, queued majc (minimum, possibly higher): 
{}, running majc: {}",
-          groupId, compactorCounts.asMap().getOrDefault(groupId, 0),
-          // This map only contains the highest priority for each tserver. So 
when tservers have
-          // other priorities that need to compact or have more than one 
compaction for a
-          // priority level this count will be lower than the actual number of 
queued.
-          jobQueues.getQueuedJobs(groupId), count.get());
-
-    });
-    perTableRunningCount
-        .forEach((t, count) -> LOG.info("Running compactions for table {}: 
{}", t, count));
-  }
-
-}
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index e22ef9b7b4..3aa3f848d6 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -117,9 +117,6 @@ public class CompactionCoordinatorTest {
       return 3;
     }
 
-    @Override
-    protected void startQueueRunningSummaryLogging() {}
-
     @Override
     protected void startFailureSummaryLogging() {}
 
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
index 27b22d1700..e7291a877e 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
@@ -400,6 +401,9 @@ public class SystemInformation {
   protected final Map<String,TimeOrderedRunningCompactionSet> 
longRunningCompactionsByRg =
       new ConcurrentHashMap<>();
 
+  protected final Map<TableId,LongAdder> runningCompactionsPerTable = new 
ConcurrentHashMap<>();
+  protected final Map<String,LongAdder> runningCompactionsPerGroup = new 
ConcurrentHashMap<>();
+
   // Table Information
   private final Map<TableId,TableSummary> tables = new ConcurrentHashMap<>();
   private final Map<TableId,List<TabletInformation>> tablets = new 
ConcurrentHashMap<>();
@@ -441,6 +445,8 @@ public class SystemInformation {
     tablets.clear();
     deployment.clear();
     suggestions.clear();
+    runningCompactionsPerGroup.clear();
+    runningCompactionsPerTable.clear();
     scanServerView = null;
   }
 
@@ -538,6 +544,12 @@ public class SystemInformation {
   }
 
   public void processExternalCompaction(TExternalCompaction tec) {
+
+    var tableId = KeyExtent.fromThrift(tec.getJob().extent).tableId();
+    runningCompactionsPerTable.computeIfAbsent(tableId, t -> new 
LongAdder()).increment();
+    runningCompactionsPerGroup.computeIfAbsent(tec.getGroupName(), t -> new 
LongAdder())
+        .increment();
+
     this.longRunningCompactionsByRg.computeIfAbsent(tec.getGroupName(),
         k -> new 
TimeOrderedRunningCompactionSet(rgLongRunningCompactionSize)).add(tec);
   }

Reply via email to