This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new f9551d0e6d Ensure compaction duration is not reset on coordinator
restart (#4667)
f9551d0e6d is described below
commit f9551d0e6dd44ba6566cd3281096c70ec8424550
Author: Dom G <[email protected]>
AuthorDate: Fri Jun 14 14:39:03 2024 -0400
Ensure compaction duration is not reset on coordinator restart (#4667)
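* Ensure the compaction duration is not reset when the coordinator restarts
by tracking elapsed time in FileCompactorRunnable and sending it to the
coordinator with every status update (new compactionAgeNanos field)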
* Remove the old age calculation, which derived the start time from the
earliest status-update timestamp, and use the reported age value directly
* Improve the accuracy of compaction time measurement in FileCompactor by
using System.nanoTime() and starting the clock in call() rather than in the
constructor
---
.../util/compaction/RunningCompactionInfo.java | 21 ++---
.../compaction/thrift/TCompactionStatusUpdate.java | 104 ++++++++++++++++++++-
core/src/main/thrift/compaction-coordinator.thrift | 1 +
.../accumulo/server/compaction/CompactionInfo.java | 6 +-
.../accumulo/server/compaction/FileCompactor.java | 17 +++-
.../org/apache/accumulo/compactor/Compactor.java | 36 +++++--
.../apache/accumulo/compactor/CompactorTest.java | 6 ++
.../compaction/ExternalCompactionProgressIT.java | 99 +++++++++++++++++++-
.../compaction/ExternalDoNothingCompactor.java | 6 ++
9 files changed, 260 insertions(+), 36 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
index 53e838678c..baf8d1ddae 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.util.compaction;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.TreeMap;
@@ -62,37 +63,34 @@ public class RunningCompactionInfo {
// parse the updates map
long nowMillis = System.currentTimeMillis();
- long startedMillis = nowMillis;
float percent = 0f;
long updateMillis;
TCompactionStatusUpdate last;
// sort updates by key, which is a timestamp
TreeMap<Long,TCompactionStatusUpdate> sorted = new TreeMap<>(updates);
- var firstEntry = sorted.firstEntry();
var lastEntry = sorted.lastEntry();
- if (firstEntry != null) {
- startedMillis = firstEntry.getKey();
- }
- duration = nowMillis - startedMillis;
- long durationMinutes = MILLISECONDS.toMinutes(duration);
- if (durationMinutes > 15) {
- log.warn("Compaction {} has been running for {} minutes", ecid,
durationMinutes);
- }
// last entry is all we care about so bail if null
if (lastEntry != null) {
last = lastEntry.getValue();
updateMillis = lastEntry.getKey();
+ duration = last.getCompactionAgeNanos();
} else {
log.debug("No updates found for {}", ecid);
lastUpdate = 1;
progress = percent;
status = "na";
+ duration = 0;
return;
}
+ long durationMinutes = NANOSECONDS.toMinutes(duration);
+ if (durationMinutes > 15) {
+ log.warn("Compaction {} has been running for {} minutes", ecid,
durationMinutes);
+ }
- long sinceLastUpdateSeconds = MILLISECONDS.toSeconds(nowMillis -
updateMillis);
+ lastUpdate = nowMillis - updateMillis;
+ long sinceLastUpdateSeconds = MILLISECONDS.toSeconds(lastUpdate);
log.debug("Time since Last update {} - {} = {} seconds", nowMillis,
updateMillis,
sinceLastUpdateSeconds);
@@ -100,7 +98,6 @@ public class RunningCompactionInfo {
if (total > 0) {
percent = (last.getEntriesRead() / (float) total) * 100;
}
- lastUpdate = nowMillis - updateMillis;
progress = percent;
if (updates.isEmpty()) {
diff --git
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java
index 8fcfd7e468..5833f7cd79 100644
---
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java
+++
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/TCompactionStatusUpdate.java
@@ -33,6 +33,7 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
private static final org.apache.thrift.protocol.TField
ENTRIES_TO_BE_COMPACTED_FIELD_DESC = new
org.apache.thrift.protocol.TField("entriesToBeCompacted",
org.apache.thrift.protocol.TType.I64, (short)3);
private static final org.apache.thrift.protocol.TField
ENTRIES_READ_FIELD_DESC = new org.apache.thrift.protocol.TField("entriesRead",
org.apache.thrift.protocol.TType.I64, (short)4);
private static final org.apache.thrift.protocol.TField
ENTRIES_WRITTEN_FIELD_DESC = new
org.apache.thrift.protocol.TField("entriesWritten",
org.apache.thrift.protocol.TType.I64, (short)5);
+ private static final org.apache.thrift.protocol.TField
COMPACTION_AGE_NANOS_FIELD_DESC = new
org.apache.thrift.protocol.TField("compactionAgeNanos",
org.apache.thrift.protocol.TType.I64, (short)6);
private static final org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new TCompactionStatusUpdateStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new TCompactionStatusUpdateTupleSchemeFactory();
@@ -46,6 +47,7 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
public long entriesToBeCompacted; // required
public long entriesRead; // required
public long entriesWritten; // required
+ public long compactionAgeNanos; // required
/** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -57,7 +59,8 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
MESSAGE((short)2, "message"),
ENTRIES_TO_BE_COMPACTED((short)3, "entriesToBeCompacted"),
ENTRIES_READ((short)4, "entriesRead"),
- ENTRIES_WRITTEN((short)5, "entriesWritten");
+ ENTRIES_WRITTEN((short)5, "entriesWritten"),
+ COMPACTION_AGE_NANOS((short)6, "compactionAgeNanos");
private static final java.util.Map<java.lang.String, _Fields> byName = new
java.util.HashMap<java.lang.String, _Fields>();
@@ -83,6 +86,8 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
return ENTRIES_READ;
case 5: // ENTRIES_WRITTEN
return ENTRIES_WRITTEN;
+ case 6: // COMPACTION_AGE_NANOS
+ return COMPACTION_AGE_NANOS;
default:
return null;
}
@@ -129,6 +134,7 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
private static final int __ENTRIESTOBECOMPACTED_ISSET_ID = 0;
private static final int __ENTRIESREAD_ISSET_ID = 1;
private static final int __ENTRIESWRITTEN_ISSET_ID = 2;
+ private static final int __COMPACTIONAGENANOS_ISSET_ID = 3;
private byte __isset_bitfield = 0;
public static final java.util.Map<_Fields,
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -143,6 +149,8 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
tmpMap.put(_Fields.ENTRIES_WRITTEN, new
org.apache.thrift.meta_data.FieldMetaData("entriesWritten",
org.apache.thrift.TFieldRequirementType.DEFAULT,
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.COMPACTION_AGE_NANOS, new
org.apache.thrift.meta_data.FieldMetaData("compactionAgeNanos",
org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TCompactionStatusUpdate.class,
metaDataMap);
}
@@ -155,7 +163,8 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
java.lang.String message,
long entriesToBeCompacted,
long entriesRead,
- long entriesWritten)
+ long entriesWritten,
+ long compactionAgeNanos)
{
this();
this.state = state;
@@ -166,6 +175,8 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
setEntriesReadIsSet(true);
this.entriesWritten = entriesWritten;
setEntriesWrittenIsSet(true);
+ this.compactionAgeNanos = compactionAgeNanos;
+ setCompactionAgeNanosIsSet(true);
}
/**
@@ -182,6 +193,7 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
this.entriesToBeCompacted = other.entriesToBeCompacted;
this.entriesRead = other.entriesRead;
this.entriesWritten = other.entriesWritten;
+ this.compactionAgeNanos = other.compactionAgeNanos;
}
@Override
@@ -199,6 +211,8 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
this.entriesRead = 0;
setEntriesWrittenIsSet(false);
this.entriesWritten = 0;
+ setCompactionAgeNanosIsSet(false);
+ this.compactionAgeNanos = 0;
}
/**
@@ -328,6 +342,29 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
__isset_bitfield =
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield,
__ENTRIESWRITTEN_ISSET_ID, value);
}
+ public long getCompactionAgeNanos() {
+ return this.compactionAgeNanos;
+ }
+
+ public TCompactionStatusUpdate setCompactionAgeNanos(long
compactionAgeNanos) {
+ this.compactionAgeNanos = compactionAgeNanos;
+ setCompactionAgeNanosIsSet(true);
+ return this;
+ }
+
+ public void unsetCompactionAgeNanos() {
+ __isset_bitfield =
org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield,
__COMPACTIONAGENANOS_ISSET_ID);
+ }
+
+ /** Returns true if field compactionAgeNanos is set (has been assigned a
value) and false otherwise */
+ public boolean isSetCompactionAgeNanos() {
+ return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield,
__COMPACTIONAGENANOS_ISSET_ID);
+ }
+
+ public void setCompactionAgeNanosIsSet(boolean value) {
+ __isset_bitfield =
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield,
__COMPACTIONAGENANOS_ISSET_ID, value);
+ }
+
@Override
public void setFieldValue(_Fields field,
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
@@ -371,6 +408,14 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
}
break;
+ case COMPACTION_AGE_NANOS:
+ if (value == null) {
+ unsetCompactionAgeNanos();
+ } else {
+ setCompactionAgeNanos((java.lang.Long)value);
+ }
+ break;
+
}
}
@@ -393,6 +438,9 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
case ENTRIES_WRITTEN:
return getEntriesWritten();
+ case COMPACTION_AGE_NANOS:
+ return getCompactionAgeNanos();
+
}
throw new java.lang.IllegalStateException();
}
@@ -415,6 +463,8 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
return isSetEntriesRead();
case ENTRIES_WRITTEN:
return isSetEntriesWritten();
+ case COMPACTION_AGE_NANOS:
+ return isSetCompactionAgeNanos();
}
throw new java.lang.IllegalStateException();
}
@@ -477,6 +527,15 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
return false;
}
+ boolean this_present_compactionAgeNanos = true;
+ boolean that_present_compactionAgeNanos = true;
+ if (this_present_compactionAgeNanos || that_present_compactionAgeNanos) {
+ if (!(this_present_compactionAgeNanos &&
that_present_compactionAgeNanos))
+ return false;
+ if (this.compactionAgeNanos != that.compactionAgeNanos)
+ return false;
+ }
+
return true;
}
@@ -498,6 +557,8 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
hashCode = hashCode * 8191 +
org.apache.thrift.TBaseHelper.hashCode(entriesWritten);
+ hashCode = hashCode * 8191 +
org.apache.thrift.TBaseHelper.hashCode(compactionAgeNanos);
+
return hashCode;
}
@@ -559,6 +620,16 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetCompactionAgeNanos(),
other.isSetCompactionAgeNanos());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCompactionAgeNanos()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(this.compactionAgeNanos,
other.compactionAgeNanos);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -610,6 +681,10 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
sb.append("entriesWritten:");
sb.append(this.entriesWritten);
first = false;
+ if (!first) sb.append(", ");
+ sb.append("compactionAgeNanos:");
+ sb.append(this.compactionAgeNanos);
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -697,6 +772,14 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
break;
+ case 6: // COMPACTION_AGE_NANOS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.compactionAgeNanos = iprot.readI64();
+ struct.setCompactionAgeNanosIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
@@ -732,6 +815,9 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
oprot.writeFieldBegin(ENTRIES_WRITTEN_FIELD_DESC);
oprot.writeI64(struct.entriesWritten);
oprot.writeFieldEnd();
+ oprot.writeFieldBegin(COMPACTION_AGE_NANOS_FIELD_DESC);
+ oprot.writeI64(struct.compactionAgeNanos);
+ oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -766,7 +852,10 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
if (struct.isSetEntriesWritten()) {
optionals.set(4);
}
- oprot.writeBitSet(optionals, 5);
+ if (struct.isSetCompactionAgeNanos()) {
+ optionals.set(5);
+ }
+ oprot.writeBitSet(optionals, 6);
if (struct.isSetState()) {
oprot.writeI32(struct.state.getValue());
}
@@ -782,12 +871,15 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
if (struct.isSetEntriesWritten()) {
oprot.writeI64(struct.entriesWritten);
}
+ if (struct.isSetCompactionAgeNanos()) {
+ oprot.writeI64(struct.compactionAgeNanos);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot,
TCompactionStatusUpdate struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TTupleProtocol iprot =
(org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet incoming = iprot.readBitSet(5);
+ java.util.BitSet incoming = iprot.readBitSet(6);
if (incoming.get(0)) {
struct.state =
org.apache.accumulo.core.compaction.thrift.TCompactionState.findByValue(iprot.readI32());
struct.setStateIsSet(true);
@@ -808,6 +900,10 @@ public class TCompactionStatusUpdate implements
org.apache.thrift.TBase<TCompact
struct.entriesWritten = iprot.readI64();
struct.setEntriesWrittenIsSet(true);
}
+ if (incoming.get(5)) {
+ struct.compactionAgeNanos = iprot.readI64();
+ struct.setCompactionAgeNanosIsSet(true);
+ }
}
}
diff --git a/core/src/main/thrift/compaction-coordinator.thrift
b/core/src/main/thrift/compaction-coordinator.thrift
index 7cb090be45..f8c2d764fb 100644
--- a/core/src/main/thrift/compaction-coordinator.thrift
+++ b/core/src/main/thrift/compaction-coordinator.thrift
@@ -46,6 +46,7 @@ struct TCompactionStatusUpdate {
3:i64 entriesToBeCompacted
4:i64 entriesRead
5:i64 entriesWritten
+ 6:i64 compactionAgeNanos
}
struct TExternalCompaction {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
index 500653964d..b505c38cb9 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
@@ -98,8 +98,8 @@ public class CompactionInfo {
}
List<String> files =
compactor.getFilesToCompact().stream().map(StoredTabletFile::getPathStr)
.collect(Collectors.toList());
- return new ActiveCompaction(compactor.extent.toThrift(),
- System.currentTimeMillis() - compactor.getStartTime(), files,
compactor.getOutputFile(),
- type, reason, localityGroup, entriesRead, entriesWritten, iiList,
iterOptions);
+ return new ActiveCompaction(compactor.extent.toThrift(),
compactor.getAge().toMillis(), files,
+ compactor.getOutputFile(), type, reason, localityGroup, entriesRead,
entriesWritten, iiList,
+ iterOptions);
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 3825a51d88..2645962887 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -122,7 +122,7 @@ public class FileCompactor implements
Callable<CompactionStats> {
// things to report
private String currentLocalityGroup = "";
- private final long startTime;
+ private volatile long startTime = -1;
private final AtomicLong currentEntriesRead = new AtomicLong(0);
private final AtomicLong currentEntriesWritten = new AtomicLong(0);
@@ -248,8 +248,6 @@ public class FileCompactor implements
Callable<CompactionStats> {
this.env = env;
this.iterators = iterators;
this.cryptoService = cs;
-
- startTime = System.currentTimeMillis();
}
public VolumeManager getVolumeManager() {
@@ -280,6 +278,8 @@ public class FileCompactor implements
Callable<CompactionStats> {
CompactionStats majCStats = new CompactionStats();
+ startTime = System.nanoTime();
+
boolean remove = runningCompactions.add(this);
String threadStartDate = dateFormatter.format(new Date());
@@ -570,8 +570,15 @@ public class FileCompactor implements
Callable<CompactionStats> {
return currentEntriesWritten.get();
}
- long getStartTime() {
- return startTime;
+ /**
+ * @return the duration since {@link #call()} was called
+ */
+ Duration getAge() {
+ if (startTime == -1) {
+ // call() has not been called yet
+ return Duration.ZERO;
+ }
+ return Duration.ofNanos(System.nanoTime() - startTime);
}
Iterable<IteratorSetting> getIterators() {
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index b82358581a..10ce776891 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.UnknownHostException;
import java.security.SecureRandom;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -134,6 +135,8 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
void initialize() throws RetriesExceededException;
AtomicReference<FileCompactor> getFileCompactor();
+
+ Duration getCompactionAge();
}
private static final SecureRandom random = new SecureRandom();
@@ -515,13 +518,15 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
return new FileCompactorRunnable() {
- private AtomicReference<FileCompactor> compactor = new
AtomicReference<>();
+ private final AtomicReference<FileCompactor> compactor = new
AtomicReference<>();
+ private volatile long startTimeNanos = -1;
@Override
public void initialize() throws RetriesExceededException {
LOG.info("Starting up compaction runnable for job: {}", job);
- TCompactionStatusUpdate update =
- new TCompactionStatusUpdate(TCompactionState.STARTED, "Compaction
started", -1, -1, -1);
+ this.startTimeNanos = System.nanoTime();
+ TCompactionStatusUpdate update = new
TCompactionStatusUpdate(TCompactionState.STARTED,
+ "Compaction started", -1, -1, -1, getCompactionAge().toNanos());
updateCompactionState(job, update);
final var extent = KeyExtent.fromThrift(job.getExtent());
final AccumuloConfiguration aConfig;
@@ -589,7 +594,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
LOG.info("Compaction completed successfully {} ",
job.getExternalCompactionId());
// Update state when completed
TCompactionStatusUpdate update2 = new
TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
- "Compaction completed successfully", -1, -1, -1);
+ "Compaction completed successfully", -1, -1, -1,
this.getCompactionAge().toNanos());
updateCompactionState(job, update2);
} catch (FileCompactor.CompactionCanceledException cce) {
LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
@@ -605,6 +610,15 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
}
}
+ @Override
+ public Duration getCompactionAge() {
+ if (startTimeNanos == -1) {
+ // compaction hasn't started yet
+ return Duration.ZERO;
+ }
+ return Duration.ofNanos(System.nanoTime() - startTimeNanos);
+ }
+
};
}
@@ -760,9 +774,9 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
watcher.run();
try {
LOG.debug("Updating coordinator with compaction progress:
{}.", message);
- TCompactionStatusUpdate update =
- new
TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message,
- inputEntries, entriesRead, entriesWritten);
+ TCompactionStatusUpdate update = new TCompactionStatusUpdate(
+ TCompactionState.IN_PROGRESS, message, inputEntries,
entriesRead,
+ entriesWritten, fcr.getCompactionAge().toNanos());
updateCompactionState(job, update);
} catch (RetriesExceededException e) {
LOG.warn("Error updating coordinator with compaction
progress, error: {}",
@@ -789,8 +803,9 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
|| (err.get() != null &&
err.get().getClass().equals(InterruptedException.class))) {
LOG.warn("Compaction thread was interrupted, sending CANCELLED
state");
try {
- TCompactionStatusUpdate update = new TCompactionStatusUpdate(
- TCompactionState.CANCELLED, "Compaction cancelled", -1, -1,
-1);
+ TCompactionStatusUpdate update =
+ new TCompactionStatusUpdate(TCompactionState.CANCELLED,
"Compaction cancelled",
+ -1, -1, -1, fcr.getCompactionAge().toNanos());
updateCompactionState(job, update);
updateCompactionFailed(job);
} catch (RetriesExceededException e) {
@@ -804,7 +819,8 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
LOG.info("Updating coordinator with compaction failure: id: {},
extent: {}",
job.getExternalCompactionId(), fromThriftExtent);
TCompactionStatusUpdate update = new
TCompactionStatusUpdate(TCompactionState.FAILED,
- "Compaction failed due to: " + err.get().getMessage(), -1,
-1, -1);
+ "Compaction failed due to: " + err.get().getMessage(), -1,
-1, -1,
+ fcr.getCompactionAge().toNanos());
updateCompactionState(job, update);
updateCompactionFailed(job);
} catch (RetriesExceededException e) {
diff --git
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 8a8da9ae89..1ca2f147eb 100644
---
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.net.UnknownHostException;
+import java.time.Duration;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@@ -107,6 +108,11 @@ public class CompactorTest {
return new AtomicReference<>(compactor);
}
+ @Override
+ public Duration getCompactionAge() {
+ return Duration.ZERO;
+ }
+
@Override
public void run() {
try {
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index f3f8864bb3..89f887251b 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.test.compaction;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.accumulo.core.util.UtilWaitThread.sleep;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
@@ -29,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@@ -46,12 +49,15 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -119,6 +125,93 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
cfg.setSystemProperties(sysProps);
}
+ @Test
+ public void testCompactionDurationContinuesAfterCoordinatorStop() throws
Exception {
+ String table = this.getUniqueNames(1)[0];
+
+ try (AccumuloClient client =
+ Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
+ createTable(client, table, "cs1");
+ writeData(client, table, ROWS);
+
+ cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
+
cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
+
+ IteratorSetting setting = new IteratorSetting(50, "Slow",
SlowIterator.class);
+ SlowIterator.setSleepTime(setting, 5);
+ client.tableOperations().attachIterator(table, setting,
+ EnumSet.of(IteratorUtil.IteratorScope.majc));
+
+ log.info("Compacting table");
+ compact(client, table, 2, QUEUE1, false);
+
+ // Wait until the compaction starts
+ Wait.waitFor(() -> {
+ Map<String,TExternalCompaction> compactions =
+
getRunningCompactions(getCluster().getServerContext()).getCompactions();
+ return compactions == null || compactions.isEmpty();
+ }, 30_000, 100, "Compaction did not start within the expected time");
+
+ // start a timer after the compaction starts
+ long compactionStartTime = System.nanoTime();
+
+ // let the compaction advance a bit
+ sleepUninterruptibly(6, TimeUnit.SECONDS);
+
+ // Stop the coordinator
+ log.info("Stopping the coordinator");
+
cluster.getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+
+ sleepUninterruptibly(5, TimeUnit.SECONDS);
+
+ log.info("Restarting the coordinator");
+
cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
+ long coordinatorRestartTime = System.nanoTime();
+
+ // Wait for compactions to be present
+ Map<String,TExternalCompaction> metrics = null;
+ while (metrics == null) {
+ try {
+ metrics =
getRunningCompactions(getCluster().getServerContext()).getCompactions();
+ } catch (TException e) {
+ UtilWaitThread.sleep(250);
+ }
+ }
+
+ // let the compaction advance a bit
+ sleepUninterruptibly(6, TimeUnit.SECONDS);
+
+ TExternalCompaction updatedCompaction =
getRunningCompactions(getCluster().getServerContext())
+ .getCompactions().values().iterator().next();
+ RunningCompactionInfo updatedCompactionInfo = new
RunningCompactionInfo(updatedCompaction);
+
+ final Duration reportedCompactionDuration =
Duration.ofNanos(updatedCompactionInfo.duration);
+ final Duration measuredCompactionDuration =
+ Duration.ofNanos(System.nanoTime() - compactionStartTime);
+ final Duration coordinatorAge = Duration.ofNanos(System.nanoTime() -
coordinatorRestartTime);
+ log.info(
+ "Coordinator age: {}s. Measured compaction duration: {}s. Reported
compaction duration: {}s",
+ coordinatorAge.toSeconds(), measuredCompactionDuration.toSeconds(),
+ reportedCompactionDuration.toSeconds());
+
+ assertTrue(coordinatorAge.compareTo(reportedCompactionDuration) < 0,
+ "Reported compaction age should be greater than the coordinator
age");
+
+ // Verify that the reported duration is approximately equal to the
elapsed time
+ Duration tolerance = Duration.ofSeconds(7);
+ long reportedVsMeasuredDiff =
+
Math.abs(reportedCompactionDuration.minus(measuredCompactionDuration).toNanos());
+ assertTrue(reportedVsMeasuredDiff <= tolerance.toNanos(),
+ String.format(
+ "Reported duration (%s) and elapsed time (%s) differ by more
than the tolerance (%s)",
+ reportedCompactionDuration.toSeconds(),
measuredCompactionDuration.toSeconds(),
+ tolerance.toSeconds()));
+ } finally {
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+ }
+ }
+
@Test
public void testProgressViaMetrics() throws Exception {
String table = this.getUniqueNames(1)[0];
@@ -348,13 +441,15 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
* Check running compaction progress.
*/
private void checkRunning() throws TException {
- var ecList = getRunningCompactions(getCluster().getServerContext());
- var ecMap = ecList.getCompactions();
+ TExternalCompactionList ecList =
getRunningCompactions(getCluster().getServerContext());
+ Map<String,TExternalCompaction> ecMap = ecList.getCompactions();
if (ecMap != null) {
ecMap.forEach((ecid, ec) -> {
// returns null if it's a new mapping
RunningCompactionInfo rci = new RunningCompactionInfo(ec);
RunningCompactionInfo previousRci = runningMap.put(ecid, rci);
+ log.debug("ECID {} has been running for {} seconds", ecid,
+ NANOSECONDS.toSeconds(rci.duration));
if (previousRci == null) {
log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles);
} else {
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index a97d8a37b4..08fc5fbb8d 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.test.compaction;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
@@ -70,6 +71,11 @@ public class ExternalDoNothingCompactor extends Compactor
implements Iface {
return ref;
}
+ @Override
+ public Duration getCompactionAge() {
+ return Duration.ZERO;
+ }
+
@Override
public void run() {
try {