This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ee197905e99a7680ad3420d990b27aca5c9fb72d
Merge: 24267d01d3 b0643beb5f
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Jun 5 13:05:17 2024 +0000

    Merge branch '2.1'

 .../accumulo/server/compaction/FileCompactor.java  |  16 +++
 .../accumulo/compactor/CompactionJobHolder.java    |  12 ++-
 .../org/apache/accumulo/compactor/Compactor.java   | 120 +++++++++++++--------
 .../accumulo/tserver/tablet/CompactableUtils.java  |  17 ++-
 .../compaction/ExternalDoNothingCompactor.java     |  71 +++++++-----
 5 files changed, 165 insertions(+), 71 deletions(-)

diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index cebdce684d,3825a51d88..403ac2ce60
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@@ -32,7 -32,7 +32,8 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.Set;
  import java.util.concurrent.Callable;
+ import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
  import java.util.concurrent.atomic.LongAdder;
  
@@@ -54,11 -54,13 +55,12 @@@ import org.apache.accumulo.core.iterato
  import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
  import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator;
  import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator;
 +import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
+ import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
  import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
 -import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
 -import org.apache.accumulo.core.metadata.MetadataTable;
 -import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.AccumuloTable;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
  import org.apache.accumulo.core.metadata.StoredTabletFile;
 -import org.apache.accumulo.core.metadata.TabletFile;
  import org.apache.accumulo.core.metadata.schema.DataFileValue;
  import org.apache.accumulo.core.spi.crypto.CryptoService;
  import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
@@@ -421,10 -429,13 +436,11 @@@ public class FileCompactor implements C
  
          readers.add(reader);
  
 -        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(context,
 -            extent.tableId(), mapFile.getPathStr(), false, reader);
 -        ((ProblemReportingIterator) iter).setInterruptFlag(interruptFlag);
 +        InterruptibleIterator iter = new ProblemReportingIterator(context, extent.tableId(),
 +            dataFile.getNormalizedPathStr(), false, reader);
++        iter.setInterruptFlag(interruptFlag);
  
 -        if (filesToCompact.get(mapFile).isTimeSet()) {
 -          iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
 -        }
 +        iter = filesToCompact.get(dataFile).wrapFileIterator(iter);
  
          iters.add(iter);
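
For context on the hunk above: the per-file iterator is now typed as InterruptibleIterator so the compaction's shared interrupt flag reaches every source iterator. Below is a minimal, self-contained sketch of that cooperative-interrupt idea in plain Java; the Interruptible and CountingSource names and the plain RuntimeException are illustrative stand-ins, not the Accumulo types referenced in the diff.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for an interruptible iterator type.
interface Interruptible {
  void setInterruptFlag(AtomicBoolean flag);
}

class CountingSource implements Interruptible {
  private AtomicBoolean interruptFlag = new AtomicBoolean(false);
  private long emitted = 0;

  @Override
  public void setInterruptFlag(AtomicBoolean flag) {
    this.interruptFlag = flag;
  }

  // Each call checks the shared flag before producing the next value,
  // mirroring how an interruptible iterator bails out of a long scan.
  long next() {
    if (interruptFlag.get()) {
      throw new RuntimeException("iteration interrupted");
    }
    return emitted++;
  }
}

public class InterruptFlagSketch {
  public static void main(String[] args) {
    AtomicBoolean interruptFlag = new AtomicBoolean(false);
    CountingSource source = new CountingSource();
    source.setInterruptFlag(interruptFlag);

    for (int i = 0; i < 5; i++) {
      System.out.println(source.next());
    }

    // Another thread (e.g. a cancel check) would normally flip the flag.
    interruptFlag.set(true);
    try {
      source.next();
    } catch (RuntimeException e) {
      System.out.println("stopped: " + e.getMessage());
    }
  }
}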
  
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 1aab26fa0c,7f7509489a..4ba279162e
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -127,7 -125,29 +127,17 @@@ import io.micrometer.core.instrument.Me
  
  public class Compactor extends AbstractServer implements MetricsProducer, CompactorService.Iface {
  
+   public interface FileCompactorRunnable extends Runnable {
+     /**
+      * Unable to create a constructor in an anonymous class so this method 
serves to initialize the
+      * object so that {@code #getFileCompactor()} returns a non-null 
reference.
+      */
+     void initialize() throws RetriesExceededException;
+ 
+     AtomicReference<FileCompactor> getFileCompactor();
+   }
+ 
 -  private static final SecureRandom random = new SecureRandom();
 -
 -  public static class CompactorServerOpts extends ServerOpts {
 -    @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name")
 -    private String queueName = null;
 -
 -    public String getQueueName() {
 -      return queueName;
 -    }
 -  }
 -
    private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
 -  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
    private static final long TIME_BETWEEN_CANCEL_CHECKS = MINUTES.toMillis(5);
  
    private static final long TEN_MEGABYTES = 10485760;
@@@ -530,39 -547,58 +539,59 @@@
          job.getIteratorSettings().getIterators()
             .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
  
-         ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
-         FileCompactor compactor =
+         final ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
 -        compactor.set(new FileCompactor(getContext(), extent, files, outputFile,
 -            job.isPropagateDeletes(), cenv, iters, aConfig, tConfig.getCryptoService()));
++        compactor.set(
 +            new FileCompactor(getContext(), extent, files, outputFile, job.isPropagateDeletes(),
-                 cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics);
- 
-         LOG.trace("Starting compactor");
-         started.countDown();
- 
-         org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
-         TCompactionStats cs = new TCompactionStats();
-         cs.setEntriesRead(stat.getEntriesRead());
-         cs.setEntriesWritten(stat.getEntriesWritten());
-         cs.setFileSize(stat.getFileSize());
-         JOB_HOLDER.setStats(cs);
- 
-         LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
-         // Update state when completed
-         TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
-             "Compaction completed successfully", -1, -1, -1);
-         updateCompactionState(job, update2);
-       } catch (FileCompactor.CompactionCanceledException cce) {
-         LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
-         err.set(cce);
-       } catch (Exception e) {
-         KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
-         LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
-             fromThriftExtent, e);
-         err.set(e);
-       } finally {
-         stopped.countDown();
-         Preconditions.checkState(compactionRunning.compareAndSet(true, false));
++                cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics));
+ 
+       }
+ 
+       @Override
+       public AtomicReference<FileCompactor> getFileCompactor() {
+         return compactor;
        }
+ 
+       @Override
+       public void run() {
+         Preconditions.checkState(compactor.get() != null, "initialize not called");
+         // It's only expected that a single compaction runs at a time. Multiple compactions running
+         // at a time could cause odd behavior like out of order and unexpected thrift calls to the
+         // coordinator. This is a sanity check to ensure the expectation is met. Should this check
+         // ever fail, it means there is a bug elsewhere.
+         Preconditions.checkState(compactionRunning.compareAndSet(false, true));
+         try {
+ 
+           LOG.trace("Starting compactor");
+           started.countDown();
+ 
+           org.apache.accumulo.server.compaction.CompactionStats stat = compactor.get().call();
+           TCompactionStats cs = new TCompactionStats();
+           cs.setEntriesRead(stat.getEntriesRead());
+           cs.setEntriesWritten(stat.getEntriesWritten());
+           cs.setFileSize(stat.getFileSize());
+           JOB_HOLDER.setStats(cs);
+ 
+           LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
+           // Update state when completed
+           TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
+               "Compaction completed successfully", -1, -1, -1);
+           updateCompactionState(job, update2);
+         } catch (FileCompactor.CompactionCanceledException cce) {
+           LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
+           err.set(cce);
+         } catch (Exception e) {
+           KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+           LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
+               fromThriftExtent, e);
+           err.set(e);
+         } finally {
+           stopped.countDown();
+           Preconditions.checkState(compactionRunning.compareAndSet(true, false));
+         }
+       }
+ 
      };
+ 
    }
  
    /**
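
A note on the FileCompactorRunnable introduced above: construction (initialize) is separated from execution (run) because an anonymous class cannot declare a constructor, and run() asserts that initialize() was called first. The sketch below shows that two-phase pattern with only the JDK; TwoPhaseRunnable and its members are illustrative names, not part of the commit.

import java.util.concurrent.atomic.AtomicReference;

public class TwoPhaseRunnableSketch {

  interface TwoPhaseRunnable extends Runnable {
    // Anonymous classes cannot declare constructors, so heavyweight setup
    // is deferred to an explicit initialize() call.
    void initialize();

    AtomicReference<String> getResource();
  }

  static TwoPhaseRunnable create(String input) {
    return new TwoPhaseRunnable() {
      private final AtomicReference<String> resource = new AtomicReference<>();

      @Override
      public void initialize() {
        // Expensive or failure-prone setup lives here, not in run().
        resource.set("resource-for-" + input);
      }

      @Override
      public AtomicReference<String> getResource() {
        return resource;
      }

      @Override
      public void run() {
        if (resource.get() == null) {
          throw new IllegalStateException("initialize not called");
        }
        System.out.println("running with " + resource.get());
      }
    };
  }

  public static void main(String[] args) {
    TwoPhaseRunnable r = create("job-1");
    r.initialize();          // caller can inspect getResource() before starting
    new Thread(r).start();
  }
}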
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 0cff077fbc,1a670c81d1..7239e62f4d
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@@ -27,6 -28,9 +27,8 @@@ import java.util.Objects
  import java.util.Optional;
  import java.util.Set;
  import java.util.SortedMap;
 -import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ScheduledFuture;
+ import java.util.concurrent.TimeUnit;
  import java.util.function.Predicate;
  import java.util.stream.Collectors;
  
@@@ -402,15 -563,25 +404,26 @@@ public class CompactableUtils 
        throws IOException, CompactionCanceledException {
      TableConfiguration tableConf = tablet.getTableConfiguration();
  
 -    AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
 -        getOverrides(job.getKind(), tablet, cInfo.localHelper, job.getFiles()));
 +    AccumuloConfiguration compactionConfig =
 +        getCompactionConfig(tableConf, getOverrides(job.getKind(), tablet, cInfo.localHelper,
 +            job.getFiles(), cInfo.selectedFiles));
  
-     FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(),
+     final FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(),
          compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, compactionConfig,
 -        tableConf.getCryptoService());
 +        tableConf.getCryptoService(), tablet.getPausedCompactionMetrics());
  
-     return compactor.call();
+     final Runnable compactionCancellerTask = () -> {
+       if (!cenv.isCompactionEnabled()) {
+         compactor.interrupt();
+       }
+     };
+     final ScheduledFuture<?> future = tablet.getContext().getScheduledExecutor()
+         .scheduleWithFixedDelay(compactionCancellerTask, 10, 10, TimeUnit.SECONDS);
+     try {
+       return compactor.call();
+     } finally {
+       future.cancel(true);
+     }
    }
  
    /**
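
The CompactableUtils change above wraps compactor.call() in a periodically scheduled canceller: a task checks every ten seconds whether the compaction is still enabled and interrupts it if not, and the scheduled future is cancelled in a finally block. A minimal sketch of that watchdog shape using java.util.concurrent follows; the enabled flag and the sleep stand in for the real compaction environment and are not part of the commit.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellerSketch {

  public static void main(String[] args) throws Exception {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicBoolean enabled = new AtomicBoolean(true);
    Thread worker = Thread.currentThread();

    // Periodic check: if the work is no longer enabled, interrupt the worker.
    Runnable canceller = () -> {
      if (!enabled.get()) {
        worker.interrupt();
      }
    };
    ScheduledFuture<?> future =
        scheduler.scheduleWithFixedDelay(canceller, 1, 1, TimeUnit.SECONDS);

    // Simulate an external cancellation after a short delay.
    scheduler.schedule(() -> enabled.set(false), 2, TimeUnit.SECONDS);

    try {
      Thread.sleep(60_000); // stands in for the long-running compaction
    } catch (InterruptedException e) {
      System.out.println("work interrupted by canceller");
    } finally {
      future.cancel(true);   // always stop the watchdog, as the diff does
      scheduler.shutdownNow();
    }
  }
}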
diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index c1f4d6f352,a97d8a37b4..7365c7ab0d
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@@ -60,32 -61,51 +62,51 @@@ public class ExternalDoNothingCompacto
      // Set this to true so that only 1 external compaction is run
      this.shutdown = true;
  
-     return () -> {
-       try {
-         LOG.info("Starting up compaction runnable for job: {}", job);
-         TCompactionStatusUpdate update = new TCompactionStatusUpdate();
-         update.setState(TCompactionState.STARTED);
-         update.setMessage("Compaction started");
-         updateCompactionState(job, update);
+     return new FileCompactorRunnable() {
  
-         LOG.info("Starting compactor");
-         started.countDown();
+       final AtomicReference<FileCompactor> ref = new AtomicReference<>();
  
-         while (!JOB_HOLDER.isCancelled()) {
-           LOG.info("Sleeping while job is not cancelled");
-           UtilWaitThread.sleep(1000);
+       @Override
+       public AtomicReference<FileCompactor> getFileCompactor() {
+         return ref;
+       }
+ 
+       @Override
+       public void run() {
+         try {
+           LOG.info("Starting up compaction runnable for job: {}", job);
+           TCompactionStatusUpdate update = new TCompactionStatusUpdate();
+           update.setState(TCompactionState.STARTED);
+           update.setMessage("Compaction started");
+           updateCompactionState(job, update);
+ 
+           LOG.info("Starting compactor");
+           started.countDown();
+ 
+           while (!JOB_HOLDER.isCancelled()) {
+             LOG.info("Sleeping while job is not cancelled");
+             UtilWaitThread.sleep(1000);
+           }
+           // Compactor throws this exception when cancelled
+           throw new CompactionCanceledException();
+ 
+         } catch (Exception e) {
+           KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+           LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
+               fromThriftExtent, e);
+           err.set(e);
+         } finally {
+           stopped.countDown();
          }
-         // Compactor throws this exception when cancelled
-         throw new CompactionCanceledException();
- 
-       } catch (Exception e) {
-         KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
-         LOG.error("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
-             fromThriftExtent, e);
-         err.set(e);
-       } finally {
-         stopped.countDown();
        }
+ 
+       @Override
+       public void initialize() throws RetriesExceededException {
+         // This isn't used, just need to create and return something
+         ref.set(new FileCompactor(getContext(), KeyExtent.fromThrift(job.getExtent()), null, null,
 -            false, null, null, null, null));
++            false, null, null, null, null, null));
+       }
+ 
      };
  
    }
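
The test compactor above intentionally does no work: it signals that it has started, sleeps until the job is cancelled, and then signals that it has stopped. The sketch below shows that start/cancel/stop handshake with latches and a flag; all names are illustrative and unrelated to the commit.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class DoNothingJobSketch {

  public static void main(String[] args) throws Exception {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch stopped = new CountDownLatch(1);
    AtomicBoolean cancelled = new AtomicBoolean(false);

    Runnable job = () -> {
      try {
        started.countDown();              // tell the test the job is running
        while (!cancelled.get()) {        // do nothing until cancelled
          Thread.sleep(100);
        }
        System.out.println("job observed cancellation");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        stopped.countDown();              // always signal completion
      }
    };

    new Thread(job).start();
    started.await();                      // wait for the job to start
    cancelled.set(true);                  // simulate the job being cancelled
    stopped.await();                      // wait for the job to finish
    System.out.println("done");
  }
}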
