This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new bd184af6fd Added summary of queued and running compactions to 
coordinator (#5989)
bd184af6fd is described below

commit bd184af6fdf06ca7f90355de9e763189775df388
Author: Dave Marion <[email protected]>
AuthorDate: Tue Nov 25 07:43:46 2025 -0500

    Added summary of queued and running compactions to coordinator (#5989)
    
    This commit adds periodic logging of queued and running external
    compaction information to the coordinator. The logging is emitted
    by a new class, CoordinatorSummaryLogger, so that users can easily
    redirect this log to a new file in the logging configuration.
    
    At each interval this new class will log the number of compactions
    running for each table, and will log the number of compactors,
    queued compactions and running compactions for each compaction queue.
    
    The number of queued compactions is an estimate, and may be lower than
    the actual number, because each tablet server only reports up to 100
    different compaction priorities to conserve memory space in the
    Coordinator (see ExternalCompactionExecutor.summarize).
    
    The metrics are a more accurate source for the number of queued external
    compactions, but using them requires aggregating the METRICS_MAJC_QUEUED
    Meters from all of the TabletServers.
    
    Related to #5965
    
    
    Co-authored-by: Christopher Tubbs <[email protected]>
---
 .../coordinator/CompactionCoordinator.java         | 11 ++-
 .../coordinator/CoordinatorSummaryLogger.java      | 78 ++++++++++++++++++++++
 .../coordinator/CompactionCoordinatorTest.java     |  3 +
 3 files changed, 91 insertions(+), 1 deletion(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 0c51fbd4a7..387b1559b3 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -168,6 +168,7 @@ public class CompactionCoordinator extends AbstractServer 
implements
   protected final AccumuloConfiguration aconf;
   protected CompactionFinalizer compactionFinalizer;
   protected LiveTServerSet tserverSet;
+  private final CoordinatorSummaryLogger summaryLogger;
 
   private ServiceLock coordinatorLock;
 
@@ -187,8 +188,9 @@ public class CompactionCoordinator extends AbstractServer 
implements
     printStartupMsg();
     startCompactionCleaner();
     startRunningCleaner();
-    compactorCounts = Caffeine.newBuilder().expireAfterWrite(30, 
TimeUnit.SECONDS)
+    compactorCounts = Caffeine.newBuilder().expireAfterWrite(2, 
TimeUnit.MINUTES)
         .build(queue -> ExternalCompactionUtil.countCompactors(queue, 
getContext()));
+    summaryLogger = new CoordinatorSummaryLogger(super.getContext(), 
compactorCounts);
   }
 
   @Override
@@ -345,6 +347,7 @@ public class CompactionCoordinator extends AbstractServer 
implements
 
     tserverSet.startListeningForTabletServerChanges();
     startDeadCompactionDetector();
+    startQueueRunningSummaryLogging();
     startFailureSummaryLogging();
 
     LOG.info("Starting loop to check tservers for compaction summaries");
@@ -685,6 +688,12 @@ public class CompactionCoordinator extends AbstractServer 
implements
     failingTables.compute(extent.tableId(), FailureCounts::incrementFailure);
   }
 
+  protected void startQueueRunningSummaryLogging() {
+    ScheduledFuture<?> future = getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(summaryLogger::logSummary, 0, 1, 
TimeUnit.MINUTES);
+    ThreadPools.watchNonCriticalScheduledTask(future);
+  }
+
   protected void startFailureSummaryLogging() {
     ScheduledFuture<?> future = getContext().getScheduledExecutor()
         .scheduleWithFixedDelay(this::printStats, 0, 5, TimeUnit.MINUTES);
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorSummaryLogger.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorSummaryLogger.java
new file mode 100644
index 0000000000..388d100b74
--- /dev/null
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorSummaryLogger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+
+public class CoordinatorSummaryLogger {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorSummaryLogger.class);
+
+  private static final TreeMap<Short,TreeSet<TServerInstance>> EMPTY = new 
TreeMap<>();
+  private final ServerContext ctx;
+  private final Cache<String,Integer> compactorCounts;
+
+  public CoordinatorSummaryLogger(ServerContext ctx, Cache<String,Integer> 
compactorCounts) {
+    this.ctx = ctx;
+    this.compactorCounts = compactorCounts;
+  }
+
+  public void logSummary() {
+
+    final Map<TableId,String> tableMap = ctx.getTableIdToNameMap();
+    final Map<String,AtomicLong> perQueueRunningCount = new HashMap<>();
+    final Map<String,AtomicLong> perTableRunningCount = new HashMap<>();
+
+    CompactionCoordinator.RUNNING_CACHE.values().forEach(rc -> {
+      TableId tid = KeyExtent.fromThrift(rc.getJob().getExtent()).tableId();
+      String tableName = tableMap.getOrDefault(tid, "Unmapped table id: " + 
tid.canonical());
+      perQueueRunningCount.computeIfAbsent(rc.getQueueName(), q -> new 
AtomicLong(0))
+          .incrementAndGet();
+      perTableRunningCount.computeIfAbsent(tableName, t -> new 
AtomicLong(0)).incrementAndGet();
+    });
+
+    perQueueRunningCount.forEach((q, count) -> {
+      LOG.info(
+          "Queue {}: compactors: {}, queued majc (minimum, possibly higher): 
{}, running majc: {}",
+          q, compactorCounts.asMap().getOrDefault(q, 0),
+          // This map only contains the highest priority for each tserver. So 
when tservers have
+          // other priorities that need to compact or have more than one 
compaction for a
+          // priority level this count will be lower than the actual number of 
queued.
+          CompactionCoordinator.QUEUE_SUMMARIES.QUEUES.getOrDefault(q, 
EMPTY).values().stream()
+              .mapToLong(TreeSet::size).sum(),
+          count.get());
+
+    });
+    perTableRunningCount
+        .forEach((t, count) -> LOG.info("Running compactions for table {}: 
{}", t, count));
+  }
+
+}
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index a38bcb61a0..a74ee54286 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -117,6 +117,9 @@ public class CompactionCoordinatorTest {
     @Override
     protected void startDeadCompactionDetector() {}
 
+    @Override
+    protected void startQueueRunningSummaryLogging() {}
+
     @Override
     protected void startFailureSummaryLogging() {}
 

Reply via email to