This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new f9551d0e6d Ensure compaction duration is not reset on coordinator 
restart (#4667)
f9551d0e6d is described below

commit f9551d0e6dd44ba6566cd3281096c70ec8424550
Author: Dom G <domgargu...@apache.org>
AuthorDate: Fri Jun 14 14:39:03 2024 -0400

    Ensure compaction duration is not reset on coordinator restart (#4667)
    
    * Ensure compaction duration is not reset on coordinator restart by adding 
time tracking to FileCompactorRunnable
    * clean up old age calculation logic from monitor code and use age value 
directly
    * Improve accuracy of compactor time measurement in FileCompactor
---
 .../util/compaction/RunningCompactionInfo.java     |  21 ++---
 .../compaction/thrift/TCompactionStatusUpdate.java | 104 ++++++++++++++++++++-
 core/src/main/thrift/compaction-coordinator.thrift |   1 +
 .../accumulo/server/compaction/CompactionInfo.java |   6 +-
 .../accumulo/server/compaction/FileCompactor.java  |  17 +++-
 .../org/apache/accumulo/compactor/Compactor.java   |  36 +++++--
 .../apache/accumulo/compactor/CompactorTest.java   |   6 ++
 .../compaction/ExternalCompactionProgressIT.java   |  99 +++++++++++++++++++-
 .../compaction/ExternalDoNothingCompactor.java     |   6 ++
 9 files changed, 260 insertions(+), 36 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
index 53e838678c..baf8d1ddae 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.util.compaction;
 
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 import java.util.TreeMap;
 
@@ -62,37 +63,34 @@ public class RunningCompactionInfo {
 
     // parse the updates map
     long nowMillis = System.currentTimeMillis();
-    long startedMillis = nowMillis;
     float percent = 0f;
     long updateMillis;
     TCompactionStatusUpdate last;
 
     // sort updates by key, which is a timestamp
     TreeMap<Long,TCompactionStatusUpdate> sorted = new TreeMap<>(updates);
-    var firstEntry = sorted.firstEntry();
     var lastEntry = sorted.lastEntry();
-    if (firstEntry != null) {
-      startedMillis = firstEntry.getKey();
-    }
-    duration = nowMillis - startedMillis;
-    long durationMinutes = MILLISECONDS.toMinutes(duration);
-    if (durationMinutes > 15) {
-      log.warn("Compaction {} has been running for {} minutes", ecid, 
durationMinutes);
-    }
 
     // last entry is all we care about so bail if null
     if (lastEntry != null) {
       last = lastEntry.getValue();
       updateMillis = lastEntry.getKey();
+      duration = last.getCompactionAgeNanos();
     } else {
       log.debug("No updates found for {}", ecid);
       lastUpdate = 1;
       progress = percent;
       status = "na";
+      duration = 0;
       return;
     }
+    long durationMinutes = NANOSECONDS.toMinutes(duration);
+    if (durationMinutes > 15) {
+      log.warn("Compaction {} has been running for {} minutes", ecid, 
durationMinutes);
+    }
 
-    long sinceLastUpdateSeconds = MILLISECONDS.toSeconds(nowMillis - 
updateMillis);
+    lastUpdate = nowMillis - updateMillis;
+    long sinceLastUpdateSeconds = MILLISECONDS.toSeconds(lastUpdate);
     log.debug("Time since Last update {} - {} = {} seconds", nowMillis, 
updateMillis,
         sinceLastUpdateSeconds);
 
@@ -100,7 +98,6 @@ public class RunningCompactionInfo {
     if (total > 0) {
       percent = (last.getEntriesRead() / (float) total) * 100;
     }
-    lastUpdate = nowMillis - updateMillis;
     progress = percent;
 
     if (updates.isEmpty()) {
diff --git 
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java
 
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java
index 8fcfd7e468..5833f7cd79 100644
--- 
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java
+++ 
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java
@@ -33,6 +33,7 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
   private static final org.apache.thrift.protocol.TField 
ENTRIES_TO_BE_COMPACTED_FIELD_DESC = new 
org.apache.thrift.protocol.TField("entriesToBeCompacted", 
org.apache.thrift.protocol.TType.I64, (short)3);
   private static final org.apache.thrift.protocol.TField 
ENTRIES_READ_FIELD_DESC = new org.apache.thrift.protocol.TField("entriesRead", 
org.apache.thrift.protocol.TType.I64, (short)4);
   private static final org.apache.thrift.protocol.TField 
ENTRIES_WRITTEN_FIELD_DESC = new 
org.apache.thrift.protocol.TField("entriesWritten", 
org.apache.thrift.protocol.TType.I64, (short)5);
+  private static final org.apache.thrift.protocol.TField 
COMPACTION_AGE_NANOS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("compactionAgeNanos", 
org.apache.thrift.protocol.TType.I64, (short)6);
 
   private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new TCompactionStatusUpdateStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new TCompactionStatusUpdateTupleSchemeFactory();
@@ -46,6 +47,7 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
   public long entriesToBeCompacted; // required
   public long entriesRead; // required
   public long entriesWritten; // required
+  public long compactionAgeNanos; // required
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -57,7 +59,8 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
     MESSAGE((short)2, "message"),
     ENTRIES_TO_BE_COMPACTED((short)3, "entriesToBeCompacted"),
     ENTRIES_READ((short)4, "entriesRead"),
-    ENTRIES_WRITTEN((short)5, "entriesWritten");
+    ENTRIES_WRITTEN((short)5, "entriesWritten"),
+    COMPACTION_AGE_NANOS((short)6, "compactionAgeNanos");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new 
java.util.HashMap<java.lang.String, _Fields>();
 
@@ -83,6 +86,8 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
           return ENTRIES_READ;
         case 5: // ENTRIES_WRITTEN
           return ENTRIES_WRITTEN;
+        case 6: // COMPACTION_AGE_NANOS
+          return COMPACTION_AGE_NANOS;
         default:
           return null;
       }
@@ -129,6 +134,7 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
   private static final int __ENTRIESTOBECOMPACTED_ISSET_ID = 0;
   private static final int __ENTRIESREAD_ISSET_ID = 1;
   private static final int __ENTRIESWRITTEN_ISSET_ID = 2;
+  private static final int __COMPACTIONAGENANOS_ISSET_ID = 3;
   private byte __isset_bitfield = 0;
   public static final java.util.Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -143,6 +149,8 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
         new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     tmpMap.put(_Fields.ENTRIES_WRITTEN, new 
org.apache.thrift.meta_data.FieldMetaData("entriesWritten", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
         new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.COMPACTION_AGE_NANOS, new 
org.apache.thrift.meta_data.FieldMetaData("compactionAgeNanos", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TCompactionStatusUpdate.class,
 metaDataMap);
   }
@@ -155,7 +163,8 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
     java.lang.String message,
     long entriesToBeCompacted,
     long entriesRead,
-    long entriesWritten)
+    long entriesWritten,
+    long compactionAgeNanos)
   {
     this();
     this.state = state;
@@ -166,6 +175,8 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
     setEntriesReadIsSet(true);
     this.entriesWritten = entriesWritten;
     setEntriesWrittenIsSet(true);
+    this.compactionAgeNanos = compactionAgeNanos;
+    setCompactionAgeNanosIsSet(true);
   }
 
   /**
@@ -182,6 +193,7 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
     this.entriesToBeCompacted = other.entriesToBeCompacted;
     this.entriesRead = other.entriesRead;
     this.entriesWritten = other.entriesWritten;
+    this.compactionAgeNanos = other.compactionAgeNanos;
   }
 
   @Override
@@ -199,6 +211,8 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
     this.entriesRead = 0;
     setEntriesWrittenIsSet(false);
     this.entriesWritten = 0;
+    setCompactionAgeNanosIsSet(false);
+    this.compactionAgeNanos = 0;
   }
 
   /**
@@ -328,6 +342,29 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
     __isset_bitfield = 
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, 
__ENTRIESWRITTEN_ISSET_ID, value);
   }
 
+  public long getCompactionAgeNanos() {
+    return this.compactionAgeNanos;
+  }
+
+  public TCompactionStatusUpdate setCompactionAgeNanos(long 
compactionAgeNanos) {
+    this.compactionAgeNanos = compactionAgeNanos;
+    setCompactionAgeNanosIsSet(true);
+    return this;
+  }
+
+  public void unsetCompactionAgeNanos() {
+    __isset_bitfield = 
org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, 
__COMPACTIONAGENANOS_ISSET_ID);
+  }
+
+  /** Returns true if field compactionAgeNanos is set (has been assigned a 
value) and false otherwise */
+  public boolean isSetCompactionAgeNanos() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, 
__COMPACTIONAGENANOS_ISSET_ID);
+  }
+
+  public void setCompactionAgeNanosIsSet(boolean value) {
+    __isset_bitfield = 
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, 
__COMPACTIONAGENANOS_ISSET_ID, value);
+  }
+
   @Override
   public void setFieldValue(_Fields field, 
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
@@ -371,6 +408,14 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
       }
       break;
 
