This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit b24f7b849649519ba158f0c1c79ce50c402c1e9e Merge: 91db8a98d1 f9551d0e6d Author: Dom Garguilo <domgargu...@apache.org> AuthorDate: Fri Jun 14 14:51:17 2024 -0400 Merge remote-tracking branch 'upstream/2.1' .../util/compaction/RunningCompactionInfo.java | 21 ++--- .../compaction/thrift/TCompactionStatusUpdate.java | 104 ++++++++++++++++++++- core/src/main/thrift/compaction-coordinator.thrift | 1 + .../accumulo/server/compaction/CompactionInfo.java | 3 +- .../accumulo/server/compaction/FileCompactor.java | 17 +++- .../org/apache/accumulo/compactor/Compactor.java | 36 +++++-- .../compaction/ExternalCompactionProgressIT.java | 98 ++++++++++++++++++- .../compaction/ExternalDoNothingCompactor.java | 6 ++ 8 files changed, 251 insertions(+), 35 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java index 57d4d24120,b505c38cb9..08e0da282d --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java @@@ -102,11 -96,10 +102,10 @@@ public class CompactionInfo iterSetting.getName())); iterOptions.put(iterSetting.getName(), iterSetting.getOptions()); } - List<String> files = compactor.getFilesToCompact().stream().map(StoredTabletFile::getPathStr) - .collect(Collectors.toList()); + List<String> files = compactor.getFilesToCompact().stream() + .map(StoredTabletFile::getNormalizedPathStr).collect(Collectors.toList()); - return new ActiveCompaction(compactor.extent.toThrift(), - System.currentTimeMillis() - compactor.getStartTime(), files, + return new ActiveCompaction(compactor.extent.toThrift(), compactor.getAge().toMillis(), files, - compactor.getOutputFile(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, - iterOptions); + compactor.getOutputFile().getMetadataPath(), type, reason, localityGroup, entriesRead, + entriesWritten, iiList, iterOptions, timesPaused); } } diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 403ac2ce60,2645962887..906dcc0331 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@@ -125,10 -122,8 +125,10 @@@ public class FileCompactor implements C // things to report private String currentLocalityGroup = ""; - private final long startTime; + private volatile long startTime = -1; + private final AtomicInteger timesPaused = new AtomicInteger(0); + private final AtomicLong currentEntriesRead = new AtomicLong(0); private final AtomicLong currentEntriesWritten = new AtomicLong(0); @@@ -253,9 -248,6 +253,7 @@@ this.env = env; this.iterators = iterators; this.cryptoService = cs; + this.metrics = metrics; - - startTime = System.currentTimeMillis(); } public VolumeManager getVolumeManager() { @@@ -597,12 -570,15 +597,19 @@@ return currentEntriesWritten.get(); } + long getTimesPaused() { + return timesPaused.get(); + } + - long getStartTime() { - return startTime; + /** + * @return the duration since {@link #call()} was called + */ + Duration getAge() { + if (startTime == -1) { + // call() has not been called yet + return Duration.ZERO; + } + return Duration.ofNanos(System.nanoTime() - startTime); } Iterable<IteratorSetting> getIterators() { diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index f8ab5b9386,10ce776891..0649b9e8fe --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -26,6 -25,8 +26,7 @@@ import static org.apache.accumulo.core. import java.io.IOException; import java.io.UncheckedIOException; import java.net.UnknownHostException; -import java.security.SecureRandom; + import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@@ -136,9 -135,23 +137,11 @@@ public class Compactor extends Abstract void initialize() throws RetriesExceededException; AtomicReference<FileCompactor> getFileCompactor(); + + Duration getCompactionAge(); } - private static final SecureRandom random = new SecureRandom(); - - public static class CompactorServerOpts extends ServerOpts { - @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name") - private String queueName = null; - - public String getQueueName() { - return queueName; - } - } - private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); - private static final long TIME_BETWEEN_GC_CHECKS = 5000; private static final long TIME_BETWEEN_CANCEL_CHECKS = MINUTES.toMillis(5); private static final long TEN_MEGABYTES = 10485760; diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 36e6b0031d,89f887251b..a7844c05d0 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@@ -18,7 -18,9 +18,8 @@@ */ package org.apache.accumulo.test.compaction; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.accumulo.core.util.UtilWaitThread.sleep; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;