This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit ee197905e99a7680ad3420d990b27aca5c9fb72d Merge: 24267d01d3 b0643beb5f Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Jun 5 13:05:17 2024 +0000 Merge branch '2.1' .../accumulo/server/compaction/FileCompactor.java | 16 +++ .../accumulo/compactor/CompactionJobHolder.java | 12 ++- .../org/apache/accumulo/compactor/Compactor.java | 120 +++++++++++++-------- .../accumulo/tserver/tablet/CompactableUtils.java | 17 ++- .../compaction/ExternalDoNothingCompactor.java | 71 +++++++----- 5 files changed, 165 insertions(+), 71 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index cebdce684d,3825a51d88..403ac2ce60 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@@ -32,7 -32,7 +32,8 @@@ import java.util.Map import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; + import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@@ -54,11 -54,13 +55,12 @@@ import org.apache.accumulo.core.iterato import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator; import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator; +import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator; + import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; -import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; @@@ -421,10 -429,13 +436,11 @@@ public class FileCompactor implements C readers.add(reader); - SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(context, - extent.tableId(), mapFile.getPathStr(), false, reader); - ((ProblemReportingIterator) iter).setInterruptFlag(interruptFlag); + InterruptibleIterator iter = new ProblemReportingIterator(context, extent.tableId(), + dataFile.getNormalizedPathStr(), false, reader); ++ iter.setInterruptFlag(interruptFlag); - if (filesToCompact.get(mapFile).isTimeSet()) { - iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime()); - } + iter = filesToCompact.get(dataFile).wrapFileIterator(iter); iters.add(iter); diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 1aab26fa0c,7f7509489a..4ba279162e --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -127,7 -125,29 +127,17 @@@ import io.micrometer.core.instrument.Me public class Compactor extends AbstractServer implements MetricsProducer, CompactorService.Iface { + public interface FileCompactorRunnable extends Runnable { + /** + * Unable to create a constructor in an anonymous class so this method serves to initialize the + * object so that {@code #getFileCompactor()} returns a non-null reference. + */ + void initialize() throws RetriesExceededException; + + AtomicReference<FileCompactor> getFileCompactor(); + } + - private static final SecureRandom random = new SecureRandom(); - - public static class CompactorServerOpts extends ServerOpts { - @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name") - private String queueName = null; - - public String getQueueName() { - return queueName; - } - } - private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); - private static final long TIME_BETWEEN_GC_CHECKS = 5000; private static final long TIME_BETWEEN_CANCEL_CHECKS = MINUTES.toMillis(5); private static final long TEN_MEGABYTES = 10485760; @@@ -530,39 -547,58 +539,59 @@@ job.getIteratorSettings().getIterators() .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); - ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName); - FileCompactor compactor = + final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName); - compactor.set(new FileCompactor(getContext(), extent, files, outputFile, - job.isPropagateDeletes(), cenv, iters, aConfig, tConfig.getCryptoService())); ++ compactor.set( + new FileCompactor(getContext(), extent, files, outputFile, job.isPropagateDeletes(), - cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics); - - LOG.trace("Starting compactor"); - started.countDown(); - - org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); - TCompactionStats cs = new TCompactionStats(); - cs.setEntriesRead(stat.getEntriesRead()); - cs.setEntriesWritten(stat.getEntriesWritten()); - cs.setFileSize(stat.getFileSize()); - JOB_HOLDER.setStats(cs); - - LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); - // Update state when completed - TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, - "Compaction completed successfully", -1, -1, -1); - updateCompactionState(job, update2); - } catch (FileCompactor.CompactionCanceledException cce) { - LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); - err.set(cce); - } catch (Exception e) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), - fromThriftExtent, e); - err.set(e); - } finally { - stopped.countDown(); - Preconditions.checkState(compactionRunning.compareAndSet(true, false)); ++ cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics)); + + } + + @Override + public AtomicReference<FileCompactor> getFileCompactor() { + return compactor; } + + @Override + public void run() { + Preconditions.checkState(compactor.get() != null, "initialize not called"); + // Its only expected that a single compaction runs at a time. Multiple compactions running + // at a time could cause odd behavior like out of order and unexpected thrift calls to the + // coordinator. This is a sanity check to ensure the expectation is met. Should this check + // ever fail, it means there is a bug elsewhere. + Preconditions.checkState(compactionRunning.compareAndSet(false, true)); + try { + + LOG.trace("Starting compactor"); + started.countDown(); + + org.apache.accumulo.server.compaction.CompactionStats stat = compactor.get().call(); + TCompactionStats cs = new TCompactionStats(); + cs.setEntriesRead(stat.getEntriesRead()); + cs.setEntriesWritten(stat.getEntriesWritten()); + cs.setFileSize(stat.getFileSize()); + JOB_HOLDER.setStats(cs); + + LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); + // Update state when completed + TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, + "Compaction completed successfully", -1, -1, -1); + updateCompactionState(job, update2); + } catch (FileCompactor.CompactionCanceledException cce) { + LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); + err.set(cce); + } catch (Exception e) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), + fromThriftExtent, e); + err.set(e); + } finally { + stopped.countDown(); + Preconditions.checkState(compactionRunning.compareAndSet(true, false)); + } + } + }; + } /** diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index 0cff077fbc,1a670c81d1..7239e62f4d --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@@ -27,6 -28,9 +27,8 @@@ import java.util.Objects import java.util.Optional; import java.util.Set; import java.util.SortedMap; -import java.util.concurrent.ExecutionException; + import java.util.concurrent.ScheduledFuture; + import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@@ -402,15 -563,25 +404,26 @@@ public class CompactableUtils throws IOException, CompactionCanceledException { TableConfiguration tableConf = tablet.getTableConfiguration(); - AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf, - getOverrides(job.getKind(), tablet, cInfo.localHelper, job.getFiles())); + AccumuloConfiguration compactionConfig = + getCompactionConfig(tableConf, getOverrides(job.getKind(), tablet, cInfo.localHelper, + job.getFiles(), cInfo.selectedFiles)); - FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(), + final FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(), compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, compactionConfig, - tableConf.getCryptoService()); + tableConf.getCryptoService(), tablet.getPausedCompactionMetrics()); - return compactor.call(); + final Runnable compactionCancellerTask = () -> { + if (!cenv.isCompactionEnabled()) { + compactor.interrupt(); + } + }; + final ScheduledFuture<?> future = tablet.getContext().getScheduledExecutor() + .scheduleWithFixedDelay(compactionCancellerTask, 10, 10, TimeUnit.SECONDS); + try { + return compactor.call(); + } finally { + future.cancel(true); + } } /** diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index c1f4d6f352,a97d8a37b4..7365c7ab0d --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@@ -60,32 -61,51 +62,51 @@@ public class ExternalDoNothingCompacto // Set this to true so that only 1 external compaction is run this.shutdown = true; - return () -> { - try { - LOG.info("Starting up compaction runnable for job: {}", job); - TCompactionStatusUpdate update = new TCompactionStatusUpdate(); - update.setState(TCompactionState.STARTED); - update.setMessage("Compaction started"); - updateCompactionState(job, update); + return new FileCompactorRunnable() { - LOG.info("Starting compactor"); - started.countDown(); + final AtomicReference<FileCompactor> ref = new AtomicReference<>(); - while (!JOB_HOLDER.isCancelled()) { - LOG.info("Sleeping while job is not cancelled"); - UtilWaitThread.sleep(1000); + @Override + public AtomicReference<FileCompactor> getFileCompactor() { + return ref; + } + + @Override + public void run() { + try { + LOG.info("Starting up compaction runnable for job: {}", job); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(); + update.setState(TCompactionState.STARTED); + update.setMessage("Compaction started"); + updateCompactionState(job, update); + + LOG.info("Starting compactor"); + started.countDown(); + + while (!JOB_HOLDER.isCancelled()) { + LOG.info("Sleeping while job is not cancelled"); + UtilWaitThread.sleep(1000); + } + // Compactor throws this exception when cancelled + throw new CompactionCanceledException(); + + } catch (Exception e) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), + fromThriftExtent, e); + err.set(e); + } finally { + stopped.countDown(); } - // Compactor throws this exception when cancelled - throw new CompactionCanceledException(); - - } catch (Exception e) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), - fromThriftExtent, e); - err.set(e); - } finally { - stopped.countDown(); } + + @Override + public void initialize() throws RetriesExceededException { + // This isn't used, just need to create and return something + ref.set(new FileCompactor(getContext(), KeyExtent.fromThrift(job.getExtent()), null, null, - false, null, null, null, null)); ++ false, null, null, null, null, null)); + } + }; }