This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 01fa10be6391242259f9ec79b00c98ae5128b72a Merge: 7e9bb4d603 ee197905e9 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Jun 5 13:30:02 2024 +0000 Merge branch 'main' into elasticity .../accumulo/server/compaction/FileCompactor.java | 16 +++ .../accumulo/compactor/CompactionJobHolder.java | 12 ++- .../org/apache/accumulo/compactor/Compactor.java | 120 +++++++++++++-------- .../compaction/ExternalDoNothingCompactor.java | 92 +++++++++------- 4 files changed, 160 insertions(+), 80 deletions(-) diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 407f0ed6b3,4ba279162e..1cfb043eac --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -130,7 -127,18 +130,17 @@@ import io.micrometer.core.instrument.Me public class Compactor extends AbstractServer implements MetricsProducer, CompactorService.Iface { + public interface FileCompactorRunnable extends Runnable { + /** + * Unable to create a constructor in an anonymous class so this method serves to initialize the + * object so that {@code #getFileCompactor()} returns a non-null reference. + */ + void initialize() throws RetriesExceededException; + + AtomicReference<FileCompactor> getFileCompactor(); + } + private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); - private static final long TIME_BETWEEN_CANCEL_CHECKS = MINUTES.toMillis(5); private static final long TEN_MEGABYTES = 10485760; @@@ -544,39 -539,59 +553,59 @@@ job.getIteratorSettings().getIterators() .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); - ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, this.getResourceGroup()); - FileCompactor compactor = - final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName); ++ final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, getResourceGroup()); + compactor.set( new FileCompactor(getContext(), extent, files, outputFile, job.isPropagateDeletes(), - cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics); - - LOG.trace("Starting compactor"); - started.countDown(); - - org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); - TCompactionStats cs = new TCompactionStats(); - cs.setEntriesRead(stat.getEntriesRead()); - cs.setEntriesWritten(stat.getEntriesWritten()); - cs.setFileSize(stat.getFileSize()); - JOB_HOLDER.setStats(cs); - - LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); - // Update state when completed - TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, - "Compaction completed successfully", -1, -1, -1); - updateCompactionState(job, update2); - } catch (FileCompactor.CompactionCanceledException cce) { - LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); - err.set(cce); - } catch (Exception e) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), - fromThriftExtent, e); - err.set(e); - } finally { - stopped.countDown(); - Preconditions.checkState(compactionRunning.compareAndSet(true, false)); + cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics)); + + } + + @Override + public AtomicReference<FileCompactor> getFileCompactor() { + return compactor; } + + @Override + public void run() { + Preconditions.checkState(compactor.get() != null, "initialize not called"); + // Its only expected that a single compaction runs at a time. Multiple compactions running + // at a time could cause odd behavior like out of order and unexpected thrift calls to the + // coordinator. This is a sanity check to ensure the expectation is met. Should this check + // ever fail, it means there is a bug elsewhere. + Preconditions.checkState(compactionRunning.compareAndSet(false, true)); + try { + + LOG.trace("Starting compactor"); + started.countDown(); + + org.apache.accumulo.server.compaction.CompactionStats stat = compactor.get().call(); + TCompactionStats cs = new TCompactionStats(); + cs.setEntriesRead(stat.getEntriesRead()); + cs.setEntriesWritten(stat.getEntriesWritten()); + cs.setFileSize(stat.getFileSize()); + JOB_HOLDER.setStats(cs); + + LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); + // Update state when completed + TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, + "Compaction completed successfully", -1, -1, -1); + updateCompactionState(job, update2); + } catch (FileCompactor.CompactionCanceledException cce) { + LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); + err.set(cce); + } catch (Exception e) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), + fromThriftExtent, e); + err.set(e); + } finally { + stopped.countDown(); + Preconditions.checkState(compactionRunning.compareAndSet(true, false)); + } + } + }; + } /** diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index ed101cdf3e,7365c7ab0d..51c919a2de --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@@ -31,14 -31,11 +31,16 @@@ import org.apache.accumulo.core.compact import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.util.UtilWaitThread; + import org.apache.accumulo.server.compaction.FileCompactor; import org.apache.accumulo.server.compaction.FileCompactor.CompactionCanceledException; + import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; +import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -58,48 -55,58 +60,66 @@@ public class ExternalDoNothingCompacto } @Override - protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries, - LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, - AtomicReference<Throwable> err) { + protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job, + LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started, + CountDownLatch stopped, AtomicReference<Throwable> err) { // Set this to true so that only 1 external compaction is run ++ final AtomicReference<FileCompactor> ref = new AtomicReference<>(); this.shutdown = true; - return () -> { - try { - LOG.info("Starting up compaction runnable for job: {}", job); - TCompactionStatusUpdate update = new TCompactionStatusUpdate(); - update.setState(TCompactionState.STARTED); - update.setMessage("Compaction started"); - updateCompactionState(job, update); - - // Create tmp output file - final TabletMetadata tm = getContext().getAmple() - .readTablet(KeyExtent.fromThrift(job.getExtent()), ColumnType.DIR); - ReferencedTabletFile newFile = - TabletNameGenerator.getNextDataFilenameForMajc(job.isPropagateDeletes(), getContext(), - tm, (dir) -> {}, ExternalCompactionId.from(job.getExternalCompactionId())); - LOG.info("Creating tmp file: {}", newFile.getPath()); - getContext().getVolumeManager().createNewFile(newFile.getPath()); - - LOG.info("Starting compactor"); - started.countDown(); - - while (!JOB_HOLDER.isCancelled()) { - LOG.info("Sleeping while job is not cancelled"); - UtilWaitThread.sleep(1000); + return new FileCompactorRunnable() { + - final AtomicReference<FileCompactor> ref = new AtomicReference<>(); - + @Override + public AtomicReference<FileCompactor> getFileCompactor() { + return ref; + } + + @Override + public void run() { + try { + LOG.info("Starting up compaction runnable for job: {}", job); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(); + update.setState(TCompactionState.STARTED); + update.setMessage("Compaction started"); + updateCompactionState(job, update); + ++ // Create tmp output file ++ final TabletMetadata tm = getContext().getAmple() ++ .readTablet(KeyExtent.fromThrift(job.getExtent()), ColumnType.DIR); ++ ReferencedTabletFile newFile = ++ TabletNameGenerator.getNextDataFilenameForMajc(job.isPropagateDeletes(), getContext(), ++ tm, (dir) -> {}, ExternalCompactionId.from(job.getExternalCompactionId())); ++ LOG.info("Creating tmp file: {}", newFile.getPath()); ++ getContext().getVolumeManager().createNewFile(newFile.getPath()); ++ + LOG.info("Starting compactor"); + started.countDown(); + + while (!JOB_HOLDER.isCancelled()) { + LOG.info("Sleeping while job is not cancelled"); + UtilWaitThread.sleep(1000); + } + // Compactor throws this exception when cancelled + throw new CompactionCanceledException(); + + } catch (Exception e) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), + fromThriftExtent, e); + err.set(e); + } finally { + stopped.countDown(); } - // Compactor throws this exception when cancelled - throw new CompactionCanceledException(); - - } catch (Exception e) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), - fromThriftExtent, e); - err.set(e); - } finally { - stopped.countDown(); } + + @Override + public void initialize() throws RetriesExceededException { + // This isn't used, just need to create and return something + ref.set(new FileCompactor(getContext(), KeyExtent.fromThrift(job.getExtent()), null, null, + false, null, null, null, null, null)); + } + }; }