This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 91bbf13e69 Set compaction start time in Coordinator from Compactor msg 
(#6225)
91bbf13e69 is described below

commit 91bbf13e691f63711a6223ce630bb76bd75c73ee
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 19 10:22:40 2026 -0400

    Set compaction start time in Coordinator from Compactor msg (#6225)
    
    In #6221 we removed the RunningCompaction object in favor
    of using TExternalCompaction. The RunningCompaction object
    was setting its start time variable when the STARTED update
    msg was added. This behavior was lost in #6221. This change
    adds it back by setting the start time on the TExternalCompaction
    object when the STARTED msg is received.
---
 .../main/java/org/apache/accumulo/compactor/Compactor.java | 14 +++++++++++---
 .../compaction/coordinator/CompactionCoordinator.java      |  3 +++
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 92239742ab..82301f9817 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -473,7 +473,12 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
    */
   protected void updateCompactionState(TExternalCompactionJob job, 
TCompactionStatusUpdate update)
       throws RetriesExceededException {
-    JOB_HOLDER.getCurrentCompaction().putToUpdates(System.currentTimeMillis(), 
update);
+    long updateTime = System.currentTimeMillis();
+    TExternalCompaction tec = JOB_HOLDER.getCurrentCompaction();
+    if (update.getState() == TCompactionState.STARTED) {
+      tec.setStartTime(updateTime);
+    }
+    tec.putToUpdates(updateTime, update);
     RetryableThriftCall<String> thriftCall =
         new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25, 
() -> {
           Client coordinatorClient = getCoordinatorClient();
@@ -481,7 +486,7 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
             LOG.trace("Attempting to update compaction state in coordinator 
{}",
                 job.getExternalCompactionId());
             coordinatorClient.updateCompactionStatus(TraceUtil.traceInfo(), 
getContext().rpcCreds(),
-                job.getExternalCompactionId(), update, 
System.currentTimeMillis());
+                job.getExternalCompactionId(), update, updateTime);
             return "";
           } finally {
             ThriftUtil.returnClient(coordinatorClient, getContext());
@@ -902,6 +907,10 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
         current.setCompactor(clientAddress.toString());
         current.setGroupName(getResourceGroup().canonical());
         current.setJob(job);
+        // start time for the current compaction is set when the
+        // STARTED msg is sent to the coordinator. The coordinator
+        // updates its copy of the TExternalCompaction when it
+        // receives the STARTED msg.
         JOB_HOLDER.set(current, compactionThread, fcr.getFileCompactor());
 
         try {
@@ -924,7 +933,6 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
           }
 
           compactionThread.start(); // start the compactionThread
-          current.setStartTime(System.currentTimeMillis());
           started.await(); // wait until the compactor is started
           final long inputEntries = totalInputEntries.sum();
           final long waitTime = 
calculateProgressCheckTime(totalInputBytes.sum());
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 0729f09169..6c31302fc0 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -1055,6 +1055,9 @@ public class CompactionCoordinator
       tec.putToUpdates(timestamp, update);
       switch (update.state) {
         case STARTED:
+          // Start time is used by the comparator, so set it first
+          // before adding to the LONG_RUNNING_COMPACTIONS_BY_RG object.
+          tec.setStartTime(timestamp);
           LONG_RUNNING_COMPACTIONS_BY_RG
               .computeIfAbsent(tec.getGroupName(), k -> new 
TimeOrderedRunningCompactionSet())
               .add(tec);

Reply via email to