This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b0643beb5f Modified FileCompactor to set interruptFlag on RFile (#4628)
b0643beb5f is described below

commit b0643beb5f8d74a2049f5c09c57069796664991d
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Jun 5 08:11:24 2024 -0400

    Modified FileCompactor to set interruptFlag on RFile (#4628)
    
    Added interrupt method on FileCompactor that is set to true
    in CompactableUtils when the majc env is no longer enabled.
    FileCompactor.interrupt sets an AtomicBoolean to true, and
    the reference to the AtomicBoolean is set on the RFiles. The
    FileCompactor.interrupt is also called from the Compactor
    during the already existing compaction cancellation process.
    
    Fixes #4485
---
 .../accumulo/server/compaction/FileCompactor.java  |  16 +++
 .../accumulo/compactor/CompactionJobHolder.java    |  12 ++-
 .../org/apache/accumulo/compactor/Compactor.java   | 120 +++++++++++++--------
 .../apache/accumulo/compactor/CompactorTest.java   |  33 ++++--
 .../accumulo/tserver/tablet/CompactableUtils.java  |  17 ++-
 .../compaction/ExternalDoNothingCompactor.java     |  71 +++++++-----
 6 files changed, 188 insertions(+), 81 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 77ec9f1696..3825a51d88 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -53,6 +54,7 @@ import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
 import 
org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator;
+import 
org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
 import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
 import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
@@ -142,6 +144,12 @@ public class FileCompactor implements 
Callable<CompactionStats> {
   protected volatile Thread thread;
   private final ServerContext context;
 
+  private final AtomicBoolean interruptFlag = new AtomicBoolean(false);
+
+  public void interrupt() {
+    interruptFlag.set(true);
+  }
+
   public long getCompactorID() {
     return compactorID;
   }
@@ -347,6 +355,13 @@ public class FileCompactor implements 
Callable<CompactionStats> {
     } catch (CompactionCanceledException e) {
       log.debug("Compaction canceled {}", extent);
       throw e;
+    } catch (IterationInterruptedException iie) {
+      if (!env.isCompactionEnabled()) {
+        log.debug("Compaction canceled {}", extent);
+        throw new CompactionCanceledException();
+      }
+      log.debug("RFile interrupted {}", extent);
+      throw iie;
     } catch (IOException | RuntimeException e) {
       Collection<String> inputFileNames =
           Collections2.transform(getFilesToCompact(), 
StoredTabletFile::getFileName);
@@ -416,6 +431,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
         SortedKeyValueIterator<Key,Value> iter = new 
ProblemReportingIterator(context,
             extent.tableId(), mapFile.getPathStr(), false, reader);
+        ((ProblemReportingIterator) iter).setInterruptFlag(interruptFlag);
 
         if (filesToCompact.get(mapFile).isTimeSet()) {
           iter = new TimeSettingIterator(iter, 
filesToCompact.get(mapFile).getTime());
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
index d609424c4b..38d03faaf2 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
@@ -19,16 +19,19 @@
 package org.apache.accumulo.compactor;
 
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.server.compaction.FileCompactor;
 
 public class CompactionJobHolder {
 
   private TExternalCompactionJob job;
   private Thread compactionThread;
+  private AtomicReference<FileCompactor> compactor;
   private volatile boolean cancelled = false;
   private volatile TCompactionStats stats = null;
 
@@ -37,6 +40,7 @@ public class CompactionJobHolder {
   public synchronized void reset() {
     job = null;
     compactionThread = null;
+    compactor = null;
     cancelled = false;
     stats = null;
   }
@@ -61,6 +65,9 @@ public class CompactionJobHolder {
   public synchronized boolean cancel(String extCompId) {
     if (isSet() && getJob().getExternalCompactionId().equals(extCompId)) {
       cancelled = true;
+      if (compactor.get() != null) {
+        compactor.get().interrupt();
+      }
       compactionThread.interrupt();
       return true;
     }
@@ -75,11 +82,14 @@ public class CompactionJobHolder {
     return (null != this.job);
   }
 
-  public synchronized void set(TExternalCompactionJob job, Thread 
compactionThread) {
+  public synchronized void set(TExternalCompactionJob job, Thread 
compactionThread,
+      AtomicReference<FileCompactor> compactor) {
     Objects.requireNonNull(job, "CompactionJob is null");
     Objects.requireNonNull(compactionThread, "Compaction thread is null");
+    Objects.requireNonNull(compactor, "Compactor object is null");
     this.job = job;
     this.compactionThread = compactionThread;
+    this.compactor = compactor;
   }
 
 }
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 278c121dd6..7f7509489a 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -125,6 +125,16 @@ import io.micrometer.core.instrument.MeterRegistry;
 
 public class Compactor extends AbstractServer implements MetricsProducer, 
CompactorService.Iface {
 
+  public interface FileCompactorRunnable extends Runnable {
+    /**
+     * Unable to create a constructor in an anonymous class so this method 
serves to initialize the
+     * object so that {@code #getFileCompactor()} returns a non-null reference.
+     */
+    void initialize() throws RetriesExceededException;
+
+    AtomicReference<FileCompactor> getFileCompactor();
+  }
+
   private static final SecureRandom random = new SecureRandom();
 
   public static class CompactorServerOpts extends ServerOpts {
@@ -490,23 +500,22 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
    * @param err reference to error
    * @return Runnable compaction job
    */
-  protected Runnable createCompactionJob(final TExternalCompactionJob job,
+  protected FileCompactorRunnable createCompactionJob(final 
TExternalCompactionJob job,
       final LongAdder totalInputEntries, final LongAdder totalInputBytes,
       final CountDownLatch started, final CountDownLatch stopped,
       final AtomicReference<Throwable> err) {
 
-    return () -> {
-      // Its only expected that a single compaction runs at a time. Multiple 
compactions running
-      // at a time could cause odd behavior like out of order and unexpected 
thrift calls to the
-      // coordinator. This is a sanity check to ensure the expectation is met. 
Should this check
-      // ever fail, it means there is a bug elsewhere.
-      Preconditions.checkState(compactionRunning.compareAndSet(false, true));
-      try {
+    return new FileCompactorRunnable() {
+
+      private AtomicReference<FileCompactor> compactor = new 
AtomicReference<>();
+
+      @Override
+      public void initialize() throws RetriesExceededException {
         LOG.info("Starting up compaction runnable for job: {}", job);
         TCompactionStatusUpdate update =
             new TCompactionStatusUpdate(TCompactionState.STARTED, "Compaction 
started", -1, -1, -1);
         updateCompactionState(job, update);
-        var extent = KeyExtent.fromThrift(job.getExtent());
+        final var extent = KeyExtent.fromThrift(job.getExtent());
         final AccumuloConfiguration aConfig;
         final TableConfiguration tConfig = 
getContext().getTableConfiguration(extent.tableId());
 
@@ -538,38 +547,58 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
         job.getIteratorSettings().getIterators()
             .forEach(tis -> 
iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
 
-        ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
-        FileCompactor compactor = new FileCompactor(getContext(), extent, 
files, outputFile,
-            job.isPropagateDeletes(), cenv, iters, aConfig, 
tConfig.getCryptoService());
-
-        LOG.trace("Starting compactor");
-        started.countDown();
-
-        org.apache.accumulo.server.compaction.CompactionStats stat = 
compactor.call();
-        TCompactionStats cs = new TCompactionStats();
-        cs.setEntriesRead(stat.getEntriesRead());
-        cs.setEntriesWritten(stat.getEntriesWritten());
-        cs.setFileSize(stat.getFileSize());
-        JOB_HOLDER.setStats(cs);
-
-        LOG.info("Compaction completed successfully {} ", 
job.getExternalCompactionId());
-        // Update state when completed
-        TCompactionStatusUpdate update2 = new 
TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
-            "Compaction completed successfully", -1, -1, -1);
-        updateCompactionState(job, update2);
-      } catch (FileCompactor.CompactionCanceledException cce) {
-        LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
-        err.set(cce);
-      } catch (Exception e) {
-        KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
-        LOG.error("Compaction failed: id: {}, extent: {}", 
job.getExternalCompactionId(),
-            fromThriftExtent, e);
-        err.set(e);
-      } finally {
-        stopped.countDown();
-        Preconditions.checkState(compactionRunning.compareAndSet(true, false));
+        final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
+        compactor.set(new FileCompactor(getContext(), extent, files, 
outputFile,
+            job.isPropagateDeletes(), cenv, iters, aConfig, 
tConfig.getCryptoService()));
+
+      }
+
+      @Override
+      public AtomicReference<FileCompactor> getFileCompactor() {
+        return compactor;
       }
+
+      @Override
+      public void run() {
+        Preconditions.checkState(compactor.get() != null, "initialize not 
called");
+        // Its only expected that a single compaction runs at a time. Multiple 
compactions running
+        // at a time could cause odd behavior like out of order and unexpected 
thrift calls to the
+        // coordinator. This is a sanity check to ensure the expectation is 
met. Should this check
+        // ever fail, it means there is a bug elsewhere.
+        Preconditions.checkState(compactionRunning.compareAndSet(false, true));
+        try {
+
+          LOG.trace("Starting compactor");
+          started.countDown();
+
+          org.apache.accumulo.server.compaction.CompactionStats stat = 
compactor.get().call();
+          TCompactionStats cs = new TCompactionStats();
+          cs.setEntriesRead(stat.getEntriesRead());
+          cs.setEntriesWritten(stat.getEntriesWritten());
+          cs.setFileSize(stat.getFileSize());
+          JOB_HOLDER.setStats(cs);
+
+          LOG.info("Compaction completed successfully {} ", 
job.getExternalCompactionId());
+          // Update state when completed
+          TCompactionStatusUpdate update2 = new 
TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
+              "Compaction completed successfully", -1, -1, -1);
+          updateCompactionState(job, update2);
+        } catch (FileCompactor.CompactionCanceledException cce) {
+          LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
+          err.set(cce);
+        } catch (Exception e) {
+          KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+          LOG.error("Compaction failed: id: {}, extent: {}", 
job.getExternalCompactionId(),
+              fromThriftExtent, e);
+          err.set(e);
+        } finally {
+          stopped.countDown();
+          Preconditions.checkState(compactionRunning.compareAndSet(true, 
false));
+        }
+      }
+
     };
+
   }
 
   /**
@@ -686,13 +715,18 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
         final CountDownLatch started = new CountDownLatch(1);
         final CountDownLatch stopped = new CountDownLatch(1);
 
-        final Thread compactionThread = Threads.createThread(
-            "Compaction job for tablet " + job.getExtent().toString(),
-            createCompactionJob(job, totalInputEntries, totalInputBytes, 
started, stopped, err));
+        final FileCompactorRunnable fcr =
+            createCompactionJob(job, totalInputEntries, totalInputBytes, 
started, stopped, err);
 
-        JOB_HOLDER.set(job, compactionThread);
+        final Thread compactionThread =
+            Threads.createThread("Compaction job for tablet " + 
job.getExtent().toString(), fcr);
+
+        JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
 
         try {
+          // Need to call FileCompactorRunnable.initialize after calling 
JOB_HOLDER.set
+          fcr.initialize();
+
           compactionThread.start(); // start the compactionThread
           started.await(); // wait until the compactor is started
           final long inputEntries = totalInputEntries.sum();
diff --git 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index c747d9392c..8a8da9ae89 100644
--- 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Supplier;
 
+import org.apache.accumulo.compactor.Compactor.FileCompactorRunnable;
 import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
@@ -53,11 +54,13 @@ import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.FileCompactor;
 import 
org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
+import org.easymock.EasyMock;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -76,7 +79,7 @@ import org.slf4j.LoggerFactory;
     "com.sun.org.apache.xerces.*"})
 public class CompactorTest {
 
-  public class SuccessfulCompaction implements Runnable {
+  public class SuccessfulCompaction implements FileCompactorRunnable {
 
     protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
 
@@ -85,6 +88,7 @@ public class CompactorTest {
     protected final CountDownLatch started;
     protected final CountDownLatch stopped;
     protected final AtomicReference<Throwable> err;
+    private final FileCompactor compactor = 
EasyMock.createMock(FileCompactor.class);
 
     public SuccessfulCompaction(LongAdder totalInputEntries, LongAdder 
totalInputBytes,
         CountDownLatch started, CountDownLatch stopped, 
AtomicReference<Throwable> err) {
@@ -95,6 +99,14 @@ public class CompactorTest {
       this.stopped = stopped;
     }
 
+    @Override
+    public void initialize() throws RetriesExceededException {}
+
+    @Override
+    public AtomicReference<FileCompactor> getFileCompactor() {
+      return new AtomicReference<>(compactor);
+    }
+
     @Override
     public void run() {
       try {
@@ -106,6 +118,7 @@ public class CompactorTest {
         stopped.countDown();
       }
     }
+
   }
 
   public class FailedCompaction extends SuccessfulCompaction {
@@ -214,9 +227,9 @@ public class CompactorTest {
     protected synchronized void checkIfCanceled() {}
 
     @Override
-    protected Runnable createCompactionJob(TExternalCompactionJob job, 
LongAdder totalInputEntries,
-        LongAdder totalInputBytes, CountDownLatch started, CountDownLatch 
stopped,
-        AtomicReference<Throwable> err) {
+    protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob 
job,
+        LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch 
started,
+        CountDownLatch stopped, AtomicReference<Throwable> err) {
       return new SuccessfulCompaction(totalInputEntries, totalInputBytes, 
started, stopped, err);
     }
 
@@ -265,9 +278,9 @@ public class CompactorTest {
     }
 
     @Override
-    protected Runnable createCompactionJob(TExternalCompactionJob job, 
LongAdder totalInputEntries,
-        LongAdder totalInputBytes, CountDownLatch started, CountDownLatch 
stopped,
-        AtomicReference<Throwable> err) {
+    protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob 
job,
+        LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch 
started,
+        CountDownLatch stopped, AtomicReference<Throwable> err) {
       return new FailedCompaction(totalInputEntries, totalInputBytes, started, 
stopped, err);
     }
   }
@@ -280,9 +293,9 @@ public class CompactorTest {
     }
 
     @Override
-    protected Runnable createCompactionJob(TExternalCompactionJob job, 
LongAdder totalInputEntries,
-        LongAdder totalInputBytes, CountDownLatch started, CountDownLatch 
stopped,
-        AtomicReference<Throwable> err) {
+    protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob 
job,
+        LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch 
started,
+        CountDownLatch stopped, AtomicReference<Throwable> err) {
       return new InterruptedCompaction(totalInputEntries, totalInputBytes, 
started, stopped, err);
     }
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index e2aab70256..1a670c81d1 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -29,6 +29,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -564,11 +566,22 @@ public class CompactableUtils {
     AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
         getOverrides(job.getKind(), tablet, cInfo.localHelper, 
job.getFiles()));
 
-    FileCompactor compactor = new FileCompactor(tablet.getContext(), 
tablet.getExtent(),
+    final FileCompactor compactor = new FileCompactor(tablet.getContext(), 
tablet.getExtent(),
         compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, 
compactionConfig,
         tableConf.getCryptoService());
 
-    return compactor.call();
+    final Runnable compactionCancellerTask = () -> {
+      if (!cenv.isCompactionEnabled()) {
+        compactor.interrupt();
+      }
+    };
+    final ScheduledFuture<?> future = 
tablet.getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(compactionCancellerTask, 10, 10, 
TimeUnit.SECONDS);
+    try {
+      return compactor.call();
+    } finally {
+      future.cancel(true);
+    }
   }
 
   /**
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index 6b4547252b..a97d8a37b4 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@ -32,7 +32,9 @@ import 
org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.compaction.FileCompactor;
 import 
org.apache.accumulo.server.compaction.FileCompactor.CompactionCanceledException;
+import 
org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,39 +54,58 @@ public class ExternalDoNothingCompactor extends Compactor 
implements Iface {
   }
 
   @Override
-  protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder 
totalInputEntries,
-      LongAdder totalInputBytes, CountDownLatch started, CountDownLatch 
stopped,
-      AtomicReference<Throwable> err) {
+  protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob 
job,
+      LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch 
started,
+      CountDownLatch stopped, AtomicReference<Throwable> err) {
 
     // Set this to true so that only 1 external compaction is run
     this.shutdown = true;
 
-    return () -> {
-      try {
-        LOG.info("Starting up compaction runnable for job: {}", job);
-        TCompactionStatusUpdate update = new TCompactionStatusUpdate();
-        update.setState(TCompactionState.STARTED);
-        update.setMessage("Compaction started");
-        updateCompactionState(job, update);
+    return new FileCompactorRunnable() {
 
-        LOG.info("Starting compactor");
-        started.countDown();
+      final AtomicReference<FileCompactor> ref = new AtomicReference<>();
 
-        while (!JOB_HOLDER.isCancelled()) {
-          LOG.info("Sleeping while job is not cancelled");
-          UtilWaitThread.sleep(1000);
+      @Override
+      public AtomicReference<FileCompactor> getFileCompactor() {
+        return ref;
+      }
+
+      @Override
+      public void run() {
+        try {
+          LOG.info("Starting up compaction runnable for job: {}", job);
+          TCompactionStatusUpdate update = new TCompactionStatusUpdate();
+          update.setState(TCompactionState.STARTED);
+          update.setMessage("Compaction started");
+          updateCompactionState(job, update);
+
+          LOG.info("Starting compactor");
+          started.countDown();
+
+          while (!JOB_HOLDER.isCancelled()) {
+            LOG.info("Sleeping while job is not cancelled");
+            UtilWaitThread.sleep(1000);
+          }
+          // Compactor throws this exception when cancelled
+          throw new CompactionCanceledException();
+
+        } catch (Exception e) {
+          KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+          LOG.error("Compaction failed: id: {}, extent: {}", 
job.getExternalCompactionId(),
+              fromThriftExtent, e);
+          err.set(e);
+        } finally {
+          stopped.countDown();
         }
-        // Compactor throws this exception when cancelled
-        throw new CompactionCanceledException();
-
-      } catch (Exception e) {
-        KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
-        LOG.error("Compaction failed: id: {}, extent: {}", 
job.getExternalCompactionId(),
-            fromThriftExtent, e);
-        err.set(e);
-      } finally {
-        stopped.countDown();
       }
+
+      @Override
+      public void initialize() throws RetriesExceededException {
+        // This isn't used, just need to create and return something
+        ref.set(new FileCompactor(getContext(), 
KeyExtent.fromThrift(job.getExtent()), null, null,
+            false, null, null, null, null));
+      }
+
     };
 
   }

Reply via email to