This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new b0643beb5f Modified FileCompactor to set interruptFlag on RFile (#4628) b0643beb5f is described below commit b0643beb5f8d74a2049f5c09c57069796664991d Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Jun 5 08:11:24 2024 -0400 Modified FileCompactor to set interruptFlag on RFile (#4628) Added interrupt method on FileCompactor that is set to true in CompactableUtils when the majc env is no longer enabled. FileCompactor.interrupt sets an AtomicBoolean to true, and the reference to the AtomicBoolean is set on the RFiles. The FileCompactor.interrupt is also called from the Compactor during the already existing compaction cancellation process. Fixes #4485 --- .../accumulo/server/compaction/FileCompactor.java | 16 +++ .../accumulo/compactor/CompactionJobHolder.java | 12 ++- .../org/apache/accumulo/compactor/Compactor.java | 120 +++++++++++++-------- .../apache/accumulo/compactor/CompactorTest.java | 33 ++++-- .../accumulo/tserver/tablet/CompactableUtils.java | 17 ++- .../compaction/ExternalDoNothingCompactor.java | 71 +++++++----- 6 files changed, 188 insertions(+), 81 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 77ec9f1696..3825a51d88 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -53,6 +54,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator; import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator; +import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator; import org.apache.accumulo.core.metadata.MetadataTable; @@ -142,6 +144,12 @@ public class FileCompactor implements Callable<CompactionStats> { protected volatile Thread thread; private final ServerContext context; + private final AtomicBoolean interruptFlag = new AtomicBoolean(false); + + public void interrupt() { + interruptFlag.set(true); + } + public long getCompactorID() { return compactorID; } @@ -347,6 +355,13 @@ public class FileCompactor implements Callable<CompactionStats> { } catch (CompactionCanceledException e) { log.debug("Compaction canceled {}", extent); throw e; + } catch (IterationInterruptedException iie) { + if (!env.isCompactionEnabled()) { + log.debug("Compaction canceled {}", extent); + throw new CompactionCanceledException(); + } + log.debug("RFile interrupted {}", extent); + throw iie; } catch (IOException | RuntimeException e) { Collection<String> inputFileNames = Collections2.transform(getFilesToCompact(), StoredTabletFile::getFileName); @@ -416,6 +431,7 @@ public class FileCompactor implements Callable<CompactionStats> { SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(context, extent.tableId(), mapFile.getPathStr(), false, reader); + ((ProblemReportingIterator) iter).setInterruptFlag(interruptFlag); if (filesToCompact.get(mapFile).isTimeSet()) { iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime()); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java index d609424c4b..38d03faaf2 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java @@ -19,16 +19,19 @@ package org.apache.accumulo.compactor; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.server.compaction.FileCompactor; public class CompactionJobHolder { private TExternalCompactionJob job; private Thread compactionThread; + private AtomicReference<FileCompactor> compactor; private volatile boolean cancelled = false; private volatile TCompactionStats stats = null; @@ -37,6 +40,7 @@ public class CompactionJobHolder { public synchronized void reset() { job = null; compactionThread = null; + compactor = null; cancelled = false; stats = null; } @@ -61,6 +65,9 @@ public class CompactionJobHolder { public synchronized boolean cancel(String extCompId) { if (isSet() && getJob().getExternalCompactionId().equals(extCompId)) { cancelled = true; + if (compactor.get() != null) { + compactor.get().interrupt(); + } compactionThread.interrupt(); return true; } @@ -75,11 +82,14 @@ public class CompactionJobHolder { return (null != this.job); } - public synchronized void set(TExternalCompactionJob job, Thread compactionThread) { + public synchronized void set(TExternalCompactionJob job, Thread compactionThread, + AtomicReference<FileCompactor> compactor) { Objects.requireNonNull(job, "CompactionJob is null"); Objects.requireNonNull(compactionThread, "Compaction thread is null"); + Objects.requireNonNull(compactor, "Compactor object is null"); this.job = job; this.compactionThread = compactionThread; + this.compactor = compactor; } } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 278c121dd6..7f7509489a 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -125,6 +125,16 @@ import io.micrometer.core.instrument.MeterRegistry; public class Compactor extends AbstractServer implements MetricsProducer, CompactorService.Iface { + public interface FileCompactorRunnable extends Runnable { + /** + * Unable to create a constructor in an anonymous class so this method serves to initialize the + * object so that {@code #getFileCompactor()} returns a non-null reference. + */ + void initialize() throws RetriesExceededException; + + AtomicReference<FileCompactor> getFileCompactor(); + } + private static final SecureRandom random = new SecureRandom(); public static class CompactorServerOpts extends ServerOpts { @@ -490,23 +500,22 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac * @param err reference to error * @return Runnable compaction job */ - protected Runnable createCompactionJob(final TExternalCompactionJob job, + protected FileCompactorRunnable createCompactionJob(final TExternalCompactionJob job, final LongAdder totalInputEntries, final LongAdder totalInputBytes, final CountDownLatch started, final CountDownLatch stopped, final AtomicReference<Throwable> err) { - return () -> { - // Its only expected that a single compaction runs at a time. Multiple compactions running - // at a time could cause odd behavior like out of order and unexpected thrift calls to the - // coordinator. This is a sanity check to ensure the expectation is met. Should this check - // ever fail, it means there is a bug elsewhere. - Preconditions.checkState(compactionRunning.compareAndSet(false, true)); - try { + return new FileCompactorRunnable() { + + private AtomicReference<FileCompactor> compactor = new AtomicReference<>(); + + @Override + public void initialize() throws RetriesExceededException { LOG.info("Starting up compaction runnable for job: {}", job); TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.STARTED, "Compaction started", -1, -1, -1); updateCompactionState(job, update); - var extent = KeyExtent.fromThrift(job.getExtent()); + final var extent = KeyExtent.fromThrift(job.getExtent()); final AccumuloConfiguration aConfig; final TableConfiguration tConfig = getContext().getTableConfiguration(extent.tableId()); @@ -538,38 +547,58 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac job.getIteratorSettings().getIterators() .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); - ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName); - FileCompactor compactor = new FileCompactor(getContext(), extent, files, outputFile, - job.isPropagateDeletes(), cenv, iters, aConfig, tConfig.getCryptoService()); - - LOG.trace("Starting compactor"); - started.countDown(); - - org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); - TCompactionStats cs = new TCompactionStats(); - cs.setEntriesRead(stat.getEntriesRead()); - cs.setEntriesWritten(stat.getEntriesWritten()); - cs.setFileSize(stat.getFileSize()); - JOB_HOLDER.setStats(cs); - - LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); - // Update state when completed - TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, - "Compaction completed successfully", -1, -1, -1); - updateCompactionState(job, update2); - } catch (FileCompactor.CompactionCanceledException cce) { - LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); - err.set(cce); - } catch (Exception e) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), - fromThriftExtent, e); - err.set(e); - } finally { - stopped.countDown(); - Preconditions.checkState(compactionRunning.compareAndSet(true, false)); + final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName); + compactor.set(new FileCompactor(getContext(), extent, files, outputFile, + job.isPropagateDeletes(), cenv, iters, aConfig, tConfig.getCryptoService())); + + } + + @Override + public AtomicReference<FileCompactor> getFileCompactor() { + return compactor; } + + @Override + public void run() { + Preconditions.checkState(compactor.get() != null, "initialize not called"); + // Its only expected that a single compaction runs at a time. Multiple compactions running + // at a time could cause odd behavior like out of order and unexpected thrift calls to the + // coordinator. This is a sanity check to ensure the expectation is met. Should this check + // ever fail, it means there is a bug elsewhere. + Preconditions.checkState(compactionRunning.compareAndSet(false, true)); + try { + + LOG.trace("Starting compactor"); + started.countDown(); + + org.apache.accumulo.server.compaction.CompactionStats stat = compactor.get().call(); + TCompactionStats cs = new TCompactionStats(); + cs.setEntriesRead(stat.getEntriesRead()); + cs.setEntriesWritten(stat.getEntriesWritten()); + cs.setFileSize(stat.getFileSize()); + JOB_HOLDER.setStats(cs); + + LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); + // Update state when completed + TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, + "Compaction completed successfully", -1, -1, -1); + updateCompactionState(job, update2); + } catch (FileCompactor.CompactionCanceledException cce) { + LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); + err.set(cce); + } catch (Exception e) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), + fromThriftExtent, e); + err.set(e); + } finally { + stopped.countDown(); + Preconditions.checkState(compactionRunning.compareAndSet(true, false)); + } + } + }; + } /** @@ -686,13 +715,18 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac final CountDownLatch started = new CountDownLatch(1); final CountDownLatch stopped = new CountDownLatch(1); - final Thread compactionThread = Threads.createThread( - "Compaction job for tablet " + job.getExtent().toString(), - createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err)); + final FileCompactorRunnable fcr = + createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); - JOB_HOLDER.set(job, compactionThread); + final Thread compactionThread = + Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr); + + JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); try { + // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set + fcr.initialize(); + compactionThread.start(); // start the compactionThread started.await(); // wait until the compactor is started final long inputEntries = totalInputEntries.sum(); diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index c747d9392c..8a8da9ae89 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Supplier; +import org.apache.accumulo.compactor.Compactor.FileCompactorRunnable; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.conf.ConfigurationCopy; @@ -53,11 +54,13 @@ import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.compaction.FileCompactor; import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; +import org.easymock.EasyMock; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -76,7 +79,7 @@ import org.slf4j.LoggerFactory; "com.sun.org.apache.xerces.*"}) public class CompactorTest { - public class SuccessfulCompaction implements Runnable { + public class SuccessfulCompaction implements FileCompactorRunnable { protected final Logger LOG = LoggerFactory.getLogger(this.getClass()); @@ -85,6 +88,7 @@ public class CompactorTest { protected final CountDownLatch started; protected final CountDownLatch stopped; protected final AtomicReference<Throwable> err; + private final FileCompactor compactor = EasyMock.createMock(FileCompactor.class); public SuccessfulCompaction(LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, AtomicReference<Throwable> err) { @@ -95,6 +99,14 @@ public class CompactorTest { this.stopped = stopped; } + @Override + public void initialize() throws RetriesExceededException {} + + @Override + public AtomicReference<FileCompactor> getFileCompactor() { + return new AtomicReference<>(compactor); + } + @Override public void run() { try { @@ -106,6 +118,7 @@ public class CompactorTest { stopped.countDown(); } } + } public class FailedCompaction extends SuccessfulCompaction { @@ -214,9 +227,9 @@ public class CompactorTest { protected synchronized void checkIfCanceled() {} @Override - protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries, - LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, - AtomicReference<Throwable> err) { + protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job, + LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started, + CountDownLatch stopped, AtomicReference<Throwable> err) { return new SuccessfulCompaction(totalInputEntries, totalInputBytes, started, stopped, err); } @@ -265,9 +278,9 @@ public class CompactorTest { } @Override - protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries, - LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, - AtomicReference<Throwable> err) { + protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job, + LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started, + CountDownLatch stopped, AtomicReference<Throwable> err) { return new FailedCompaction(totalInputEntries, totalInputBytes, started, stopped, err); } } @@ -280,9 +293,9 @@ public class CompactorTest { } @Override - protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries, - LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, - AtomicReference<Throwable> err) { + protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job, + LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started, + CountDownLatch stopped, AtomicReference<Throwable> err) { return new InterruptedCompaction(totalInputEntries, totalInputBytes, started, stopped, err); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index e2aab70256..1a670c81d1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -29,6 +29,8 @@ import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -564,11 +566,22 @@ public class CompactableUtils { AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf, getOverrides(job.getKind(), tablet, cInfo.localHelper, job.getFiles())); - FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(), + final FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(), compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, compactionConfig, tableConf.getCryptoService()); - return compactor.call(); + final Runnable compactionCancellerTask = () -> { + if (!cenv.isCompactionEnabled()) { + compactor.interrupt(); + } + }; + final ScheduledFuture<?> future = tablet.getContext().getScheduledExecutor() + .scheduleWithFixedDelay(compactionCancellerTask, 10, 10, TimeUnit.SECONDS); + try { + return compactor.call(); + } finally { + future.cancel(true); + } } /** diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index 6b4547252b..a97d8a37b4 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@ -32,7 +32,9 @@ import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.compaction.FileCompactor; import org.apache.accumulo.server.compaction.FileCompactor.CompactionCanceledException; +import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,39 +54,58 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface { } @Override - protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries, - LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, - AtomicReference<Throwable> err) { + protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job, + LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started, + CountDownLatch stopped, AtomicReference<Throwable> err) { // Set this to true so that only 1 external compaction is run this.shutdown = true; - return () -> { - try { - LOG.info("Starting up compaction runnable for job: {}", job); - TCompactionStatusUpdate update = new TCompactionStatusUpdate(); - update.setState(TCompactionState.STARTED); - update.setMessage("Compaction started"); - updateCompactionState(job, update); + return new FileCompactorRunnable() { - LOG.info("Starting compactor"); - started.countDown(); + final AtomicReference<FileCompactor> ref = new AtomicReference<>(); - while (!JOB_HOLDER.isCancelled()) { - LOG.info("Sleeping while job is not cancelled"); - UtilWaitThread.sleep(1000); + @Override + public AtomicReference<FileCompactor> getFileCompactor() { + return ref; + } + + @Override + public void run() { + try { + LOG.info("Starting up compaction runnable for job: {}", job); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(); + update.setState(TCompactionState.STARTED); + update.setMessage("Compaction started"); + updateCompactionState(job, update); + + LOG.info("Starting compactor"); + started.countDown(); + + while (!JOB_HOLDER.isCancelled()) { + LOG.info("Sleeping while job is not cancelled"); + UtilWaitThread.sleep(1000); + } + // Compactor throws this exception when cancelled + throw new CompactionCanceledException(); + + } catch (Exception e) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), + fromThriftExtent, e); + err.set(e); + } finally { + stopped.countDown(); } - // Compactor throws this exception when cancelled - throw new CompactionCanceledException(); - - } catch (Exception e) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), - fromThriftExtent, e); - err.set(e); - } finally { - stopped.countDown(); } + + @Override + public void initialize() throws RetriesExceededException { + // This isn't used, just need to create and return something + ref.set(new FileCompactor(getContext(), KeyExtent.fromThrift(job.getExtent()), null, null, + false, null, null, null, null)); + } + }; }