+    case COMPACTION_AGE_NANOS:
+      if (value == null) {
+        unsetCompactionAgeNanos();
+      } else {
+        setCompactionAgeNanos((java.lang.Long)value);
+      }
+      break;
+
     }
   }
 
@@ -393,6 +438,9 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
     case ENTRIES_WRITTEN:
       return getEntriesWritten();
 
+    case COMPACTION_AGE_NANOS:
+      return getCompactionAgeNanos();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -415,6 +463,8 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
       return isSetEntriesRead();
     case ENTRIES_WRITTEN:
       return isSetEntriesWritten();
+    case COMPACTION_AGE_NANOS:
+      return isSetCompactionAgeNanos();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -477,6 +527,15 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
         return false;
     }
 
+    boolean this_present_compactionAgeNanos = true;
+    boolean that_present_compactionAgeNanos = true;
+    if (this_present_compactionAgeNanos || that_present_compactionAgeNanos) {
+      if (!(this_present_compactionAgeNanos && 
that_present_compactionAgeNanos))
+        return false;
+      if (this.compactionAgeNanos != that.compactionAgeNanos)
+        return false;
+    }
+
     return true;
   }
 
@@ -498,6 +557,8 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
 
     hashCode = hashCode * 8191 + 
org.apache.thrift.TBaseHelper.hashCode(entriesWritten);
 
