This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new f9551d0e6d Ensure compaction duration is not reset on coordinator restart (#4667) f9551d0e6d is described below commit f9551d0e6dd44ba6566cd3281096c70ec8424550 Author: Dom G <domgargu...@apache.org> AuthorDate: Fri Jun 14 14:39:03 2024 -0400 Ensure compaction duration is not reset on coordinator restart (#4667) * Ensure compaction duration is not reset on coordinator restart by adding time tracking to FileCompactorRunnable * clean up old age calculation logic from monitor code and use age value directly * Improve accuracy of compactor time measurement in FileCompactor --- .../util/compaction/RunningCompactionInfo.java | 21 ++--- .../compaction/thrift/TCompactionStatusUpdate.java | 104 ++++++++++++++++++++- core/src/main/thrift/compaction-coordinator.thrift | 1 + .../accumulo/server/compaction/CompactionInfo.java | 6 +- .../accumulo/server/compaction/FileCompactor.java | 17 +++- .../org/apache/accumulo/compactor/Compactor.java | 36 +++++-- .../apache/accumulo/compactor/CompactorTest.java | 6 ++ .../compaction/ExternalCompactionProgressIT.java | 99 +++++++++++++++++++- .../compaction/ExternalDoNothingCompactor.java | 6 ++ 9 files changed, 260 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java index 53e838678c..baf8d1ddae 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java @@ -20,6 +20,7 @@ package org.apache.accumulo.core.util.compaction; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import java.util.TreeMap; @@ -62,37 +63,34 @@ public class RunningCompactionInfo { // parse the updates map long nowMillis = System.currentTimeMillis(); - long startedMillis = nowMillis; float percent = 0f; long updateMillis; TCompactionStatusUpdate last; // sort updates by key, which is a timestamp TreeMap<Long,TCompactionStatusUpdate> sorted = new TreeMap<>(updates); - var firstEntry = sorted.firstEntry(); var lastEntry = sorted.lastEntry(); - if (firstEntry != null) { - startedMillis = firstEntry.getKey(); - } - duration = nowMillis - startedMillis; - long durationMinutes = MILLISECONDS.toMinutes(duration); - if (durationMinutes > 15) { - log.warn("Compaction {} has been running for {} minutes", ecid, durationMinutes); - } // last entry is all we care about so bail if null if (lastEntry != null) { last = lastEntry.getValue(); updateMillis = lastEntry.getKey(); + duration = last.getCompactionAgeNanos(); } else { log.debug("No updates found for {}", ecid); lastUpdate = 1; progress = percent; status = "na"; + duration = 0; return; } + long durationMinutes = NANOSECONDS.toMinutes(duration); + if (durationMinutes > 15) { + log.warn("Compaction {} has been running for {} minutes", ecid, durationMinutes); + } - long sinceLastUpdateSeconds = MILLISECONDS.toSeconds(nowMillis - updateMillis); + lastUpdate = nowMillis - updateMillis; + long sinceLastUpdateSeconds = MILLISECONDS.toSeconds(lastUpdate); log.debug("Time since Last update {} - {} = {} seconds", nowMillis, updateMillis, sinceLastUpdateSeconds); @@ -100,7 +98,6 @@ public class RunningCompactionInfo { if (total > 0) { percent = (last.getEntriesRead() / (float) total) * 100; } - lastUpdate = nowMillis - updateMillis; progress = percent; if (updates.isEmpty()) { diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java index 8fcfd7e468..5833f7cd79 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java @@ -33,6 +33,7 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact private static final org.apache.thrift.protocol.TField ENTRIES_TO_BE_COMPACTED_FIELD_DESC = new org.apache.thrift.protocol.TField("entriesToBeCompacted", org.apache.thrift.protocol.TType.I64, (short)3); private static final org.apache.thrift.protocol.TField ENTRIES_READ_FIELD_DESC = new org.apache.thrift.protocol.TField("entriesRead", org.apache.thrift.protocol.TType.I64, (short)4); private static final org.apache.thrift.protocol.TField ENTRIES_WRITTEN_FIELD_DESC = new org.apache.thrift.protocol.TField("entriesWritten", org.apache.thrift.protocol.TType.I64, (short)5); + private static final org.apache.thrift.protocol.TField COMPACTION_AGE_NANOS_FIELD_DESC = new org.apache.thrift.protocol.TField("compactionAgeNanos", org.apache.thrift.protocol.TType.I64, (short)6); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new TCompactionStatusUpdateStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new TCompactionStatusUpdateTupleSchemeFactory(); @@ -46,6 +47,7 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact public long entriesToBeCompacted; // required public long entriesRead; // required public long entriesWritten; // required + public long compactionAgeNanos; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -57,7 +59,8 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact MESSAGE((short)2, "message"), ENTRIES_TO_BE_COMPACTED((short)3, "entriesToBeCompacted"), ENTRIES_READ((short)4, "entriesRead"), - ENTRIES_WRITTEN((short)5, "entriesWritten"); + ENTRIES_WRITTEN((short)5, "entriesWritten"), + COMPACTION_AGE_NANOS((short)6, "compactionAgeNanos"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -83,6 +86,8 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact return ENTRIES_READ; case 5: // ENTRIES_WRITTEN return ENTRIES_WRITTEN; + case 6: // COMPACTION_AGE_NANOS + return COMPACTION_AGE_NANOS; default: return null; } @@ -129,6 +134,7 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact private static final int __ENTRIESTOBECOMPACTED_ISSET_ID = 0; private static final int __ENTRIESREAD_ISSET_ID = 1; private static final int __ENTRIESWRITTEN_ISSET_ID = 2; + private static final int __COMPACTIONAGENANOS_ISSET_ID = 3; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -143,6 +149,8 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); tmpMap.put(_Fields.ENTRIES_WRITTEN, new org.apache.thrift.meta_data.FieldMetaData("entriesWritten", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.COMPACTION_AGE_NANOS, new org.apache.thrift.meta_data.FieldMetaData("compactionAgeNanos", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TCompactionStatusUpdate.class, metaDataMap); } @@ -155,7 +163,8 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact java.lang.String message, long entriesToBeCompacted, long entriesRead, - long entriesWritten) + long entriesWritten, + long compactionAgeNanos) { this(); this.state = state; @@ -166,6 +175,8 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact setEntriesReadIsSet(true); this.entriesWritten = entriesWritten; setEntriesWrittenIsSet(true); + this.compactionAgeNanos = compactionAgeNanos; + setCompactionAgeNanosIsSet(true); } /** @@ -182,6 +193,7 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact this.entriesToBeCompacted = other.entriesToBeCompacted; this.entriesRead = other.entriesRead; this.entriesWritten = other.entriesWritten; + this.compactionAgeNanos = other.compactionAgeNanos; } @Override @@ -199,6 +211,8 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact this.entriesRead = 0; setEntriesWrittenIsSet(false); this.entriesWritten = 0; + setCompactionAgeNanosIsSet(false); + this.compactionAgeNanos = 0; } /** @@ -328,6 +342,29 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ENTRIESWRITTEN_ISSET_ID, value); } + public long getCompactionAgeNanos() { + return this.compactionAgeNanos; + } + + public TCompactionStatusUpdate setCompactionAgeNanos(long compactionAgeNanos) { + this.compactionAgeNanos = compactionAgeNanos; + setCompactionAgeNanosIsSet(true); + return this; + } + + public void unsetCompactionAgeNanos() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __COMPACTIONAGENANOS_ISSET_ID); + } + + /** Returns true if field compactionAgeNanos is set (has been assigned a value) and false otherwise */ + public boolean isSetCompactionAgeNanos() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __COMPACTIONAGENANOS_ISSET_ID); + } + + public void setCompactionAgeNanosIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __COMPACTIONAGENANOS_ISSET_ID, value); + } + @Override public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { @@ -371,6 +408,14 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact } break; + case COMPACTION_AGE_NANOS: + if (value == null) { + unsetCompactionAgeNanos(); + } else { + setCompactionAgeNanos((java.lang.Long)value); + } + break; + } } @@ -393,6 +438,9 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact case ENTRIES_WRITTEN: return getEntriesWritten(); + case COMPACTION_AGE_NANOS: + return getCompactionAgeNanos(); + } throw new java.lang.IllegalStateException(); } @@ -415,6 +463,8 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact return isSetEntriesRead(); case ENTRIES_WRITTEN: return isSetEntriesWritten(); + case COMPACTION_AGE_NANOS: + return isSetCompactionAgeNanos(); } throw new java.lang.IllegalStateException(); } @@ -477,6 +527,15 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact return false; } + boolean this_present_compactionAgeNanos = true; + boolean that_present_compactionAgeNanos = true; + if (this_present_compactionAgeNanos || that_present_compactionAgeNanos) { + if (!(this_present_compactionAgeNanos && that_present_compactionAgeNanos)) + return false; + if (this.compactionAgeNanos != that.compactionAgeNanos) + return false; + } + return true; } @@ -498,6 +557,8 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(entriesWritten); + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(compactionAgeNanos); + return hashCode; } @@ -559,6 +620,16 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetCompactionAgeNanos(), other.isSetCompactionAgeNanos()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCompactionAgeNanos()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.compactionAgeNanos, other.compactionAgeNanos); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -610,6 +681,10 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact sb.append("entriesWritten:"); sb.append(this.entriesWritten); first = false; + if (!first) sb.append(", "); + sb.append("compactionAgeNanos:"); + sb.append(this.compactionAgeNanos); + first = false; sb.append(")"); return sb.toString(); } @@ -697,6 +772,14 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 6: // COMPACTION_AGE_NANOS + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.compactionAgeNanos = iprot.readI64(); + struct.setCompactionAgeNanosIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -732,6 +815,9 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact oprot.writeFieldBegin(ENTRIES_WRITTEN_FIELD_DESC); oprot.writeI64(struct.entriesWritten); oprot.writeFieldEnd(); + oprot.writeFieldBegin(COMPACTION_AGE_NANOS_FIELD_DESC); + oprot.writeI64(struct.compactionAgeNanos); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -766,7 +852,10 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact if (struct.isSetEntriesWritten()) { optionals.set(4); } - oprot.writeBitSet(optionals, 5); + if (struct.isSetCompactionAgeNanos()) { + optionals.set(5); + } + oprot.writeBitSet(optionals, 6); if (struct.isSetState()) { oprot.writeI32(struct.state.getValue()); } @@ -782,12 +871,15 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact if (struct.isSetEntriesWritten()) { oprot.writeI64(struct.entriesWritten); } + if (struct.isSetCompactionAgeNanos()) { + oprot.writeI64(struct.compactionAgeNanos); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, TCompactionStatusUpdate struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(5); + java.util.BitSet incoming = iprot.readBitSet(6); if (incoming.get(0)) { struct.state = org.apache.accumulo.core.compaction.thrift.TCompactionState.findByValue(iprot.readI32()); struct.setStateIsSet(true); @@ -808,6 +900,10 @@ public class TCompactionStatusUpdate implements org.apache.thrift.TBase<TCompact struct.entriesWritten = iprot.readI64(); struct.setEntriesWrittenIsSet(true); } + if (incoming.get(5)) { + struct.compactionAgeNanos = iprot.readI64(); + struct.setCompactionAgeNanosIsSet(true); + } } } diff --git a/core/src/main/thrift/compaction-coordinator.thrift b/core/src/main/thrift/compaction-coordinator.thrift index 7cb090be45..f8c2d764fb 100644 --- a/core/src/main/thrift/compaction-coordinator.thrift +++ b/core/src/main/thrift/compaction-coordinator.thrift @@ -46,6 +46,7 @@ struct TCompactionStatusUpdate { 3:i64 entriesToBeCompacted 4:i64 entriesRead 5:i64 entriesWritten + 6:i64 compactionAgeNanos } struct TExternalCompaction { diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java index 500653964d..b505c38cb9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java @@ -98,8 +98,8 @@ public class CompactionInfo { } List<String> files = compactor.getFilesToCompact().stream().map(StoredTabletFile::getPathStr) .collect(Collectors.toList()); - return new ActiveCompaction(compactor.extent.toThrift(), - System.currentTimeMillis() - compactor.getStartTime(), files, compactor.getOutputFile(), - type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions); + return new ActiveCompaction(compactor.extent.toThrift(), compactor.getAge().toMillis(), files, + compactor.getOutputFile(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, + iterOptions); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 3825a51d88..2645962887 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -122,7 +122,7 @@ public class FileCompactor implements Callable<CompactionStats> { // things to report private String currentLocalityGroup = ""; - private final long startTime; + private volatile long startTime = -1; private final AtomicLong currentEntriesRead = new AtomicLong(0); private final AtomicLong currentEntriesWritten = new AtomicLong(0); @@ -248,8 +248,6 @@ public class FileCompactor implements Callable<CompactionStats> { this.env = env; this.iterators = iterators; this.cryptoService = cs; - - startTime = System.currentTimeMillis(); } public VolumeManager getVolumeManager() { @@ -280,6 +278,8 @@ public class FileCompactor implements Callable<CompactionStats> { CompactionStats majCStats = new CompactionStats(); + startTime = System.nanoTime(); + boolean remove = runningCompactions.add(this); String threadStartDate = dateFormatter.format(new Date()); @@ -570,8 +570,15 @@ public class FileCompactor implements Callable<CompactionStats> { return currentEntriesWritten.get(); } - long getStartTime() { - return startTime; + /** + * @return the duration since {@link #call()} was called + */ + Duration getAge() { + if (startTime == -1) { + // call() has not been called yet + return Duration.ZERO; + } + return Duration.ofNanos(System.nanoTime() - startTime); } Iterable<IteratorSetting> getIterators() { diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index b82358581a..10ce776891 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.UnknownHostException; import java.security.SecureRandom; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -134,6 +135,8 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac void initialize() throws RetriesExceededException; AtomicReference<FileCompactor> getFileCompactor(); + + Duration getCompactionAge(); } private static final SecureRandom random = new SecureRandom(); @@ -515,13 +518,15 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac return new FileCompactorRunnable() { - private AtomicReference<FileCompactor> compactor = new AtomicReference<>(); + private final AtomicReference<FileCompactor> compactor = new AtomicReference<>(); + private volatile long startTimeNanos = -1; @Override public void initialize() throws RetriesExceededException { LOG.info("Starting up compaction runnable for job: {}", job); - TCompactionStatusUpdate update = - new TCompactionStatusUpdate(TCompactionState.STARTED, "Compaction started", -1, -1, -1); + this.startTimeNanos = System.nanoTime(); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.STARTED, + "Compaction started", -1, -1, -1, getCompactionAge().toNanos()); updateCompactionState(job, update); final var extent = KeyExtent.fromThrift(job.getExtent()); final AccumuloConfiguration aConfig; @@ -589,7 +594,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); // Update state when completed TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, - "Compaction completed successfully", -1, -1, -1); + "Compaction completed successfully", -1, -1, -1, this.getCompactionAge().toNanos()); updateCompactionState(job, update2); } catch (FileCompactor.CompactionCanceledException cce) { LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); @@ -605,6 +610,15 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac } } + @Override + public Duration getCompactionAge() { + if (startTimeNanos == -1) { + // compaction hasn't started yet + return Duration.ZERO; + } + return Duration.ofNanos(System.nanoTime() - startTimeNanos); + } + }; } @@ -760,9 +774,9 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac watcher.run(); try { LOG.debug("Updating coordinator with compaction progress: {}.", message); - TCompactionStatusUpdate update = - new TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message, - inputEntries, entriesRead, entriesWritten); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, + entriesWritten, fcr.getCompactionAge().toNanos()); updateCompactionState(job, update); } catch (RetriesExceededException e) { LOG.warn("Error updating coordinator with compaction progress, error: {}", @@ -789,8 +803,9 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); try { - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.CANCELLED, "Compaction cancelled", -1, -1, -1); + TCompactionStatusUpdate update = + new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", + -1, -1, -1, fcr.getCompactionAge().toNanos()); updateCompactionState(job, update); updateCompactionFailed(job); } catch (RetriesExceededException e) { @@ -804,7 +819,8 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", job.getExternalCompactionId(), fromThriftExtent); TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED, - "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1); + "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, + fcr.getCompactionAge().toNanos()); updateCompactionState(job, update); updateCompactionFailed(job); } catch (RetriesExceededException e) { diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 8a8da9ae89..1ca2f147eb 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.net.UnknownHostException; +import java.time.Duration; import java.util.List; import java.util.Timer; import java.util.TimerTask; @@ -107,6 +108,11 @@ public class CompactorTest { return new AtomicReference<>(compactor); } + @Override + public Duration getCompactionAge() { + return Duration.ZERO; + } + @Override public void run() { try { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index f3f8864bb3..89f887251b 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.test.compaction; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.accumulo.core.util.UtilWaitThread.sleep; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; @@ -29,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.Duration; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -46,12 +49,15 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.harness.AccumuloClusterHarness; @@ -119,6 +125,93 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { cfg.setSystemProperties(sysProps); } + @Test + public void testCompactionDurationContinuesAfterCoordinatorStop() throws Exception { + String table = this.getUniqueNames(1)[0]; + + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + createTable(client, table, "cs1"); + writeData(client, table, ROWS); + + cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); + cluster.getClusterControl().startCoordinator(CompactionCoordinator.class); + + IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class); + SlowIterator.setSleepTime(setting, 5); + client.tableOperations().attachIterator(table, setting, + EnumSet.of(IteratorUtil.IteratorScope.majc)); + + log.info("Compacting table"); + compact(client, table, 2, QUEUE1, false); + + // Wait until the compaction starts + Wait.waitFor(() -> { + Map<String,TExternalCompaction> compactions = + getRunningCompactions(getCluster().getServerContext()).getCompactions(); + return compactions == null || compactions.isEmpty(); + }, 30_000, 100, "Compaction did not start within the expected time"); + + // start a timer after the compaction starts + long compactionStartTime = System.nanoTime(); + + // let the compaction advance a bit + sleepUninterruptibly(6, TimeUnit.SECONDS); + + // Stop the coordinator + log.info("Stopping the coordinator"); + cluster.getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR); + + sleepUninterruptibly(5, TimeUnit.SECONDS); + + log.info("Restarting the coordinator"); + cluster.getClusterControl().startCoordinator(CompactionCoordinator.class); + long coordinatorRestartTime = System.nanoTime(); + + // Wait for compactions to be present + Map<String,TExternalCompaction> metrics = null; + while (metrics == null) { + try { + metrics = getRunningCompactions(getCluster().getServerContext()).getCompactions(); + } catch (TException e) { + UtilWaitThread.sleep(250); + } + } + + // let the compaction advance a bit + sleepUninterruptibly(6, TimeUnit.SECONDS); + + TExternalCompaction updatedCompaction = getRunningCompactions(getCluster().getServerContext()) + .getCompactions().values().iterator().next(); + RunningCompactionInfo updatedCompactionInfo = new RunningCompactionInfo(updatedCompaction); + + final Duration reportedCompactionDuration = Duration.ofNanos(updatedCompactionInfo.duration); + final Duration measuredCompactionDuration = + Duration.ofNanos(System.nanoTime() - compactionStartTime); + final Duration coordinatorAge = Duration.ofNanos(System.nanoTime() - coordinatorRestartTime); + log.info( + "Coordinator age: {}s. Measured compaction duration: {}s. Reported compaction duration: {}s", + coordinatorAge.toSeconds(), measuredCompactionDuration.toSeconds(), + reportedCompactionDuration.toSeconds()); + + assertTrue(coordinatorAge.compareTo(reportedCompactionDuration) < 0, + "Reported compaction age should be greater than the coordinator age"); + + // Verify that the reported duration is approximately equal to the elapsed time + Duration tolerance = Duration.ofSeconds(7); + long reportedVsMeasuredDiff = + Math.abs(reportedCompactionDuration.minus(measuredCompactionDuration).toNanos()); + assertTrue(reportedVsMeasuredDiff <= tolerance.toNanos(), + String.format( + "Reported duration (%s) and elapsed time (%s) differ by more than the tolerance (%s)", + reportedCompactionDuration.toSeconds(), measuredCompactionDuration.toSeconds(), + tolerance.toSeconds())); + } finally { + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR); + } + } + @Test public void testProgressViaMetrics() throws Exception { String table = this.getUniqueNames(1)[0]; @@ -348,13 +441,15 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { * Check running compaction progress. */ private void checkRunning() throws TException { - var ecList = getRunningCompactions(getCluster().getServerContext()); - var ecMap = ecList.getCompactions(); + TExternalCompactionList ecList = getRunningCompactions(getCluster().getServerContext()); + Map<String,TExternalCompaction> ecMap = ecList.getCompactions(); if (ecMap != null) { ecMap.forEach((ecid, ec) -> { // returns null if it's a new mapping RunningCompactionInfo rci = new RunningCompactionInfo(ec); RunningCompactionInfo previousRci = runningMap.put(ecid, rci); + log.debug("ECID {} has been running for {} seconds", ecid, + NANOSECONDS.toSeconds(rci.duration)); if (previousRci == null) { log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles); } else { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index a97d8a37b4..08fc5fbb8d 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@ -20,6 +20,7 @@ package org.apache.accumulo.test.compaction; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; @@ -70,6 +71,11 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface { return ref; } + @Override + public Duration getCompactionAge() { + return Duration.ZERO; + } + @Override public void run() { try {