This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 91bbf13e69 Set compaction start time in Coordinator from Compactor msg (#6225)
91bbf13e69 is described below
commit 91bbf13e691f63711a6223ce630bb76bd75c73ee
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 19 10:22:40 2026 -0400
Set compaction start time in Coordinator from Compactor msg (#6225)
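In #6221 we removed the RunningCompaction object in favor
of using TExternalCompaction. The RunningCompaction object
set its start time variable when the STARTED update message
was added. This behavior was lost in #6221. This change
restores it by setting the start time on the TExternalCompaction
object when the STARTED message is received by the Coordinator.
The Compactor likewise sets the start time on its own copy when
it sends the STARTED message, and passes that same timestamp to
the Coordinator in the status update.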
---
.../main/java/org/apache/accumulo/compactor/Compactor.java | 14 +++++++++++---
.../compaction/coordinator/CompactionCoordinator.java | 3 +++
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 92239742ab..82301f9817 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -473,7 +473,12 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
*/
protected void updateCompactionState(TExternalCompactionJob job,
TCompactionStatusUpdate update)
throws RetriesExceededException {
- JOB_HOLDER.getCurrentCompaction().putToUpdates(System.currentTimeMillis(),
update);
+ long updateTime = System.currentTimeMillis();
+ TExternalCompaction tec = JOB_HOLDER.getCurrentCompaction();
+ if (update.getState() == TCompactionState.STARTED) {
+ tec.setStartTime(updateTime);
+ }
+ tec.putToUpdates(updateTime, update);
RetryableThriftCall<String> thriftCall =
new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25,
() -> {
Client coordinatorClient = getCoordinatorClient();
@@ -481,7 +486,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
LOG.trace("Attempting to update compaction state in coordinator
{}",
job.getExternalCompactionId());
coordinatorClient.updateCompactionStatus(TraceUtil.traceInfo(),
getContext().rpcCreds(),
- job.getExternalCompactionId(), update,
System.currentTimeMillis());
+ job.getExternalCompactionId(), update, updateTime);
return "";
} finally {
ThriftUtil.returnClient(coordinatorClient, getContext());
@@ -902,6 +907,10 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
current.setCompactor(clientAddress.toString());
current.setGroupName(getResourceGroup().canonical());
current.setJob(job);
+ // start time for the current compaction is set when the
+ // STARTED msg is sent to the coordinator. The coordinator
+ // updates its copy of the TExternalCompaction when it
+ // receives the STARTED msg.
JOB_HOLDER.set(current, compactionThread, fcr.getFileCompactor());
try {
@@ -924,7 +933,6 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
}
compactionThread.start(); // start the compactionThread
- current.setStartTime(System.currentTimeMillis());
started.await(); // wait until the compactor is started
final long inputEntries = totalInputEntries.sum();
final long waitTime =
calculateProgressCheckTime(totalInputBytes.sum());
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 0729f09169..6c31302fc0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -1055,6 +1055,9 @@ public class CompactionCoordinator
tec.putToUpdates(timestamp, update);
switch (update.state) {
case STARTED:
+ // Start time is used by the comparator, so set it first
+ // before adding to the LONG_RUNNING_COMPACTIONS_BY_RG object.
+ tec.setStartTime(timestamp);
LONG_RUNNING_COMPACTIONS_BY_RG
.computeIfAbsent(tec.getGroupName(), k -> new
TimeOrderedRunningCompactionSet())
.add(tec);