+    hashCode = hashCode * 8191 + 
org.apache.thrift.TBaseHelper.hashCode(compactionAgeNanos);
+
     return hashCode;
   }
 
@@ -559,6 +620,16 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetCompactionAgeNanos(), 
other.isSetCompactionAgeNanos());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetCompactionAgeNanos()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.compactionAgeNanos, 
other.compactionAgeNanos);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -610,6 +681,10 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
     sb.append("entriesWritten:");
     sb.append(this.entriesWritten);
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("compactionAgeNanos:");
+    sb.append(this.compactionAgeNanos);
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -697,6 +772,14 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 6: // COMPACTION_AGE_NANOS
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.compactionAgeNanos = iprot.readI64();
+              struct.setCompactionAgeNanosIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -732,6 +815,9 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
       oprot.writeFieldBegin(ENTRIES_WRITTEN_FIELD_DESC);
       oprot.writeI64(struct.entriesWritten);
       oprot.writeFieldEnd();
+      oprot.writeFieldBegin(COMPACTION_AGE_NANOS_FIELD_DESC);
+      oprot.writeI64(struct.compactionAgeNanos);
+      oprot.writeFieldEnd();
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -766,7 +852,10 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
       if (struct.isSetEntriesWritten()) {
         optionals.set(4);
       }
-      oprot.writeBitSet(optionals, 5);
+      if (struct.isSetCompactionAgeNanos()) {
+        optionals.set(5);
+      }
+      oprot.writeBitSet(optionals, 6);
       if (struct.isSetState()) {
         oprot.writeI32(struct.state.getValue());
       }
@@ -782,12 +871,15 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
       if (struct.isSetEntriesWritten()) {
         oprot.writeI64(struct.entriesWritten);
       }
+      if (struct.isSetCompactionAgeNanos()) {
+        oprot.writeI64(struct.compactionAgeNanos);
+      }
     }
 
     @Override
     public void read(org.apache.thrift.protocol.TProtocol prot, 
TCompactionStatusUpdate struct) throws org.apache.thrift.TException {
       org.apache.thrift.protocol.TTupleProtocol iprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
-      java.util.BitSet incoming = iprot.readBitSet(5);
+      java.util.BitSet incoming = iprot.readBitSet(6);
       if (incoming.get(0)) {
         struct.state = 
org.apache.accumulo.core.compaction.thrift.TCompactionState.findByValue(iprot.readI32());
         struct.setStateIsSet(true);
@@ -808,6 +900,10 @@ public class TCompactionStatusUpdate implements 
org.apache.thrift.TBase<TCompact
         struct.entriesWritten = iprot.readI64();
         struct.setEntriesWrittenIsSet(true);
       }
+      if (incoming.get(5)) {
+        struct.compactionAgeNanos = iprot.readI64();
+        struct.setCompactionAgeNanosIsSet(true);
+      }
     }
   }
 
