This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 01fa10be6391242259f9ec79b00c98ae5128b72a
Merge: 7e9bb4d603 ee197905e9
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Jun 5 13:30:02 2024 +0000

    Merge branch 'main' into elasticity

 .../accumulo/server/compaction/FileCompactor.java  |  16 +++
 .../accumulo/compactor/CompactionJobHolder.java    |  12 ++-
 .../org/apache/accumulo/compactor/Compactor.java   | 120 +++++++++++++--------
 .../compaction/ExternalDoNothingCompactor.java     |  92 +++++++++-------
 4 files changed, 160 insertions(+), 80 deletions(-)

diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 407f0ed6b3,4ba279162e..1cfb043eac
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -130,7 -127,18 +130,17 @@@ import io.micrometer.core.instrument.Me
  
  public class Compactor extends AbstractServer implements MetricsProducer, CompactorService.Iface {
  
+   public interface FileCompactorRunnable extends Runnable {
+     /**
 +      * A constructor cannot be created in an anonymous class, so this method serves to
 +      * initialize the object so that {@code #getFileCompactor()} returns a non-null reference.
+      */
+     void initialize() throws RetriesExceededException;
+ 
+     AtomicReference<FileCompactor> getFileCompactor();
+   }
+ 
    private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
 -  private static final long TIME_BETWEEN_CANCEL_CHECKS = MINUTES.toMillis(5);
  
    private static final long TEN_MEGABYTES = 10485760;
  
@@@ -544,39 -539,59 +553,59 @@@
          job.getIteratorSettings().getIterators()
             .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
  
-         ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, this.getResourceGroup());
-         FileCompactor compactor =
 -        final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
++        final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, getResourceGroup());
+         compactor.set(
             new FileCompactor(getContext(), extent, files, outputFile, job.isPropagateDeletes(),
-                 cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics);
- 
-         LOG.trace("Starting compactor");
-         started.countDown();
- 
-         org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
-         TCompactionStats cs = new TCompactionStats();
-         cs.setEntriesRead(stat.getEntriesRead());
-         cs.setEntriesWritten(stat.getEntriesWritten());
-         cs.setFileSize(stat.getFileSize());
-         JOB_HOLDER.setStats(cs);
- 
-         LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
-         // Update state when completed
-         TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
-             "Compaction completed successfully", -1, -1, -1);
-         updateCompactionState(job, update2);
-       } catch (FileCompactor.CompactionCanceledException cce) {
-         LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
-         err.set(cce);
-       } catch (Exception e) {
-         KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
-         LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
-             fromThriftExtent, e);
-         err.set(e);
-       } finally {
-         stopped.countDown();
-         Preconditions.checkState(compactionRunning.compareAndSet(true, false));
+                 cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics));
+ 
+       }
+ 
+       @Override
+       public AtomicReference<FileCompactor> getFileCompactor() {
+         return compactor;
        }
+ 
+       @Override
+       public void run() {
+         Preconditions.checkState(compactor.get() != null, "initialize not called");
+         // It's expected that only a single compaction runs at a time. Multiple compactions
+         // running at a time could cause odd behavior, like out-of-order and unexpected thrift
+         // calls to the coordinator. This is a sanity check to ensure the expectation is met.
+         // Should this check ever fail, it means there is a bug elsewhere.
+         Preconditions.checkState(compactionRunning.compareAndSet(false, true));
+         try {
+ 
+           LOG.trace("Starting compactor");
+           started.countDown();
+ 
+           org.apache.accumulo.server.compaction.CompactionStats stat = compactor.get().call();
+           TCompactionStats cs = new TCompactionStats();
+           cs.setEntriesRead(stat.getEntriesRead());
+           cs.setEntriesWritten(stat.getEntriesWritten());
+           cs.setFileSize(stat.getFileSize());
+           JOB_HOLDER.setStats(cs);
+ 
+           LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
+           // Update state when completed
+           TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
+               "Compaction completed successfully", -1, -1, -1);
+           updateCompactionState(job, update2);
+         } catch (FileCompactor.CompactionCanceledException cce) {
+           LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
+           err.set(cce);
+         } catch (Exception e) {
+           KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+           LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
+               fromThriftExtent, e);
+           err.set(e);
+         } finally {
+           stopped.countDown();
+           Preconditions.checkState(compactionRunning.compareAndSet(true, false));
+         }
+       }
+ 
      };
+ 
    }
  
    /**
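
Context for the change above: FileCompactorRunnable splits setup out of run(). The caller is
expected to invoke initialize() before the runnable is scheduled; after that, getFileCompactor()
exposes the underlying FileCompactor (e.g. for cancellation or monitoring). The calling side is
not part of this commit, so the following is only a sketch of that assumed contract; the executor
and the error handling shown here are hypothetical.

    // Hypothetical caller sketch (not from this commit). initialize() must complete before
    // run() executes, otherwise the checkState(compactor.get() != null) guard in run() fails.
    FileCompactorRunnable fcr =
        createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err);
    try {
      fcr.initialize(); // may throw RetriesExceededException
    } catch (RetriesExceededException e) {
      err.set(e); // assumed handling: record the failure and skip scheduling the job
      return;
    }
    executor.execute(fcr); // 'executor' is a placeholder ExecutorService
    // After initialize(), the reference is populated, so other code can reach the compactor:
    FileCompactor fc = fcr.getFileCompactor().get();
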
diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index ed101cdf3e,7365c7ab0d..51c919a2de
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@@ -31,14 -31,11 +31,16 @@@ import org.apache.accumulo.core.compact
  import org.apache.accumulo.core.compaction.thrift.TCompactionState;
  import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
  import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
  import org.apache.accumulo.core.util.UtilWaitThread;
+ import org.apache.accumulo.server.compaction.FileCompactor;
+ import org.apache.accumulo.server.compaction.FileCompactor.CompactionCanceledException;
+ import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 +import org.apache.accumulo.server.tablets.TabletNameGenerator;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -58,48 -55,58 +60,66 @@@ public class ExternalDoNothingCompacto
    }
  
    @Override
-   protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries,
-       LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped,
-       AtomicReference<Throwable> err) {
+   protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job,
+       LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started,
+       CountDownLatch stopped, AtomicReference<Throwable> err) {
  
      // Set this to true so that only 1 external compaction is run
++    final AtomicReference<FileCompactor> ref = new AtomicReference<>();
      this.shutdown = true;
  
-     return () -> {
-       try {
-         LOG.info("Starting up compaction runnable for job: {}", job);
-         TCompactionStatusUpdate update = new TCompactionStatusUpdate();
-         update.setState(TCompactionState.STARTED);
-         update.setMessage("Compaction started");
-         updateCompactionState(job, update);
- 
-         // Create tmp output file
-         final TabletMetadata tm = getContext().getAmple()
-             .readTablet(KeyExtent.fromThrift(job.getExtent()), ColumnType.DIR);
-         ReferencedTabletFile newFile =
-             TabletNameGenerator.getNextDataFilenameForMajc(job.isPropagateDeletes(), getContext(),
-                 tm, (dir) -> {}, ExternalCompactionId.from(job.getExternalCompactionId()));
-         LOG.info("Creating tmp file: {}", newFile.getPath());
-         getContext().getVolumeManager().createNewFile(newFile.getPath());
- 
-         LOG.info("Starting compactor");
-         started.countDown();
- 
-         while (!JOB_HOLDER.isCancelled()) {
-           LOG.info("Sleeping while job is not cancelled");
-           UtilWaitThread.sleep(1000);
+     return new FileCompactorRunnable() {
+ 
 -      final AtomicReference<FileCompactor> ref = new AtomicReference<>();
 -
+       @Override
+       public AtomicReference<FileCompactor> getFileCompactor() {
+         return ref;
+       }
+ 
+       @Override
+       public void run() {
+         try {
+           LOG.info("Starting up compaction runnable for job: {}", job);
+           TCompactionStatusUpdate update = new TCompactionStatusUpdate();
+           update.setState(TCompactionState.STARTED);
+           update.setMessage("Compaction started");
+           updateCompactionState(job, update);
+ 
++          // Create tmp output file
++          final TabletMetadata tm = getContext().getAmple()
++              .readTablet(KeyExtent.fromThrift(job.getExtent()), ColumnType.DIR);
++          ReferencedTabletFile newFile =
++              TabletNameGenerator.getNextDataFilenameForMajc(job.isPropagateDeletes(), getContext(),
++                  tm, (dir) -> {}, ExternalCompactionId.from(job.getExternalCompactionId()));
++          LOG.info("Creating tmp file: {}", newFile.getPath());
++          getContext().getVolumeManager().createNewFile(newFile.getPath());
++
+           LOG.info("Starting compactor");
+           started.countDown();
+ 
+           while (!JOB_HOLDER.isCancelled()) {
+             LOG.info("Sleeping while job is not cancelled");
+             UtilWaitThread.sleep(1000);
+           }
+           // Compactor throws this exception when cancelled
+           throw new CompactionCanceledException();
+ 
+         } catch (Exception e) {
+           KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+           LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
+               fromThriftExtent, e);
+           err.set(e);
+         } finally {
+           stopped.countDown();
          }
-         // Compactor throws this exception when cancelled
-         throw new CompactionCanceledException();
- 
-       } catch (Exception e) {
-         KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
-         LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
-             fromThriftExtent, e);
-         err.set(e);
-       } finally {
-         stopped.countDown();
        }
+ 
+       @Override
+       public void initialize() throws RetriesExceededException {
+         // This isn't used; we just need to create and return something non-null
+         ref.set(new FileCompactor(getContext(), KeyExtent.fromThrift(job.getExtent()), null, null,
+             false, null, null, null, null, null));
+       }
+ 
      };
  
    }
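
A note on the pattern shared by both implementations above: initialize() exists because an
anonymous class cannot declare a constructor, so setup that may throw a checked exception (here,
building the FileCompactor) has to happen in a separate step after instantiation. Below is a
minimal, self-contained illustration of that shape using placeholder names, not Accumulo types.

    // IllustrativeInit.java - sketch of the two-step construction pattern (placeholder names,
    // not Accumulo code).
    import java.util.concurrent.atomic.AtomicReference;

    public class IllustrativeInit {

      interface InitializableTask extends Runnable {
        void initialize(); // stands in for initialize() throws RetriesExceededException
      }

      public static void main(String[] args) {
        final AtomicReference<String> resource = new AtomicReference<>();

        InitializableTask task = new InitializableTask() {
          @Override
          public void initialize() {
            // An anonymous class cannot declare a constructor, so setup happens here instead.
            resource.set("expensive-to-build state");
          }

          @Override
          public void run() {
            // Mirrors the Preconditions.checkState(...) guard in Compactor's run().
            if (resource.get() == null) {
              throw new IllegalStateException("initialize not called");
            }
            System.out.println("running with " + resource.get());
          }
        };

        task.initialize();
        task.run();
      }
    }
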
