This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b0643beb5f Modified FileCompactor to set interruptFlag on RFile (#4628)
b0643beb5f is described below
commit b0643beb5f8d74a2049f5c09c57069796664991d
Author: Dave Marion <[email protected]>
AuthorDate: Wed Jun 5 08:11:24 2024 -0400
Modified FileCompactor to set interruptFlag on RFile (#4628)
Added an interrupt method on FileCompactor that is called
from CompactableUtils when the majc env is no longer enabled.
FileCompactor.interrupt sets an AtomicBoolean to true, and
the reference to the AtomicBoolean is set on the RFiles.
FileCompactor.interrupt is also called from the Compactor
during the existing compaction cancellation process.
Fixes #4485
---
.../accumulo/server/compaction/FileCompactor.java | 16 +++
.../accumulo/compactor/CompactionJobHolder.java | 12 ++-
.../org/apache/accumulo/compactor/Compactor.java | 120 +++++++++++++--------
.../apache/accumulo/compactor/CompactorTest.java | 33 ++++--
.../accumulo/tserver/tablet/CompactableUtils.java | 17 ++-
.../compaction/ExternalDoNothingCompactor.java | 71 +++++++-----
6 files changed, 188 insertions(+), 81 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 77ec9f1696..3825a51d88 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
@@ -53,6 +54,7 @@ import
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
import
org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator;
import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator;
+import
org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
@@ -142,6 +144,12 @@ public class FileCompactor implements
Callable<CompactionStats> {
protected volatile Thread thread;
private final ServerContext context;
+ private final AtomicBoolean interruptFlag = new AtomicBoolean(false);
+
+ public void interrupt() {
+ interruptFlag.set(true);
+ }
+
public long getCompactorID() {
return compactorID;
}
@@ -347,6 +355,13 @@ public class FileCompactor implements
Callable<CompactionStats> {
} catch (CompactionCanceledException e) {
log.debug("Compaction canceled {}", extent);
throw e;
+ } catch (IterationInterruptedException iie) {
+ if (!env.isCompactionEnabled()) {
+ log.debug("Compaction canceled {}", extent);
+ throw new CompactionCanceledException();
+ }
+ log.debug("RFile interrupted {}", extent);
+ throw iie;
} catch (IOException | RuntimeException e) {
Collection<String> inputFileNames =
Collections2.transform(getFilesToCompact(),
StoredTabletFile::getFileName);
@@ -416,6 +431,7 @@ public class FileCompactor implements
Callable<CompactionStats> {
SortedKeyValueIterator<Key,Value> iter = new
ProblemReportingIterator(context,
extent.tableId(), mapFile.getPathStr(), false, reader);
+ ((ProblemReportingIterator) iter).setInterruptFlag(interruptFlag);
if (filesToCompact.get(mapFile).isTimeSet()) {
iter = new TimeSettingIterator(iter,
filesToCompact.get(mapFile).getTime());
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
index d609424c4b..38d03faaf2 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
@@ -19,16 +19,19 @@
package org.apache.accumulo.compactor;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.server.compaction.FileCompactor;
public class CompactionJobHolder {
private TExternalCompactionJob job;
private Thread compactionThread;
+ private AtomicReference<FileCompactor> compactor;
private volatile boolean cancelled = false;
private volatile TCompactionStats stats = null;
@@ -37,6 +40,7 @@ public class CompactionJobHolder {
public synchronized void reset() {
job = null;
compactionThread = null;
+ compactor = null;
cancelled = false;
stats = null;
}
@@ -61,6 +65,9 @@ public class CompactionJobHolder {
public synchronized boolean cancel(String extCompId) {
if (isSet() && getJob().getExternalCompactionId().equals(extCompId)) {
cancelled = true;
+ if (compactor.get() != null) {
+ compactor.get().interrupt();
+ }
compactionThread.interrupt();
return true;
}
@@ -75,11 +82,14 @@ public class CompactionJobHolder {
return (null != this.job);
}
- public synchronized void set(TExternalCompactionJob job, Thread
compactionThread) {
+ public synchronized void set(TExternalCompactionJob job, Thread
compactionThread,
+ AtomicReference<FileCompactor> compactor) {
Objects.requireNonNull(job, "CompactionJob is null");
Objects.requireNonNull(compactionThread, "Compaction thread is null");
+ Objects.requireNonNull(compactor, "Compactor object is null");
this.job = job;
this.compactionThread = compactionThread;
+ this.compactor = compactor;
}
}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 278c121dd6..7f7509489a 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -125,6 +125,16 @@ import io.micrometer.core.instrument.MeterRegistry;
public class Compactor extends AbstractServer implements MetricsProducer,
CompactorService.Iface {
+ public interface FileCompactorRunnable extends Runnable {
+ /**
+ * Unable to create a constructor in an anonymous class so this method
serves to initialize the
+ * object so that {@code #getFileCompactor()} returns a non-null reference.
+ */
+ void initialize() throws RetriesExceededException;
+
+ AtomicReference<FileCompactor> getFileCompactor();
+ }
+
private static final SecureRandom random = new SecureRandom();
public static class CompactorServerOpts extends ServerOpts {
@@ -490,23 +500,22 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
* @param err reference to error
* @return Runnable compaction job
*/
- protected Runnable createCompactionJob(final TExternalCompactionJob job,
+ protected FileCompactorRunnable createCompactionJob(final
TExternalCompactionJob job,
final LongAdder totalInputEntries, final LongAdder totalInputBytes,
final CountDownLatch started, final CountDownLatch stopped,
final AtomicReference<Throwable> err) {
- return () -> {
- // Its only expected that a single compaction runs at a time. Multiple
compactions running
- // at a time could cause odd behavior like out of order and unexpected
thrift calls to the
- // coordinator. This is a sanity check to ensure the expectation is met.
Should this check
- // ever fail, it means there is a bug elsewhere.
- Preconditions.checkState(compactionRunning.compareAndSet(false, true));
- try {
+ return new FileCompactorRunnable() {
+
+ private AtomicReference<FileCompactor> compactor = new
AtomicReference<>();
+
+ @Override
+ public void initialize() throws RetriesExceededException {
LOG.info("Starting up compaction runnable for job: {}", job);
TCompactionStatusUpdate update =
new TCompactionStatusUpdate(TCompactionState.STARTED, "Compaction
started", -1, -1, -1);
updateCompactionState(job, update);
- var extent = KeyExtent.fromThrift(job.getExtent());
+ final var extent = KeyExtent.fromThrift(job.getExtent());
final AccumuloConfiguration aConfig;
final TableConfiguration tConfig =
getContext().getTableConfiguration(extent.tableId());
@@ -538,38 +547,58 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
job.getIteratorSettings().getIterators()
.forEach(tis ->
iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
- ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
- FileCompactor compactor = new FileCompactor(getContext(), extent,
files, outputFile,
- job.isPropagateDeletes(), cenv, iters, aConfig,
tConfig.getCryptoService());
-
- LOG.trace("Starting compactor");
- started.countDown();
-
- org.apache.accumulo.server.compaction.CompactionStats stat =
compactor.call();
- TCompactionStats cs = new TCompactionStats();
- cs.setEntriesRead(stat.getEntriesRead());
- cs.setEntriesWritten(stat.getEntriesWritten());
- cs.setFileSize(stat.getFileSize());
- JOB_HOLDER.setStats(cs);
-
- LOG.info("Compaction completed successfully {} ",
job.getExternalCompactionId());
- // Update state when completed
- TCompactionStatusUpdate update2 = new
TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
- "Compaction completed successfully", -1, -1, -1);
- updateCompactionState(job, update2);
- } catch (FileCompactor.CompactionCanceledException cce) {
- LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
- err.set(cce);
- } catch (Exception e) {
- KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
- LOG.error("Compaction failed: id: {}, extent: {}",
job.getExternalCompactionId(),
- fromThriftExtent, e);
- err.set(e);
- } finally {
- stopped.countDown();
- Preconditions.checkState(compactionRunning.compareAndSet(true, false));
+ final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
+ compactor.set(new FileCompactor(getContext(), extent, files,
outputFile,
+ job.isPropagateDeletes(), cenv, iters, aConfig,
tConfig.getCryptoService()));
+
+ }
+
+ @Override
+ public AtomicReference<FileCompactor> getFileCompactor() {
+ return compactor;
}
+
+ @Override
+ public void run() {
+ Preconditions.checkState(compactor.get() != null, "initialize not
called");
+ // Its only expected that a single compaction runs at a time. Multiple
compactions running
+ // at a time could cause odd behavior like out of order and unexpected
thrift calls to the
+ // coordinator. This is a sanity check to ensure the expectation is
met. Should this check
+ // ever fail, it means there is a bug elsewhere.
+ Preconditions.checkState(compactionRunning.compareAndSet(false, true));
+ try {
+
+ LOG.trace("Starting compactor");
+ started.countDown();
+
+ org.apache.accumulo.server.compaction.CompactionStats stat =
compactor.get().call();
+ TCompactionStats cs = new TCompactionStats();
+ cs.setEntriesRead(stat.getEntriesRead());
+ cs.setEntriesWritten(stat.getEntriesWritten());
+ cs.setFileSize(stat.getFileSize());
+ JOB_HOLDER.setStats(cs);
+
+ LOG.info("Compaction completed successfully {} ",
job.getExternalCompactionId());
+ // Update state when completed
+ TCompactionStatusUpdate update2 = new
TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
+ "Compaction completed successfully", -1, -1, -1);
+ updateCompactionState(job, update2);
+ } catch (FileCompactor.CompactionCanceledException cce) {
+ LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
+ err.set(cce);
+ } catch (Exception e) {
+ KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+ LOG.error("Compaction failed: id: {}, extent: {}",
job.getExternalCompactionId(),
+ fromThriftExtent, e);
+ err.set(e);
+ } finally {
+ stopped.countDown();
+ Preconditions.checkState(compactionRunning.compareAndSet(true,
false));
+ }
+ }
+
};
+
}
/**
@@ -686,13 +715,18 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
final CountDownLatch started = new CountDownLatch(1);
final CountDownLatch stopped = new CountDownLatch(1);
- final Thread compactionThread = Threads.createThread(
- "Compaction job for tablet " + job.getExtent().toString(),
- createCompactionJob(job, totalInputEntries, totalInputBytes,
started, stopped, err));
+ final FileCompactorRunnable fcr =
+ createCompactionJob(job, totalInputEntries, totalInputBytes,
started, stopped, err);
- JOB_HOLDER.set(job, compactionThread);
+ final Thread compactionThread =
+ Threads.createThread("Compaction job for tablet " +
job.getExtent().toString(), fcr);
+
+ JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
try {
+ // Need to call FileCompactorRunnable.initialize after calling
JOB_HOLDER.set
+ fcr.initialize();
+
compactionThread.start(); // start the compactionThread
started.await(); // wait until the compactor is started
final long inputEntries = totalInputEntries.sum();
diff --git
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index c747d9392c..8a8da9ae89 100644
---
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
+import org.apache.accumulo.compactor.Compactor.FileCompactorRunnable;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
import org.apache.accumulo.core.conf.ConfigurationCopy;
@@ -53,11 +54,13 @@ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.FileCompactor;
import
org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
+import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -76,7 +79,7 @@ import org.slf4j.LoggerFactory;
"com.sun.org.apache.xerces.*"})
public class CompactorTest {
- public class SuccessfulCompaction implements Runnable {
+ public class SuccessfulCompaction implements FileCompactorRunnable {
protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
@@ -85,6 +88,7 @@ public class CompactorTest {
protected final CountDownLatch started;
protected final CountDownLatch stopped;
protected final AtomicReference<Throwable> err;
+ private final FileCompactor compactor =
EasyMock.createMock(FileCompactor.class);
public SuccessfulCompaction(LongAdder totalInputEntries, LongAdder
totalInputBytes,
CountDownLatch started, CountDownLatch stopped,
AtomicReference<Throwable> err) {
@@ -95,6 +99,14 @@ public class CompactorTest {
this.stopped = stopped;
}
+ @Override
+ public void initialize() throws RetriesExceededException {}
+
+ @Override
+ public AtomicReference<FileCompactor> getFileCompactor() {
+ return new AtomicReference<>(compactor);
+ }
+
@Override
public void run() {
try {
@@ -106,6 +118,7 @@ public class CompactorTest {
stopped.countDown();
}
}
+
}
public class FailedCompaction extends SuccessfulCompaction {
@@ -214,9 +227,9 @@ public class CompactorTest {
protected synchronized void checkIfCanceled() {}
@Override
- protected Runnable createCompactionJob(TExternalCompactionJob job,
LongAdder totalInputEntries,
- LongAdder totalInputBytes, CountDownLatch started, CountDownLatch
stopped,
- AtomicReference<Throwable> err) {
+ protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob
job,
+ LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch
started,
+ CountDownLatch stopped, AtomicReference<Throwable> err) {
return new SuccessfulCompaction(totalInputEntries, totalInputBytes,
started, stopped, err);
}
@@ -265,9 +278,9 @@ public class CompactorTest {
}
@Override
- protected Runnable createCompactionJob(TExternalCompactionJob job,
LongAdder totalInputEntries,
- LongAdder totalInputBytes, CountDownLatch started, CountDownLatch
stopped,
- AtomicReference<Throwable> err) {
+ protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob
job,
+ LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch
started,
+ CountDownLatch stopped, AtomicReference<Throwable> err) {
return new FailedCompaction(totalInputEntries, totalInputBytes, started,
stopped, err);
}
}
@@ -280,9 +293,9 @@ public class CompactorTest {
}
@Override
- protected Runnable createCompactionJob(TExternalCompactionJob job,
LongAdder totalInputEntries,
- LongAdder totalInputBytes, CountDownLatch started, CountDownLatch
stopped,
- AtomicReference<Throwable> err) {
+ protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob
job,
+ LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch
started,
+ CountDownLatch stopped, AtomicReference<Throwable> err) {
return new InterruptedCompaction(totalInputEntries, totalInputBytes,
started, stopped, err);
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index e2aab70256..1a670c81d1 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -29,6 +29,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -564,11 +566,22 @@ public class CompactableUtils {
AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
getOverrides(job.getKind(), tablet, cInfo.localHelper,
job.getFiles()));
- FileCompactor compactor = new FileCompactor(tablet.getContext(),
tablet.getExtent(),
+ final FileCompactor compactor = new FileCompactor(tablet.getContext(),
tablet.getExtent(),
compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters,
compactionConfig,
tableConf.getCryptoService());
- return compactor.call();
+ final Runnable compactionCancellerTask = () -> {
+ if (!cenv.isCompactionEnabled()) {
+ compactor.interrupt();
+ }
+ };
+ final ScheduledFuture<?> future =
tablet.getContext().getScheduledExecutor()
+ .scheduleWithFixedDelay(compactionCancellerTask, 10, 10,
TimeUnit.SECONDS);
+ try {
+ return compactor.call();
+ } finally {
+ future.cancel(true);
+ }
}
/**
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index 6b4547252b..a97d8a37b4 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@ -32,7 +32,9 @@ import
org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.compaction.FileCompactor;
import
org.apache.accumulo.server.compaction.FileCompactor.CompactionCanceledException;
+import
org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,39 +54,58 @@ public class ExternalDoNothingCompactor extends Compactor
implements Iface {
}
@Override
- protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder
totalInputEntries,
- LongAdder totalInputBytes, CountDownLatch started, CountDownLatch
stopped,
- AtomicReference<Throwable> err) {
+ protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob
job,
+ LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch
started,
+ CountDownLatch stopped, AtomicReference<Throwable> err) {
// Set this to true so that only 1 external compaction is run
this.shutdown = true;
- return () -> {
- try {
- LOG.info("Starting up compaction runnable for job: {}", job);
- TCompactionStatusUpdate update = new TCompactionStatusUpdate();
- update.setState(TCompactionState.STARTED);
- update.setMessage("Compaction started");
- updateCompactionState(job, update);
+ return new FileCompactorRunnable() {
- LOG.info("Starting compactor");
- started.countDown();
+ final AtomicReference<FileCompactor> ref = new AtomicReference<>();
- while (!JOB_HOLDER.isCancelled()) {
- LOG.info("Sleeping while job is not cancelled");
- UtilWaitThread.sleep(1000);
+ @Override
+ public AtomicReference<FileCompactor> getFileCompactor() {
+ return ref;
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.info("Starting up compaction runnable for job: {}", job);
+ TCompactionStatusUpdate update = new TCompactionStatusUpdate();
+ update.setState(TCompactionState.STARTED);
+ update.setMessage("Compaction started");
+ updateCompactionState(job, update);
+
+ LOG.info("Starting compactor");
+ started.countDown();
+
+ while (!JOB_HOLDER.isCancelled()) {
+ LOG.info("Sleeping while job is not cancelled");
+ UtilWaitThread.sleep(1000);
+ }
+ // Compactor throws this exception when cancelled
+ throw new CompactionCanceledException();
+
+ } catch (Exception e) {
+ KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+ LOG.error("Compaction failed: id: {}, extent: {}",
job.getExternalCompactionId(),
+ fromThriftExtent, e);
+ err.set(e);
+ } finally {
+ stopped.countDown();
}
- // Compactor throws this exception when cancelled
- throw new CompactionCanceledException();
-
- } catch (Exception e) {
- KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
- LOG.error("Compaction failed: id: {}, extent: {}",
job.getExternalCompactionId(),
- fromThriftExtent, e);
- err.set(e);
- } finally {
- stopped.countDown();
}
+
+ @Override
+ public void initialize() throws RetriesExceededException {
+ // This isn't used, just need to create and return something
+ ref.set(new FileCompactor(getContext(),
KeyExtent.fromThrift(job.getExtent()), null, null,
+ false, null, null, null, null));
+ }
+
};
}