diff --git a/core/src/main/thrift/compaction-coordinator.thrift 
b/core/src/main/thrift/compaction-coordinator.thrift
index 7cb090be45..f8c2d764fb 100644
--- a/core/src/main/thrift/compaction-coordinator.thrift
+++ b/core/src/main/thrift/compaction-coordinator.thrift
@@ -46,6 +46,7 @@ struct TCompactionStatusUpdate {
   3:i64 entriesToBeCompacted
   4:i64 entriesRead
   5:i64 entriesWritten
+  6:i64 compactionAgeNanos
 }
 
 struct TExternalCompaction {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
index 500653964d..b505c38cb9 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
@@ -98,8 +98,8 @@ public class CompactionInfo {
     }
     List<String> files = 
compactor.getFilesToCompact().stream().map(StoredTabletFile::getPathStr)
         .collect(Collectors.toList());
-    return new ActiveCompaction(compactor.extent.toThrift(),
-        System.currentTimeMillis() - compactor.getStartTime(), files, 
compactor.getOutputFile(),
-        type, reason, localityGroup, entriesRead, entriesWritten, iiList, 
iterOptions);
+    return new ActiveCompaction(compactor.extent.toThrift(), 
compactor.getAge().toMillis(), files,
+        compactor.getOutputFile(), type, reason, localityGroup, entriesRead, 
entriesWritten, iiList,
+        iterOptions);
   }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 3825a51d88..2645962887 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -122,7 +122,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
   // things to report
   private String currentLocalityGroup = "";
-  private final long startTime;
+  private volatile long startTime = -1;
 
   private final AtomicLong currentEntriesRead = new AtomicLong(0);
   private final AtomicLong currentEntriesWritten = new AtomicLong(0);
@@ -248,8 +248,6 @@ public class FileCompactor implements 
Callable<CompactionStats> {
     this.env = env;
     this.iterators = iterators;
     this.cryptoService = cs;
-
-    startTime = System.currentTimeMillis();
   }
 
   public VolumeManager getVolumeManager() {
@@ -280,6 +278,8 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
     CompactionStats majCStats = new CompactionStats();
 
+    startTime = System.nanoTime();
+
     boolean remove = runningCompactions.add(this);
 
     String threadStartDate = dateFormatter.format(new Date());
@@ -570,8 +570,15 @@ public class FileCompactor implements 
Callable<CompactionStats> {
     return currentEntriesWritten.get();
   }
 
-  long getStartTime() {
-    return startTime;
+  /**
+   * @return the duration since {@link #call()} was called
+   */
+  Duration getAge() {
+    if (startTime == -1) {
+      // call() has not been called yet
+      return Duration.ZERO;
+    }
+    return Duration.ofNanos(System.nanoTime() - startTime);
   }
 
   Iterable<IteratorSetting> getIterators() {
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index b82358581a..10ce776891 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.UnknownHostException;
 import java.security.SecureRandom;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -134,6 +135,8 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
     void initialize() throws RetriesExceededException;
 
     AtomicReference<FileCompactor> getFileCompactor();
+
+    Duration getCompactionAge();
   }
 
   private static final SecureRandom random = new SecureRandom();
@@ -515,13 +518,15 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
 
     return new FileCompactorRunnable() {
 
-      private AtomicReference<FileCompactor> compactor = new 
AtomicReference<>();
+      private final AtomicReference<FileCompactor> compactor = new 
AtomicReference<>();
+      private volatile long startTimeNanos = -1;
 
       @Override
       public void initialize() throws RetriesExceededException {
         LOG.info("Starting up compaction runnable for job: {}", job);
-        TCompactionStatusUpdate update =
-            new TCompactionStatusUpdate(TCompactionState.STARTED, "Compaction 
started", -1, -1, -1);
+        this.startTimeNanos = System.nanoTime();
+        TCompactionStatusUpdate update = new 
TCompactionStatusUpdate(TCompactionState.STARTED,
+            "Compaction started", -1, -1, -1, getCompactionAge().toNanos());
         updateCompactionState(job, update);
         final var extent = KeyExtent.fromThrift(job.getExtent());
         final AccumuloConfiguration aConfig;
@@ -589,7 +594,7 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
           LOG.info("Compaction completed successfully {} ", 
job.getExternalCompactionId());
           // Update state when completed
           TCompactionStatusUpdate update2 = new 
TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
-              "Compaction completed successfully", -1, -1, -1);
+              "Compaction completed successfully", -1, -1, -1, 
this.getCompactionAge().toNanos());
           updateCompactionState(job, update2);
         } catch (FileCompactor.CompactionCanceledException cce) {
           LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
@@ -605,6 +610,15 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
         }
       }
 
+      @Override
+      public Duration getCompactionAge() {
+        if (startTimeNanos == -1) {
+          // compaction hasn't started yet
+          return Duration.ZERO;
+        }
+        return Duration.ofNanos(System.nanoTime() - startTimeNanos);
+      }
+
     };
 
   }
