This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new a9100ac  Refactored code that does bookeeping for compacting files. 
(#2213)
a9100ac is described below

commit a9100ac35f3d72677b767d7670dfb032c7e7d733
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Aug 3 17:32:03 2021 -0400

    Refactored code that does bookeeping for compacting files. (#2213)
---
 .../accumulo/tserver/tablet/CompactableImpl.java   | 787 +++++++++++++--------
 1 file changed, 497 insertions(+), 290 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index d4562cc..3efbfe5 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -100,34 +100,20 @@ public class CompactableImpl implements Compactable {
 
   private final Tablet tablet;
 
-  private Set<StoredTabletFile> allCompactingFiles = new HashSet<>();
-  private Set<CompactionJob> runnningJobs = new HashSet<>();
-  private volatile boolean compactionRunning = false;
-
-  private Set<StoredTabletFile> selectedFiles = new HashSet<>();
-
-  private Set<StoredTabletFile> allFilesWhenChopStarted = new HashSet<>();
+  private final FileManager fileMgr;
 
-  // track files produced by compactions of this tablet, those are considered 
chopped
-  private Set<StoredTabletFile> choppedFiles = new HashSet<>();
-  private FileSelectionStatus chopStatus = FileSelectionStatus.NOT_ACTIVE;
+  private Set<CompactionJob> runningJobs = new HashSet<>();
+  private volatile boolean compactionRunning = false;
 
   private Supplier<Set<CompactionServiceId>> servicesInUse;
 
   private Set<CompactionServiceId> servicesUsed = new 
ConcurrentSkipListSet<>();
 
   // status of special compactions
-  private enum FileSelectionStatus {
+  enum FileSelectionStatus {
     NEW, SELECTING, SELECTED, NOT_ACTIVE, CANCELED
   }
 
-  private FileSelectionStatus selectStatus = FileSelectionStatus.NOT_ACTIVE;
-  private CompactionKind selectKind = null;
-  // Tracks if when a set of files was selected, if at that time the set was 
all of the tablets
-  // files. Because a set of selected files can be compacted over one or more 
compactions, its
-  // important to track this in order to know if the last compaction is a full 
compaction and should
-  // not propagate deletes.
-  private boolean initiallySelectedAll = false;
   private CompactionHelper chelper = null;
   private Long compactionId;
   private CompactionConfig compactionConfig;
@@ -155,6 +141,417 @@ public class CompactableImpl implements Compactable {
 
   }
 
+  static class SelectedInfo {
+    final boolean initiallySelectedAll;
+    final Set<StoredTabletFile> selectedFiles;
+    final CompactionKind selectKind;
+
+    public SelectedInfo(boolean initiallySelectedAll, Set<StoredTabletFile> 
selectedFiles,
+        CompactionKind selectKind) {
+      this.initiallySelectedAll = initiallySelectedAll;
+      this.selectedFiles = Set.copyOf(selectedFiles);
+      this.selectKind = selectKind;
+    }
+  }
+
+  /**
+   * This class tracks status of a tablets files for compactions for {@link 
CompactableImpl} owning
+   * the following functionality.
+   *
+   * <UL>
+   * <LI>Tracks which files are reserved for compactions
+   * <LI>Determines which files are available for compactions
+   * <LI>Tracks which files are chopped and which need to be chopped
+   * <LI>Tracks which files are selected for user and selector compactions
+   * <LI>Coordinates the file selection process
+   * </UL>
+   *
+   * <p>
+   * The class is structured in such a way that the above functionality can be 
unit tested.
+   *
+   * <p>
+   * This class does no synchronization of its own and relies on 
CompactableImpl to do all needed
+   * synchronization. CompactableImpl must makes changes to files and other 
state like running jobs
+   * in a mutually exclusive manner, so synchronization at this level is 
unnecessary.
+   *
+   */
+  private class FileManager {
+
+    FileSelectionStatus selectStatus = FileSelectionStatus.NOT_ACTIVE;
+    private CompactionKind selectKind = null;
+
+    // Tracks if when a set of files was selected, if at that time the set was 
all of the tablets
+    // files. Because a set of selected files can be compacted over one or 
more compactions, its
+    // important to track this in order to know if the last compaction is a 
full compaction and
+    // should
+    // not propagate deletes.
+    private boolean initiallySelectedAll = false;
+    private Set<StoredTabletFile> selectedFiles = new HashSet<>();
+
+    private Set<StoredTabletFile> allCompactingFiles = new HashSet<>();
+
+    // track files produced by compactions of this tablet, those are 
considered chopped
+    private Set<StoredTabletFile> choppedFiles = new HashSet<>();
+    private FileSelectionStatus chopStatus = FileSelectionStatus.NOT_ACTIVE;
+    private Set<StoredTabletFile> allFilesWhenChopStarted = new HashSet<>();
+
+    private final KeyExtent extent;
+
+    public FileManager(KeyExtent extent, Collection<StoredTabletFile> 
extCompactingFiles,
+        Optional<SelectedInfo> extSelInfo) {
+      this.extent = extent;
+      allCompactingFiles.addAll(extCompactingFiles);
+      if (extSelInfo.isPresent()) {
+        this.selectedFiles.addAll(extSelInfo.get().selectedFiles);
+        this.selectKind = extSelInfo.get().selectKind;
+        this.initiallySelectedAll = extSelInfo.get().initiallySelectedAll;
+        this.selectStatus = FileSelectionStatus.SELECTED;
+
+        log.debug("Selected compaction status initialized from external 
compactions {} {} {} {}",
+            getExtent(), selectStatus, initiallySelectedAll, 
asFileNames(selectedFiles));
+      }
+    }
+
+    FileSelectionStatus getSelectionStatus() {
+      return selectStatus;
+    }
+
+    CompactionKind getSelectionKind() {
+      return selectKind;
+    }
+
+    SelectedInfo getSelectedInfo() {
+      Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTED);
+      return new SelectedInfo(initiallySelectedAll, selectedFiles, selectKind);
+    }
+
+    boolean initiateSelection(CompactionKind kind) {
+
+      if (selectStatus == FileSelectionStatus.NOT_ACTIVE || (kind == 
CompactionKind.USER
+          && selectKind == CompactionKind.SELECTOR && 
noneRunning(CompactionKind.SELECTOR))) {
+        selectStatus = FileSelectionStatus.NEW;
+        selectKind = kind;
+        selectedFiles.clear();
+        initiallySelectedAll = false;
+        return true;
+      }
+
+      return false;
+
+    }
+
+    boolean beginSelection() {
+      if (selectStatus == FileSelectionStatus.NEW && 
allCompactingFiles.isEmpty()) {
+        selectStatus = FileSelectionStatus.SELECTING;
+        log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
+        return true;
+      }
+
+      return false;
+    }
+
+    void finishSelection(Set<StoredTabletFile> selected, boolean allSelected) {
+      Preconditions.checkArgument(!selected.isEmpty());
+      Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTING);
+      selectStatus = FileSelectionStatus.SELECTED;
+      selectedFiles.clear();
+      selectedFiles.addAll(selected);
+      initiallySelectedAll = allSelected;
+      log.trace("Selected compaction status changed {} {} {} {}", getExtent(), 
selectStatus,
+          initiallySelectedAll, asFileNames(selectedFiles));
+      TabletLogger.selected(getExtent(), selectKind, selectedFiles);
+    }
+
+    void cancelSelection() {
+      Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTING);
+      selectStatus = FileSelectionStatus.NOT_ACTIVE;
+      log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
+    }
+
+    boolean isSelected(CompactionKind kind) {
+      return selectStatus == FileSelectionStatus.SELECTED && kind == 
selectKind;
+    }
+
+    FileSelectionStatus getChopStatus() {
+      return chopStatus;
+    }
+
+    ChopSelector initiateChop(Set<StoredTabletFile> allFiles) {
+      Preconditions.checkState(chopStatus == FileSelectionStatus.NOT_ACTIVE);
+      Set<StoredTabletFile> filesToExamine = new HashSet<>(allFiles);
+      chopStatus = FileSelectionStatus.SELECTING;
+      filesToExamine.removeAll(choppedFiles);
+      filesToExamine.removeAll(allCompactingFiles);
+      return new ChopSelector(allFiles, filesToExamine);
+    }
+
+    class ChopSelector {
+      private Set<StoredTabletFile> allFiles;
+      private Set<StoredTabletFile> filesToExamine;
+
+      private ChopSelector(Set<StoredTabletFile> allFiles, 
Set<StoredTabletFile> filesToExamine) {
+        this.allFiles = allFiles;
+        this.filesToExamine = filesToExamine;
+      }
+
+      void selectChopFiles(Set<StoredTabletFile> unchoppedFiles) {
+        Preconditions.checkState(chopStatus == FileSelectionStatus.SELECTING);
+        choppedFiles.addAll(Sets.difference(filesToExamine, unchoppedFiles));
+        chopStatus = FileSelectionStatus.SELECTED;
+        allFilesWhenChopStarted.clear();
+        allFilesWhenChopStarted.addAll(allFiles);
+
+        var filesToChop = getFilesToChop(allFiles);
+        if (!filesToChop.isEmpty()) {
+          TabletLogger.selected(getExtent(), CompactionKind.CHOP, filesToChop);
+        }
+      }
+
+      Set<StoredTabletFile> getFilesToExamine() {
+        return Collections.unmodifiableSet(filesToExamine);
+      }
+    }
+
+    boolean finishChop(Set<StoredTabletFile> allFiles) {
+
+      boolean completed = false;
+
+      if (chopStatus == FileSelectionStatus.SELECTED) {
+        if (getFilesToChop(allFiles).isEmpty()) {
+          chopStatus = FileSelectionStatus.NOT_ACTIVE;
+          completed = true;
+        }
+      }
+
+      choppedFiles.retainAll(allFiles);
+
+      return completed;
+    }
+
+    void addChoppedFiles(Collection<StoredTabletFile> files) {
+      choppedFiles.addAll(files);
+    }
+
+    void userCompactionCanceled() {
+      if (isSelected(CompactionKind.USER)) {
+        if (noneRunning(CompactionKind.USER)) {
+          selectStatus = FileSelectionStatus.NOT_ACTIVE;
+          log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
+        } else {
+          selectStatus = FileSelectionStatus.CANCELED;
+          log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
+        }
+      }
+    }
+
+    private Set<StoredTabletFile> getFilesToChop(Set<StoredTabletFile> 
allFiles) {
+      Preconditions.checkState(chopStatus == FileSelectionStatus.SELECTED);
+      var copy = new HashSet<>(allFilesWhenChopStarted);
+      copy.retainAll(allFiles);
+      copy.removeAll(choppedFiles);
+      return copy;
+    }
+
+    /**
+     * @return The set of tablet files that are candidates for compaction
+     */
+    Set<StoredTabletFile> getCandidates(Set<StoredTabletFile> currFiles, 
CompactionKind kind,
+        boolean isCompactionStratConfigured) {
+
+      if (!currFiles.containsAll(allCompactingFiles)) {
+        log.trace("Ignoring because compacting not a subset {}", getExtent());
+
+        // A compaction finished, so things are out of date. This can happen 
because CompactableImpl
+        // and Tablet have separate locks, its ok.
+        return Set.of();
+      }
+
+      switch (kind) {
+        case SYSTEM: {
+          if (isCompactionStratConfigured)
+            return Set.of();
+
+          switch (selectStatus) {
+            case NOT_ACTIVE:
+            case CANCELED: {
+              Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
+              candidates.removeAll(allCompactingFiles);
+              return Collections.unmodifiableSet(candidates);
+            }
+            case NEW:
+            case SELECTING:
+              return Set.of();
+            case SELECTED: {
+              Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
+              candidates.removeAll(allCompactingFiles);
+              candidates.removeAll(selectedFiles);
+              return Collections.unmodifiableSet(candidates);
+            }
+            default:
+              throw new AssertionError();
+          }
+        }
+        case SELECTOR:
+          // intentional fall through
+        case USER:
+          switch (selectStatus) {
+            case NOT_ACTIVE:
+            case NEW:
+            case SELECTING:
+            case CANCELED:
+              return Set.of();
+            case SELECTED: {
+              if (selectKind == kind) {
+                Set<StoredTabletFile> candidates = new 
HashSet<>(selectedFiles);
+                candidates.removeAll(allCompactingFiles);
+                candidates = Collections.unmodifiableSet(candidates);
+                Preconditions.checkState(currFiles.containsAll(candidates),
+                    "selected files not in all files %s %s", candidates, 
currFiles);
+                return candidates;
+              } else {
+                return Set.of();
+              }
+            }
+            default:
+              throw new AssertionError();
+          }
+        case CHOP: {
+          switch (chopStatus) {
+            case NOT_ACTIVE:
+            case NEW:
+            case SELECTING:
+              return Set.of();
+            case SELECTED: {
+              if (selectStatus == FileSelectionStatus.NEW
+                  || selectStatus == FileSelectionStatus.SELECTING)
+                return Set.of();
+
+              var filesToChop = getFilesToChop(currFiles);
+              filesToChop.removeAll(allCompactingFiles);
+              if (selectStatus == FileSelectionStatus.SELECTED)
+                filesToChop.removeAll(selectedFiles);
+              return Collections.unmodifiableSet(filesToChop);
+            }
+            case CANCELED: // intentional fall through, not expected status 
for chop
+            default:
+              throw new AssertionError();
+          }
+        }
+        default:
+          throw new AssertionError();
+      }
+    }
+
+    /**
+     * Attempts to reserve a set of files for compaction.
+     *
+     * @return true if the files were reserved and false otherwise
+     */
+    private boolean reserveFiles(CompactionJob job, Set<StoredTabletFile> 
jobFiles) {
+
+      Preconditions.checkArgument(!jobFiles.isEmpty());
+
+      switch (selectStatus) {
+        case NEW:
+        case SELECTING:
+          log.trace(
+              "Ignoring compaction because files are being selected for user 
compaction {} {}",
+              getExtent(), job);
+          return false;
+        case SELECTED: {
+          if (job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR) {
+            if (selectKind == job.getKind()) {
+              if (!selectedFiles.containsAll(jobFiles)) {
+                // TODO diff log level?
+                log.error("Ignoring {} compaction that does not contain 
selected files {} {} {}",
+                    job.getKind(), getExtent(), asFileNames(selectedFiles), 
asFileNames(jobFiles));
+                return false;
+              }
+            } else {
+              log.trace("Ingoring {} compaction because not selected kind {}", 
job.getKind(),
+                  getExtent());
+              return false;
+            }
+          } else if (!Collections.disjoint(selectedFiles, jobFiles)) {
+            log.trace("Ingoring compaction that overlaps with selected files 
{} {} {}", getExtent(),
+                job.getKind(), asFileNames(Sets.intersection(selectedFiles, 
jobFiles)));
+            return false;
+          }
+          break;
+        }
+        case CANCELED:
+        case NOT_ACTIVE: {
+          if (job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR) {
+            log.trace("Ignoring {} compaction because selectStatus is {} for 
{}", job.getKind(),
+                selectStatus, getExtent());
+            return false;
+          }
+          break;
+        }
+        default:
+          throw new AssertionError();
+      }
+
+      if (Collections.disjoint(allCompactingFiles, jobFiles)) {
+        allCompactingFiles.addAll(jobFiles);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    private KeyExtent getExtent() {
+      return extent;
+    }
+
+    /**
+     * Releases a set of files that were previously reserved for compaction.
+     *
+     * @param newFile
+     *          The file produced by a compaction. If the compaction failed, 
this can be null.
+     */
+    private void completed(CompactionJob job, Set<StoredTabletFile> jobFiles,
+        StoredTabletFile newFile) {
+      Preconditions.checkArgument(!jobFiles.isEmpty());
+      Preconditions.checkState(allCompactingFiles.removeAll(jobFiles));
+      if (newFile != null) {
+        choppedFiles.add(newFile);
+      }
+
+      if ((job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR)
+          && newFile != null) {
+        selectedCompactionCompleted(job, jobFiles, newFile);
+      }
+    }
+
+    private void selectedCompactionCompleted(CompactionJob job, 
Set<StoredTabletFile> jobFiles,
+        StoredTabletFile newFile) {
+      Preconditions.checkArgument(
+          job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR);
+      Preconditions.checkState(selectedFiles.containsAll(jobFiles));
+      Preconditions.checkState((selectStatus == FileSelectionStatus.SELECTED
+          || selectStatus == FileSelectionStatus.CANCELED) && selectKind == 
job.getKind());
+
+      selectedFiles.removeAll(jobFiles);
+
+      if (selectedFiles.isEmpty()
+          || (selectStatus == FileSelectionStatus.CANCELED && 
noneRunning(selectKind))) {
+        selectStatus = FileSelectionStatus.NOT_ACTIVE;
+        log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
+      } else if (selectStatus == FileSelectionStatus.SELECTED) {
+        selectedFiles.add(newFile);
+        log.trace("Compacted subset of selected files {} {} -> {}", 
getExtent(),
+            asFileNames(jobFiles), newFile.getFileName());
+      } else {
+        log.debug("Canceled selected compaction completed {} but others still 
running ",
+            getExtent());
+      }
+
+      TabletLogger.selected(getExtent(), selectKind, selectedFiles);
+    }
+
+  }
+
   public CompactableImpl(Tablet tablet, CompactionManager manager,
       Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions) {
     this.tablet = tablet;
@@ -164,7 +561,7 @@ public class CompactableImpl implements Compactable {
 
     Map<ExternalCompactionId,String> extCompactionsToRemove = new HashMap<>();
 
-    initializeSelection(extCompactions, tablet, extCompactionsToRemove);
+    var extSelInfo = initializeSelection(extCompactions, tablet, 
extCompactionsToRemove);
 
     sanityCheckExternalCompactions(extCompactions, dataFileSizes.keySet(), 
extCompactionsToRemove);
 
@@ -179,15 +576,17 @@ public class CompactableImpl implements Compactable {
       tabletMutator.mutate();
     }
 
+    ArrayList<StoredTabletFile> extCompactingFiles = new ArrayList<>();
+
     extCompactions.forEach((ecid, ecMeta) -> {
       if (!extCompactionsToRemove.containsKey(ecid)) {
-        allCompactingFiles.addAll(ecMeta.getJobFiles());
+        extCompactingFiles.addAll(ecMeta.getJobFiles());
         Collection<CompactableFile> files =
             ecMeta.getJobFiles().stream().map(f -> new CompactableFileImpl(f, 
dataFileSizes.get(f)))
                 .collect(Collectors.toList());
         CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(),
             ecMeta.getCompactionExecutorId(), files, ecMeta.getKind(), 
Optional.empty());
-        runnningJobs.add(job);
+        addJob(job);
 
         ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
         ecInfo.job = job;
@@ -200,8 +599,6 @@ public class CompactableImpl implements Compactable {
       }
     });
 
-    compactionRunning = !allCompactingFiles.isEmpty();
-
     if (extCompactions.values().stream().map(ecMeta -> ecMeta.getKind())
         .anyMatch(kind -> kind == CompactionKind.CHOP)) {
       initiateChop();
@@ -214,6 +611,8 @@ public class CompactableImpl implements Compactable {
       }
       return Set.copyOf(servicesIds);
     }, 2, TimeUnit.SECONDS);
+
+    this.fileMgr = new FileManager(tablet.getExtent(), extCompactingFiles, 
extSelInfo);
   }
 
   private void sanityCheckExternalCompactions(
@@ -239,60 +638,53 @@ public class CompactableImpl implements Compactable {
 
   }
 
+  private synchronized boolean addJob(CompactionJob job) {
+    if (runningJobs.add(job)) {
+      compactionRunning = true;
+      return true;
+    }
+
+    return false;
+  }
+
+  private synchronized boolean removeJob(CompactionJob job) {
+    var removed = runningJobs.remove(job);
+    compactionRunning = !runningJobs.isEmpty();
+    return removed;
+  }
+
+  private synchronized boolean noneRunning(CompactionKind kind) {
+    return runningJobs.stream().noneMatch(job -> job.getKind() == kind);
+  }
+
   void initiateChop() {
 
     Set<StoredTabletFile> allFiles = tablet.getDatafiles().keySet();
-    Set<StoredTabletFile> filesToExamine = new HashSet<>(allFiles);
+    FileManager.ChopSelector chopSelector;
 
     synchronized (this) {
-      if (chopStatus == FileSelectionStatus.NOT_ACTIVE) {
-        chopStatus = FileSelectionStatus.SELECTING;
-        filesToExamine.removeAll(choppedFiles);
-        filesToExamine.removeAll(allCompactingFiles);
+      if (fileMgr.getChopStatus() == FileSelectionStatus.NOT_ACTIVE) {
+        chopSelector = fileMgr.initiateChop(allFiles);
       } else {
         return;
       }
     }
 
-    Set<StoredTabletFile> unchoppedFiles = selectChopFiles(filesToExamine);
+    Set<StoredTabletFile> unchoppedFiles = 
selectChopFiles(chopSelector.getFilesToExamine());
 
     synchronized (this) {
-      Preconditions.checkState(chopStatus == FileSelectionStatus.SELECTING);
-      choppedFiles.addAll(Sets.difference(filesToExamine, unchoppedFiles));
-      chopStatus = FileSelectionStatus.SELECTED;
-      this.allFilesWhenChopStarted.clear();
-      this.allFilesWhenChopStarted.addAll(allFiles);
-
-      var filesToChop = getFilesToChop(allFiles);
-      if (!filesToChop.isEmpty()) {
-        TabletLogger.selected(getExtent(), CompactionKind.CHOP, filesToChop);
-      }
+      chopSelector.selectChopFiles(unchoppedFiles);
     }
 
     checkifChopComplete(tablet.getDatafiles().keySet());
   }
 
-  private synchronized Set<StoredTabletFile> 
getFilesToChop(Set<StoredTabletFile> allFiles) {
-    Preconditions.checkState(chopStatus == FileSelectionStatus.SELECTED);
-    var copy = new HashSet<>(allFilesWhenChopStarted);
-    copy.retainAll(allFiles);
-    copy.removeAll(choppedFiles);
-    return copy;
-  }
-
   private void checkifChopComplete(Set<StoredTabletFile> allFiles) {
 
     boolean completed = false;
 
     synchronized (this) {
-      if (chopStatus == FileSelectionStatus.SELECTED) {
-        if (getFilesToChop(allFiles).isEmpty()) {
-          chopStatus = FileSelectionStatus.NOT_ACTIVE;
-          completed = true;
-        }
-      }
-
-      choppedFiles.retainAll(allFiles);
+      completed = fileMgr.finishChop(allFiles);
     }
 
     if (completed) {
@@ -323,7 +715,7 @@ public class CompactableImpl implements Compactable {
   void filesAdded(boolean chopped, Collection<StoredTabletFile> files) {
     if (chopped) {
       synchronized (this) {
-        choppedFiles.addAll(files);
+        fileMgr.addChoppedFiles(files);
       }
     }
 
@@ -345,17 +737,13 @@ public class CompactableImpl implements Compactable {
     initiateSelection(CompactionKind.SELECTOR, null, null);
   }
 
-  private boolean noneRunning(CompactionKind kind) {
-    return runnningJobs.stream().noneMatch(job -> job.getKind() == kind);
-  }
-
   private void checkIfUserCompactionCanceled() {
 
     synchronized (this) {
       if (closed)
         return;
 
-      if (selectStatus != FileSelectionStatus.SELECTED || selectKind != 
CompactionKind.USER) {
+      if (!fileMgr.isSelected(CompactionKind.USER)) {
         return;
       }
     }
@@ -365,16 +753,8 @@ public class CompactableImpl implements Compactable {
     lastSeenCompactionCancelId.getAndUpdate(prev -> Long.max(prev, cancelId));
 
     synchronized (this) {
-      if (selectStatus == FileSelectionStatus.SELECTED && selectKind == 
CompactionKind.USER) {
-        if (cancelId >= compactionId) {
-          if (noneRunning(CompactionKind.USER)) {
-            selectStatus = FileSelectionStatus.NOT_ACTIVE;
-            log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
-          } else {
-            selectStatus = FileSelectionStatus.CANCELED;
-            log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
-          }
-        }
+      if (cancelId >= compactionId) {
+        fileMgr.userCompactionCanceled();
       }
     }
   }
@@ -387,7 +767,7 @@ public class CompactableImpl implements Compactable {
    * selected set must be initialized. Since the data is coming from persisted 
storage, lots of
    * sanity checks are done in this method rather than assuming the persisted 
data is just correct.
    */
-  private void initializeSelection(
+  private Optional<SelectedInfo> initializeSelection(
       Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions, 
Tablet tablet,
       Map<ExternalCompactionId,String> externalCompactionsToRemove) {
     CompactionKind extKind = null;
@@ -487,7 +867,7 @@ public class CompactableImpl implements Compactable {
         var kind = e.getValue().getKind();
         return kind == CompactionKind.SELECTOR || kind == CompactionKind.USER;
       }).map(Entry::getKey).forEach(ecid -> 
externalCompactionsToRemove.putIfAbsent(ecid, reason));
-      return;
+      return Optional.empty();
     }
 
     if (extKind != null) {
@@ -500,15 +880,10 @@ public class CompactableImpl implements Compactable {
         this.chelper = CompactableUtils.getHelper(extKind, tablet, null, null);
       }
 
-      this.selectedFiles.clear();
-      this.selectedFiles.addAll(tmpSelectedFiles);
-      this.selectKind = extKind;
-      this.initiallySelectedAll = initiallySelAll;
-      this.selectStatus = FileSelectionStatus.SELECTED;
-
-      log.debug("Selected compaction status initialized from external 
compactions {} {} {} {}",
-          getExtent(), selectStatus, initiallySelectedAll, 
asFileNames(selectedFiles));
+      return Optional.of(new SelectedInfo(initiallySelAll, tmpSelectedFiles, 
extKind));
     }
+
+    return Optional.empty();
   }
 
   private void initiateSelection(CompactionKind kind, Long compactionId,
@@ -524,17 +899,12 @@ public class CompactableImpl implements Compactable {
       if (closed)
         return;
 
-      if (selectStatus == FileSelectionStatus.NOT_ACTIVE || (kind == 
CompactionKind.USER
-          && selectKind == CompactionKind.SELECTOR && 
noneRunning(CompactionKind.SELECTOR))) {
-        selectStatus = FileSelectionStatus.NEW;
-        selectKind = kind;
-        selectedFiles.clear();
-        initiallySelectedAll = false;
+      if (fileMgr.initiateSelection(kind)) {
         this.chelper = localHelper;
         this.compactionId = compactionId;
         this.compactionConfig = compactionConfig;
-        log.trace("Selected compaction status changed {} {} {} {}", 
getExtent(), selectStatus,
-            compactionId, compactionConfig);
+        log.trace("Selected compaction status changed {} {} {} {}", 
getExtent(),
+            fileMgr.getSelectionStatus(), compactionId, compactionConfig);
       } else {
         return;
       }
@@ -549,11 +919,8 @@ public class CompactableImpl implements Compactable {
     CompactionHelper localHelper;
 
     synchronized (this) {
-      if (selectStatus == FileSelectionStatus.NEW && 
allCompactingFiles.isEmpty()) {
-        selectedFiles.clear();
-        selectStatus = FileSelectionStatus.SELECTING;
+      if (!closed && fileMgr.beginSelection()) {
         localHelper = this.chelper;
-        log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
       } else {
         return;
       }
@@ -565,67 +932,34 @@ public class CompactableImpl implements Compactable {
 
       if (selectingFiles.isEmpty()) {
         synchronized (this) {
-          Preconditions.checkState(selectStatus == 
FileSelectionStatus.SELECTING);
-          selectStatus = FileSelectionStatus.NOT_ACTIVE;
-          log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
+          fileMgr.cancelSelection();
         }
       } else {
         var allSelected =
             allFiles.keySet().equals(Sets.union(selectingFiles, 
localHelper.getFilesToDrop()));
         synchronized (this) {
-          Preconditions.checkState(selectStatus == 
FileSelectionStatus.SELECTING);
-          selectStatus = FileSelectionStatus.SELECTED;
-          selectedFiles.addAll(selectingFiles);
-          initiallySelectedAll = allSelected;
-          log.trace("Selected compaction status changed {} {} {} {}", 
getExtent(), selectStatus,
-              initiallySelectedAll, asFileNames(selectedFiles));
-          TabletLogger.selected(getExtent(), selectKind, selectedFiles);
+          fileMgr.finishSelection(selectingFiles, allSelected);
         }
 
         manager.compactableChanged(this);
       }
 
     } catch (Exception e) {
+      log.error("Failed to select user compaction files {}", getExtent(), e);
+    } finally {
       synchronized (this) {
-        if (selectStatus == FileSelectionStatus.SELECTING)
-          selectStatus = FileSelectionStatus.NOT_ACTIVE;
-        log.error("Failed to select user compaction files {}", getExtent(), e);
-        log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
-        selectedFiles.clear();
+        if (fileMgr.getSelectionStatus() == FileSelectionStatus.SELECTING) {
+          fileMgr.cancelSelection();
+        }
       }
     }
 
   }
 
-  private Collection<String> asFileNames(Set<StoredTabletFile> files) {
+  static Collection<String> asFileNames(Set<StoredTabletFile> files) {
     return Collections2.transform(files, StoredTabletFile::getFileName);
   }
 
-  private synchronized void selectedCompactionCompleted(CompactionJob job,
-      Set<StoredTabletFile> jobFiles, StoredTabletFile newFile) {
-    Preconditions.checkArgument(
-        job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR);
-    Preconditions.checkState(selectedFiles.containsAll(jobFiles));
-    Preconditions.checkState((selectStatus == FileSelectionStatus.SELECTED
-        || selectStatus == FileSelectionStatus.CANCELED) && selectKind == 
job.getKind());
-
-    selectedFiles.removeAll(jobFiles);
-
-    if (selectedFiles.isEmpty()
-        || (selectStatus == FileSelectionStatus.CANCELED && 
noneRunning(selectKind))) {
-      selectStatus = FileSelectionStatus.NOT_ACTIVE;
-      log.trace("Selected compaction status changed {} {}", getExtent(), 
selectStatus);
-    } else if (selectStatus == FileSelectionStatus.SELECTED) {
-      selectedFiles.add(newFile);
-      log.trace("Compacted subset of selected files {} {} -> {}", getExtent(),
-          asFileNames(jobFiles), newFile.getFileName());
-    } else {
-      log.debug("Canceled selected compaction completed {} but others still 
running ", getExtent());
-    }
-
-    TabletLogger.selected(getExtent(), selectKind, selectedFiles);
-  }
-
   @Override
   public TableId getTableId() {
     return getExtent().tableId();
@@ -662,94 +996,16 @@ public class CompactableImpl implements Compactable {
       if (closed)
         return Optional.empty();
 
-      if (!files.keySet().containsAll(allCompactingFiles)) {
-        log.trace("Ignoring because compacting not a subset {}", getExtent());
-
-        // A compaction finished, so things are out of date. This can happen 
because this class and
-        // tablet have separate locks, its ok.
-        return Optional.of(new Compactable.Files(files, Set.of(), Set.of()));
-      }
-
-      var allCompactingCopy = Set.copyOf(allCompactingFiles);
-      var runningJobsCopy = Set.copyOf(runnningJobs);
+      var runningJobsCopy = Set.copyOf(runningJobs);
 
-      switch (kind) {
-        case SYSTEM: {
-          if (isCompactionStratConfigured())
-            return Optional.of(new Compactable.Files(files, Set.of(), 
runningJobsCopy));
-
-          switch (selectStatus) {
-            case NOT_ACTIVE:
-            case CANCELED:
-              return Optional.of(new Compactable.Files(files,
-                  Sets.difference(files.keySet(), allCompactingCopy), 
runningJobsCopy));
-            case NEW:
-            case SELECTING:
-              return Optional.of(new Compactable.Files(files, Set.of(), 
runningJobsCopy));
-            case SELECTED: {
-              Set<StoredTabletFile> candidates = new HashSet<>(files.keySet());
-              candidates.removeAll(allCompactingCopy);
-              candidates.removeAll(selectedFiles);
-              candidates = Collections.unmodifiableSet(candidates);
-              return Optional.of(new Compactable.Files(files, candidates, 
runningJobsCopy));
-            }
-            default:
-              throw new AssertionError();
-          }
-        }
-        case SELECTOR:
-          // intentional fall through
-        case USER:
-          switch (selectStatus) {
-            case NOT_ACTIVE:
-            case NEW:
-            case SELECTING:
-            case CANCELED:
-              return Optional.of(new Compactable.Files(files, Set.of(), 
runningJobsCopy));
-            case SELECTED: {
-              if (selectKind == kind) {
-                Set<StoredTabletFile> candidates = new 
HashSet<>(selectedFiles);
-                candidates.removeAll(allCompactingFiles);
-                candidates = Collections.unmodifiableSet(candidates);
-                
Preconditions.checkState(files.keySet().containsAll(candidates),
-                    "selected files not in all files %s %s", candidates, 
files.keySet());
-                Map<String,String> hints = Map.of();
-                if (kind == CompactionKind.USER)
-                  hints = compactionConfig.getExecutionHints();
-                return Optional
-                    .of(new Compactable.Files(files, candidates, 
runningJobsCopy, hints));
-              } else {
-                return Optional.of(new Compactable.Files(files, Set.of(), 
runningJobsCopy));
-              }
-            }
-            default:
-              throw new AssertionError();
-          }
-        case CHOP: {
-          switch (chopStatus) {
-            case NOT_ACTIVE:
-            case NEW:
-            case SELECTING:
-              return Optional.of(new Compactable.Files(files, Set.of(), 
runningJobsCopy));
-            case SELECTED: {
-              if (selectStatus == FileSelectionStatus.NEW
-                  || selectStatus == FileSelectionStatus.SELECTING)
-                return Optional.of(new Compactable.Files(files, Set.of(), 
runningJobsCopy));
+      Set<StoredTabletFile> candidates = fileMgr.getCandidates(
+          Collections.unmodifiableSet(files.keySet()), kind, 
isCompactionStratConfigured());
 
-              var filesToChop = getFilesToChop(files.keySet());
-              filesToChop.removeAll(allCompactingFiles);
-              filesToChop = Collections.unmodifiableSet(filesToChop);
-              if (selectStatus == FileSelectionStatus.SELECTED)
-                filesToChop.removeAll(selectedFiles);
-              return Optional.of(new Compactable.Files(files, filesToChop, 
runningJobsCopy));
-            }
-            case CANCELED: // intentional fall through, not expected status 
for chop
-            default:
-              throw new AssertionError();
-          }
-        }
-        default:
-          throw new AssertionError();
+      if (kind == CompactionKind.USER && !candidates.isEmpty()) {
+        Map<String,String> hints = compactionConfig.getExecutionHints();
+        return Optional.of(new Compactable.Files(files, candidates, 
runningJobsCopy, hints));
+      } else {
+        return Optional.of(new Compactable.Files(files, candidates, 
runningJobsCopy));
       }
     }
   }
@@ -806,70 +1062,31 @@ public class CompactableImpl implements Compactable {
       if (closed)
         return Optional.empty();
 
-      if (!service.equals(getConfiguredService(job.getKind())))
+      if (runningJobs.contains(job))
         return Optional.empty();
 
-      switch (selectStatus) {
-        case NEW:
-        case SELECTING:
-          log.trace(
-              "Ignoring compaction because files are being selected for user 
compaction {} {}",
-              getExtent(), job);
-          return Optional.empty();
-        case SELECTED: {
-          if (job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR) {
-            if (selectKind == job.getKind()) {
-              if (!selectedFiles.containsAll(cInfo.jobFiles)) {
-                log.error("Ignoring {} compaction that does not contain 
selected files {} {} {}",
-                    job.getKind(), getExtent(), asFileNames(selectedFiles),
-                    asFileNames(cInfo.jobFiles));
-                return Optional.empty();
-              }
-            } else {
-              log.trace("Ingoring {} compaction because not selected kind {}", 
job.getKind(),
-                  getExtent());
-              return Optional.empty();
-            }
-          } else if (!Collections.disjoint(selectedFiles, cInfo.jobFiles)) {
-            log.trace("Ingoring compaction that overlaps with selected files 
{} {} {}", getExtent(),
-                job.getKind(), asFileNames(Sets.intersection(selectedFiles, 
cInfo.jobFiles)));
-            return Optional.empty();
-          }
-          break;
-        }
-        case CANCELED:
-        case NOT_ACTIVE: {
-          if (job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR) {
-            log.trace("Ignoring {} compaction because selectStatus is {} for 
{}", job.getKind(),
-                selectStatus, getExtent());
-            return Optional.empty();
-          }
-          break;
-        }
-        default:
-          throw new AssertionError();
-      }
+      if (!service.equals(getConfiguredService(job.getKind())))
+        return Optional.empty();
 
-      if (Collections.disjoint(allCompactingFiles, cInfo.jobFiles)) {
-        allCompactingFiles.addAll(cInfo.jobFiles);
-        runnningJobs.add(job);
-      } else {
+      if (!fileMgr.reserveFiles(job, cInfo.jobFiles))
         return Optional.empty();
-      }
 
-      compactionRunning = !allCompactingFiles.isEmpty();
+      if (!addJob(job)) {
+        throw new AssertionError();
+      }
 
       switch (job.getKind()) {
         case SELECTOR:
         case USER:
-          Preconditions.checkState(selectStatus == 
FileSelectionStatus.SELECTED);
-          if (job.getKind() == selectKind && initiallySelectedAll
-              && cInfo.jobFiles.containsAll(selectedFiles)) {
+          var si = fileMgr.getSelectedInfo();
+
+          if (job.getKind() == si.selectKind && si.initiallySelectedAll
+              && cInfo.jobFiles.containsAll(si.selectedFiles)) {
             cInfo.propagateDeletes = false;
           }
 
-          cInfo.selectedFiles = Set.copyOf(selectedFiles);
-          cInfo.initiallySelectedAll = initiallySelectedAll;
+          cInfo.selectedFiles = si.selectedFiles;
+          cInfo.initiallySelectedAll = si.initiallySelectedAll;
 
           break;
         default:
@@ -897,26 +1114,16 @@ public class CompactableImpl implements Compactable {
   private void completeCompaction(CompactionJob job, Set<StoredTabletFile> 
jobFiles,
       StoredTabletFile metaFile) {
     synchronized (this) {
-      Preconditions.checkState(allCompactingFiles.removeAll(jobFiles));
-      Preconditions.checkState(runnningJobs.remove(job));
-      compactionRunning = !allCompactingFiles.isEmpty();
+      Preconditions.checkState(removeJob(job));
+      fileMgr.completed(job, jobFiles, metaFile);
 
-      if (allCompactingFiles.isEmpty()) {
+      if (!compactionRunning) {
         notifyAll();
       }
-
-      if (metaFile != null) {
-        choppedFiles.add(metaFile);
-      }
     }
 
     checkifChopComplete(tablet.getDatafiles().keySet());
-
-    if ((job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR)
-        && metaFile != null)
-      selectedCompactionCompleted(job, jobFiles, metaFile);
-    else
-      selectFiles();
+    selectFiles();
   }
 
   @Override
@@ -1112,9 +1319,9 @@ public class CompactableImpl implements Compactable {
 
       if (kind == CompactionKind.USER) {
         synchronized (this) {
-          if (selectStatus != FileSelectionStatus.NOT_ACTIVE
-              && selectStatus != FileSelectionStatus.CANCELED
-              && selectKind == CompactionKind.USER) {
+          if (fileMgr.getSelectionStatus() != FileSelectionStatus.NOT_ACTIVE
+              && fileMgr.getSelectionStatus() != FileSelectionStatus.CANCELED
+              && fileMgr.getSelectionKind() == CompactionKind.USER) {
             tmpHints = compactionConfig.getExecutionHints();
           }
         }
@@ -1183,7 +1390,7 @@ public class CompactableImpl implements Compactable {
 
       // wait while internal jobs are running or external compactions are 
committing, but do not
       // wait on external compactions that are running
-      while (runnningJobs.stream()
+      while (runningJobs.stream()
           .anyMatch(job -> !((CompactionExecutorIdImpl) 
job.getExecutor()).isExternalId())
           || !externalCompactionsCommitting.isEmpty()) {
         try {

Reply via email to