@@ -760,9 +774,9 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
                 watcher.run();
                 try {
                   LOG.debug("Updating coordinator with compaction progress: 
{}.", message);
-                  TCompactionStatusUpdate update =
-                      new 
TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message,
-                          inputEntries, entriesRead, entriesWritten);
+                  TCompactionStatusUpdate update = new TCompactionStatusUpdate(
+                      TCompactionState.IN_PROGRESS, message, inputEntries, 
entriesRead,
+                      entriesWritten, fcr.getCompactionAge().toNanos());
                   updateCompactionState(job, update);
                 } catch (RetriesExceededException e) {
                   LOG.warn("Error updating coordinator with compaction 
progress, error: {}",
@@ -789,8 +803,9 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
               || (err.get() != null && 
err.get().getClass().equals(InterruptedException.class))) {
             LOG.warn("Compaction thread was interrupted, sending CANCELLED 
state");
             try {
-              TCompactionStatusUpdate update = new TCompactionStatusUpdate(
-                  TCompactionState.CANCELLED, "Compaction cancelled", -1, -1, 
-1);
+              TCompactionStatusUpdate update =
+                  new TCompactionStatusUpdate(TCompactionState.CANCELLED, 
"Compaction cancelled",
+                      -1, -1, -1, fcr.getCompactionAge().toNanos());
               updateCompactionState(job, update);
               updateCompactionFailed(job);
             } catch (RetriesExceededException e) {
@@ -804,7 +819,8 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
               LOG.info("Updating coordinator with compaction failure: id: {}, 
extent: {}",
                   job.getExternalCompactionId(), fromThriftExtent);
               TCompactionStatusUpdate update = new 
TCompactionStatusUpdate(TCompactionState.FAILED,
-                  "Compaction failed due to: " + err.get().getMessage(), -1, 
-1, -1);
+                  "Compaction failed due to: " + err.get().getMessage(), -1, 
-1, -1,
+                  fcr.getCompactionAge().toNanos());
               updateCompactionState(job, update);
               updateCompactionFailed(job);
             } catch (RetriesExceededException e) {
diff --git 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 8a8da9ae89..1ca2f147eb 100644
--- 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.net.UnknownHostException;
+import java.time.Duration;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -107,6 +108,11 @@ public class CompactorTest {
       return new AtomicReference<>(compactor);
     }
 
+    @Override
+    public Duration getCompactionAge() {
+      return Duration.ZERO;
+    }
+
     @Override
     public void run() {
       try {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index f3f8864bb3..89f887251b 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.test.compaction;
 
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.accumulo.core.util.UtilWaitThread.sleep;
 import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
@@ -29,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -46,12 +49,15 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -119,6 +125,93 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
     cfg.setSystemProperties(sysProps);
   }
 
+  @Test
+  public void testCompactionDurationContinuesAfterCoordinatorStop() throws 
Exception {
+    String table = this.getUniqueNames(1)[0];
+
+    try (AccumuloClient client =
+        Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
+      createTable(client, table, "cs1");
+      writeData(client, table, ROWS);
+
+      cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
+      
cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
+
+      IteratorSetting setting = new IteratorSetting(50, "Slow", 
SlowIterator.class);
+      SlowIterator.setSleepTime(setting, 5);
+      client.tableOperations().attachIterator(table, setting,
+          EnumSet.of(IteratorUtil.IteratorScope.majc));
+
+      log.info("Compacting table");
+      compact(client, table, 2, QUEUE1, false);
+
+      // Wait until the compaction starts
+      Wait.waitFor(() -> {
+        Map<String,TExternalCompaction> compactions =
+            
getRunningCompactions(getCluster().getServerContext()).getCompactions();
+        return compactions == null || compactions.isEmpty();
+      }, 30_000, 100, "Compaction did not start within the expected time");
+
+      // start a timer after the compaction starts
+      long compactionStartTime = System.nanoTime();
+
+      // let the compaction advance a bit
+      sleepUninterruptibly(6, TimeUnit.SECONDS);
+
+      // Stop the coordinator
+      log.info("Stopping the coordinator");
+      
cluster.getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+
+      sleepUninterruptibly(5, TimeUnit.SECONDS);
+
+      log.info("Restarting the coordinator");
+      
cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
+      long coordinatorRestartTime = System.nanoTime();
+
+      // Wait for compactions to be present
+      Map<String,TExternalCompaction> metrics = null;
+      while (metrics == null) {
+        try {
+          metrics = 
getRunningCompactions(getCluster().getServerContext()).getCompactions();
+        } catch (TException e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+
+      // let the compaction advance a bit
+      sleepUninterruptibly(6, TimeUnit.SECONDS);
+
+      TExternalCompaction updatedCompaction = 
getRunningCompactions(getCluster().getServerContext())
+          .getCompactions().values().iterator().next();
+      RunningCompactionInfo updatedCompactionInfo = new 
RunningCompactionInfo(updatedCompaction);
+
+      final Duration reportedCompactionDuration = 
Duration.ofNanos(updatedCompactionInfo.duration);
+      final Duration measuredCompactionDuration =
+          Duration.ofNanos(System.nanoTime() - compactionStartTime);
+      final Duration coordinatorAge = Duration.ofNanos(System.nanoTime() - 
coordinatorRestartTime);
+      log.info(
+          "Coordinator age: {}s. Measured compaction duration: {}s. Reported 
compaction duration: {}s",
+          coordinatorAge.toSeconds(), measuredCompactionDuration.toSeconds(),
+          reportedCompactionDuration.toSeconds());
+
+      assertTrue(coordinatorAge.compareTo(reportedCompactionDuration) < 0,
+          "Reported compaction age should be greater than the coordinator 
age");
+
+      // Verify that the reported duration is approximately equal to the 
elapsed time
+      Duration tolerance = Duration.ofSeconds(7);
+      long reportedVsMeasuredDiff =
+          
Math.abs(reportedCompactionDuration.minus(measuredCompactionDuration).toNanos());
+      assertTrue(reportedVsMeasuredDiff <= tolerance.toNanos(),
+          String.format(
+              "Reported duration (%s) and elapsed time (%s) differ by more 
than the tolerance (%s)",
+              reportedCompactionDuration.toSeconds(), 
measuredCompactionDuration.toSeconds(),
+              tolerance.toSeconds()));
+    } finally {
+      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+      
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+    }
+  }
+
   @Test
   public void testProgressViaMetrics() throws Exception {
     String table = this.getUniqueNames(1)[0];
@@ -348,13 +441,15 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
    * Check running compaction progress.
    */
   private void checkRunning() throws TException {
-    var ecList = getRunningCompactions(getCluster().getServerContext());
-    var ecMap = ecList.getCompactions();
+    TExternalCompactionList ecList = 
getRunningCompactions(getCluster().getServerContext());
+    Map<String,TExternalCompaction> ecMap = ecList.getCompactions();
     if (ecMap != null) {
       ecMap.forEach((ecid, ec) -> {
         // returns null if it's a new mapping
         RunningCompactionInfo rci = new RunningCompactionInfo(ec);
         RunningCompactionInfo previousRci = runningMap.put(ecid, rci);
+        log.debug("ECID {} has been running for {} seconds", ecid,
+            NANOSECONDS.toSeconds(rci.duration));
         if (previousRci == null) {
           log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles);
         } else {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index a97d8a37b4..08fc5fbb8d 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.test.compaction;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;
@@ -70,6 +71,11 @@ public class ExternalDoNothingCompactor extends Compactor 
implements Iface {
         return ref;
       }
 
+      @Override
+      public Duration getCompactionAge() {
+        return Duration.ZERO;
+      }
+
       @Override
       public void run() {
         try {

Reply via email to