This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new a9100ac Refactored code that does bookeeping for compacting files. (#2213) a9100ac is described below commit a9100ac35f3d72677b767d7670dfb032c7e7d733 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Aug 3 17:32:03 2021 -0400 Refactored code that does bookeeping for compacting files. (#2213) --- .../accumulo/tserver/tablet/CompactableImpl.java | 787 +++++++++++++-------- 1 file changed, 497 insertions(+), 290 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index d4562cc..3efbfe5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -100,34 +100,20 @@ public class CompactableImpl implements Compactable { private final Tablet tablet; - private Set<StoredTabletFile> allCompactingFiles = new HashSet<>(); - private Set<CompactionJob> runnningJobs = new HashSet<>(); - private volatile boolean compactionRunning = false; - - private Set<StoredTabletFile> selectedFiles = new HashSet<>(); - - private Set<StoredTabletFile> allFilesWhenChopStarted = new HashSet<>(); + private final FileManager fileMgr; - // track files produced by compactions of this tablet, those are considered chopped - private Set<StoredTabletFile> choppedFiles = new HashSet<>(); - private FileSelectionStatus chopStatus = FileSelectionStatus.NOT_ACTIVE; + private Set<CompactionJob> runningJobs = new HashSet<>(); + private volatile boolean compactionRunning = false; private Supplier<Set<CompactionServiceId>> servicesInUse; private Set<CompactionServiceId> servicesUsed = new ConcurrentSkipListSet<>(); // status of special compactions - private enum FileSelectionStatus { + enum FileSelectionStatus { NEW, SELECTING, SELECTED, NOT_ACTIVE, CANCELED } - private FileSelectionStatus selectStatus = FileSelectionStatus.NOT_ACTIVE; - private CompactionKind selectKind = null; - // Tracks if when a set of files was selected, if at that time the set was all of the tablets - // files. Because a set of selected files can be compacted over one or more compactions, its - // important to track this in order to know if the last compaction is a full compaction and should - // not propagate deletes. - private boolean initiallySelectedAll = false; private CompactionHelper chelper = null; private Long compactionId; private CompactionConfig compactionConfig; @@ -155,6 +141,417 @@ public class CompactableImpl implements Compactable { } + static class SelectedInfo { + final boolean initiallySelectedAll; + final Set<StoredTabletFile> selectedFiles; + final CompactionKind selectKind; + + public SelectedInfo(boolean initiallySelectedAll, Set<StoredTabletFile> selectedFiles, + CompactionKind selectKind) { + this.initiallySelectedAll = initiallySelectedAll; + this.selectedFiles = Set.copyOf(selectedFiles); + this.selectKind = selectKind; + } + } + + /** + * This class tracks status of a tablets files for compactions for {@link CompactableImpl} owning + * the following functionality. + * + * <UL> + * <LI>Tracks which files are reserved for compactions + * <LI>Determines which files are available for compactions + * <LI>Tracks which files are chopped and which need to be chopped + * <LI>Tracks which files are selected for user and selector compactions + * <LI>Coordinates the file selection process + * </UL> + * + * <p> + * The class is structured in such a way that the above functionality can be unit tested. + * + * <p> + * This class does no synchronization of its own and relies on CompactableImpl to do all needed + * synchronization. CompactableImpl must makes changes to files and other state like running jobs + * in a mutually exclusive manner, so synchronization at this level is unnecessary. + * + */ + private class FileManager { + + FileSelectionStatus selectStatus = FileSelectionStatus.NOT_ACTIVE; + private CompactionKind selectKind = null; + + // Tracks if when a set of files was selected, if at that time the set was all of the tablets + // files. Because a set of selected files can be compacted over one or more compactions, its + // important to track this in order to know if the last compaction is a full compaction and + // should + // not propagate deletes. + private boolean initiallySelectedAll = false; + private Set<StoredTabletFile> selectedFiles = new HashSet<>(); + + private Set<StoredTabletFile> allCompactingFiles = new HashSet<>(); + + // track files produced by compactions of this tablet, those are considered chopped + private Set<StoredTabletFile> choppedFiles = new HashSet<>(); + private FileSelectionStatus chopStatus = FileSelectionStatus.NOT_ACTIVE; + private Set<StoredTabletFile> allFilesWhenChopStarted = new HashSet<>(); + + private final KeyExtent extent; + + public FileManager(KeyExtent extent, Collection<StoredTabletFile> extCompactingFiles, + Optional<SelectedInfo> extSelInfo) { + this.extent = extent; + allCompactingFiles.addAll(extCompactingFiles); + if (extSelInfo.isPresent()) { + this.selectedFiles.addAll(extSelInfo.get().selectedFiles); + this.selectKind = extSelInfo.get().selectKind; + this.initiallySelectedAll = extSelInfo.get().initiallySelectedAll; + this.selectStatus = FileSelectionStatus.SELECTED; + + log.debug("Selected compaction status initialized from external compactions {} {} {} {}", + getExtent(), selectStatus, initiallySelectedAll, asFileNames(selectedFiles)); + } + } + + FileSelectionStatus getSelectionStatus() { + return selectStatus; + } + + CompactionKind getSelectionKind() { + return selectKind; + } + + SelectedInfo getSelectedInfo() { + Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTED); + return new SelectedInfo(initiallySelectedAll, selectedFiles, selectKind); + } + + boolean initiateSelection(CompactionKind kind) { + + if (selectStatus == FileSelectionStatus.NOT_ACTIVE || (kind == CompactionKind.USER + && selectKind == CompactionKind.SELECTOR && noneRunning(CompactionKind.SELECTOR))) { + selectStatus = FileSelectionStatus.NEW; + selectKind = kind; + selectedFiles.clear(); + initiallySelectedAll = false; + return true; + } + + return false; + + } + + boolean beginSelection() { + if (selectStatus == FileSelectionStatus.NEW && allCompactingFiles.isEmpty()) { + selectStatus = FileSelectionStatus.SELECTING; + log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); + return true; + } + + return false; + } + + void finishSelection(Set<StoredTabletFile> selected, boolean allSelected) { + Preconditions.checkArgument(!selected.isEmpty()); + Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTING); + selectStatus = FileSelectionStatus.SELECTED; + selectedFiles.clear(); + selectedFiles.addAll(selected); + initiallySelectedAll = allSelected; + log.trace("Selected compaction status changed {} {} {} {}", getExtent(), selectStatus, + initiallySelectedAll, asFileNames(selectedFiles)); + TabletLogger.selected(getExtent(), selectKind, selectedFiles); + } + + void cancelSelection() { + Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTING); + selectStatus = FileSelectionStatus.NOT_ACTIVE; + log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); + } + + boolean isSelected(CompactionKind kind) { + return selectStatus == FileSelectionStatus.SELECTED && kind == selectKind; + } + + FileSelectionStatus getChopStatus() { + return chopStatus; + } + + ChopSelector initiateChop(Set<StoredTabletFile> allFiles) { + Preconditions.checkState(chopStatus == FileSelectionStatus.NOT_ACTIVE); + Set<StoredTabletFile> filesToExamine = new HashSet<>(allFiles); + chopStatus = FileSelectionStatus.SELECTING; + filesToExamine.removeAll(choppedFiles); + filesToExamine.removeAll(allCompactingFiles); + return new ChopSelector(allFiles, filesToExamine); + } + + class ChopSelector { + private Set<StoredTabletFile> allFiles; + private Set<StoredTabletFile> filesToExamine; + + private ChopSelector(Set<StoredTabletFile> allFiles, Set<StoredTabletFile> filesToExamine) { + this.allFiles = allFiles; + this.filesToExamine = filesToExamine; + } + + void selectChopFiles(Set<StoredTabletFile> unchoppedFiles) { + Preconditions.checkState(chopStatus == FileSelectionStatus.SELECTING); + choppedFiles.addAll(Sets.difference(filesToExamine, unchoppedFiles)); + chopStatus = FileSelectionStatus.SELECTED; + allFilesWhenChopStarted.clear(); + allFilesWhenChopStarted.addAll(allFiles); + + var filesToChop = getFilesToChop(allFiles); + if (!filesToChop.isEmpty()) { + TabletLogger.selected(getExtent(), CompactionKind.CHOP, filesToChop); + } + } + + Set<StoredTabletFile> getFilesToExamine() { + return Collections.unmodifiableSet(filesToExamine); + } + } + + boolean finishChop(Set<StoredTabletFile> allFiles) { + + boolean completed = false; + + if (chopStatus == FileSelectionStatus.SELECTED) { + if (getFilesToChop(allFiles).isEmpty()) { + chopStatus = FileSelectionStatus.NOT_ACTIVE; + completed = true; + } + } + + choppedFiles.retainAll(allFiles); + + return completed; + } + + void addChoppedFiles(Collection<StoredTabletFile> files) { + choppedFiles.addAll(files); + } + + void userCompactionCanceled() { + if (isSelected(CompactionKind.USER)) { + if (noneRunning(CompactionKind.USER)) { + selectStatus = FileSelectionStatus.NOT_ACTIVE; + log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); + } else { + selectStatus = FileSelectionStatus.CANCELED; + log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); + } + } + } + + private Set<StoredTabletFile> getFilesToChop(Set<StoredTabletFile> allFiles) { + Preconditions.checkState(chopStatus == FileSelectionStatus.SELECTED); + var copy = new HashSet<>(allFilesWhenChopStarted); + copy.retainAll(allFiles); + copy.removeAll(choppedFiles); + return copy; + } + + /** + * @return The set of tablet files that are candidates for compaction + */ + Set<StoredTabletFile> getCandidates(Set<StoredTabletFile> currFiles, CompactionKind kind, + boolean isCompactionStratConfigured) { + + if (!currFiles.containsAll(allCompactingFiles)) { + log.trace("Ignoring because compacting not a subset {}", getExtent()); + + // A compaction finished, so things are out of date. This can happen because CompactableImpl + // and Tablet have separate locks, its ok. + return Set.of(); + } + + switch (kind) { + case SYSTEM: { + if (isCompactionStratConfigured) + return Set.of(); + + switch (selectStatus) { + case NOT_ACTIVE: + case CANCELED: { + Set<StoredTabletFile> candidates = new HashSet<>(currFiles); + candidates.removeAll(allCompactingFiles); + return Collections.unmodifiableSet(candidates); + } + case NEW: + case SELECTING: + return Set.of(); + case SELECTED: { + Set<StoredTabletFile> candidates = new HashSet<>(currFiles); + candidates.removeAll(allCompactingFiles); + candidates.removeAll(selectedFiles); + return Collections.unmodifiableSet(candidates); + } + default: + throw new AssertionError(); + } + } + case SELECTOR: + // intentional fall through + case USER: + switch (selectStatus) { + case NOT_ACTIVE: + case NEW: + case SELECTING: + case CANCELED: + return Set.of(); + case SELECTED: { + if (selectKind == kind) { + Set<StoredTabletFile> candidates = new HashSet<>(selectedFiles); + candidates.removeAll(allCompactingFiles); + candidates = Collections.unmodifiableSet(candidates); + Preconditions.checkState(currFiles.containsAll(candidates), + "selected files not in all files %s %s", candidates, currFiles); + return candidates; + } else { + return Set.of(); + } + } + default: + throw new AssertionError(); + } + case CHOP: { + switch (chopStatus) { + case NOT_ACTIVE: + case NEW: + case SELECTING: + return Set.of(); + case SELECTED: { + if (selectStatus == FileSelectionStatus.NEW + || selectStatus == FileSelectionStatus.SELECTING) + return Set.of(); + + var filesToChop = getFilesToChop(currFiles); + filesToChop.removeAll(allCompactingFiles); + if (selectStatus == FileSelectionStatus.SELECTED) + filesToChop.removeAll(selectedFiles); + return Collections.unmodifiableSet(filesToChop); + } + case CANCELED: // intentional fall through, not expected status for chop + default: + throw new AssertionError(); + } + } + default: + throw new AssertionError(); + } + } + + /** + * Attempts to reserve a set of files for compaction. + * + * @return true if the files were reserved and false otherwise + */ + private boolean reserveFiles(CompactionJob job, Set<StoredTabletFile> jobFiles) { + + Preconditions.checkArgument(!jobFiles.isEmpty()); + + switch (selectStatus) { + case NEW: + case SELECTING: + log.trace( + "Ignoring compaction because files are being selected for user compaction {} {}", + getExtent(), job); + return false; + case SELECTED: { + if (job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR) { + if (selectKind == job.getKind()) { + if (!selectedFiles.containsAll(jobFiles)) { + // TODO diff log level? + log.error("Ignoring {} compaction that does not contain selected files {} {} {}", + job.getKind(), getExtent(), asFileNames(selectedFiles), asFileNames(jobFiles)); + return false; + } + } else { + log.trace("Ingoring {} compaction because not selected kind {}", job.getKind(), + getExtent()); + return false; + } + } else if (!Collections.disjoint(selectedFiles, jobFiles)) { + log.trace("Ingoring compaction that overlaps with selected files {} {} {}", getExtent(), + job.getKind(), asFileNames(Sets.intersection(selectedFiles, jobFiles))); + return false; + } + break; + } + case CANCELED: + case NOT_ACTIVE: { + if (job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR) { + log.trace("Ignoring {} compaction because selectStatus is {} for {}", job.getKind(), + selectStatus, getExtent()); + return false; + } + break; + } + default: + throw new AssertionError(); + } + + if (Collections.disjoint(allCompactingFiles, jobFiles)) { + allCompactingFiles.addAll(jobFiles); + return true; + } else { + return false; + } + } + + private KeyExtent getExtent() { + return extent; + } + + /** + * Releases a set of files that were previously reserved for compaction. + * + * @param newFile + * The file produced by a compaction. If the compaction failed, this can be null. + */ + private void completed(CompactionJob job, Set<StoredTabletFile> jobFiles, + StoredTabletFile newFile) { + Preconditions.checkArgument(!jobFiles.isEmpty()); + Preconditions.checkState(allCompactingFiles.removeAll(jobFiles)); + if (newFile != null) { + choppedFiles.add(newFile); + } + + if ((job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR) + && newFile != null) { + selectedCompactionCompleted(job, jobFiles, newFile); + } + } + + private void selectedCompactionCompleted(CompactionJob job, Set<StoredTabletFile> jobFiles, + StoredTabletFile newFile) { + Preconditions.checkArgument( + job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR); + Preconditions.checkState(selectedFiles.containsAll(jobFiles)); + Preconditions.checkState((selectStatus == FileSelectionStatus.SELECTED + || selectStatus == FileSelectionStatus.CANCELED) && selectKind == job.getKind()); + + selectedFiles.removeAll(jobFiles); + + if (selectedFiles.isEmpty() + || (selectStatus == FileSelectionStatus.CANCELED && noneRunning(selectKind))) { + selectStatus = FileSelectionStatus.NOT_ACTIVE; + log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); + } else if (selectStatus == FileSelectionStatus.SELECTED) { + selectedFiles.add(newFile); + log.trace("Compacted subset of selected files {} {} -> {}", getExtent(), + asFileNames(jobFiles), newFile.getFileName()); + } else { + log.debug("Canceled selected compaction completed {} but others still running ", + getExtent()); + } + + TabletLogger.selected(getExtent(), selectKind, selectedFiles); + } + + } + public CompactableImpl(Tablet tablet, CompactionManager manager, Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions) { this.tablet = tablet; @@ -164,7 +561,7 @@ public class CompactableImpl implements Compactable { Map<ExternalCompactionId,String> extCompactionsToRemove = new HashMap<>(); - initializeSelection(extCompactions, tablet, extCompactionsToRemove); + var extSelInfo = initializeSelection(extCompactions, tablet, extCompactionsToRemove); sanityCheckExternalCompactions(extCompactions, dataFileSizes.keySet(), extCompactionsToRemove); @@ -179,15 +576,17 @@ public class CompactableImpl implements Compactable { tabletMutator.mutate(); } + ArrayList<StoredTabletFile> extCompactingFiles = new ArrayList<>(); + extCompactions.forEach((ecid, ecMeta) -> { if (!extCompactionsToRemove.containsKey(ecid)) { - allCompactingFiles.addAll(ecMeta.getJobFiles()); + extCompactingFiles.addAll(ecMeta.getJobFiles()); Collection<CompactableFile> files = ecMeta.getJobFiles().stream().map(f -> new CompactableFileImpl(f, dataFileSizes.get(f))) .collect(Collectors.toList()); CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), ecMeta.getCompactionExecutorId(), files, ecMeta.getKind(), Optional.empty()); - runnningJobs.add(job); + addJob(job); ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); ecInfo.job = job; @@ -200,8 +599,6 @@ public class CompactableImpl implements Compactable { } }); - compactionRunning = !allCompactingFiles.isEmpty(); - if (extCompactions.values().stream().map(ecMeta -> ecMeta.getKind()) .anyMatch(kind -> kind == CompactionKind.CHOP)) { initiateChop(); @@ -214,6 +611,8 @@ public class CompactableImpl implements Compactable { } return Set.copyOf(servicesIds); }, 2, TimeUnit.SECONDS); + + this.fileMgr = new FileManager(tablet.getExtent(), extCompactingFiles, extSelInfo); } private void sanityCheckExternalCompactions( @@ -239,60 +638,53 @@ public class CompactableImpl implements Compactable { } + private synchronized boolean addJob(CompactionJob job) { + if (runningJobs.add(job)) { + compactionRunning = true; + return true; + } + + return false; + } + + private synchronized boolean removeJob(CompactionJob job) { + var removed = runningJobs.remove(job); + compactionRunning = !runningJobs.isEmpty(); + return removed; + } + + private synchronized boolean noneRunning(CompactionKind kind) { + return runningJobs.stream().noneMatch(job -> job.getKind() == kind); + } + void initiateChop() { Set<StoredTabletFile> allFiles = tablet.getDatafiles().keySet(); - Set<StoredTabletFile> filesToExamine = new HashSet<>(allFiles); + FileManager.ChopSelector chopSelector; synchronized (this) { - if (chopStatus == FileSelectionStatus.NOT_ACTIVE) { - chopStatus = FileSelectionStatus.SELECTING; - filesToExamine.removeAll(choppedFiles); - filesToExamine.removeAll(allCompactingFiles); + if (fileMgr.getChopStatus() == FileSelectionStatus.NOT_ACTIVE) { + chopSelector = fileMgr.initiateChop(allFiles); } else { return; } } - Set<StoredTabletFile> unchoppedFiles = selectChopFiles(filesToExamine); + Set<StoredTabletFile> unchoppedFiles = selectChopFiles(chopSelector.getFilesToExamine()); synchronized (this) { - Preconditions.checkState(chopStatus == FileSelectionStatus.SELECTING); - choppedFiles.addAll(Sets.difference(filesToExamine, unchoppedFiles)); - chopStatus = FileSelectionStatus.SELECTED; - this.allFilesWhenChopStarted.clear(); - this.allFilesWhenChopStarted.addAll(allFiles); - - var filesToChop = getFilesToChop(allFiles); - if (!filesToChop.isEmpty()) { - TabletLogger.selected(getExtent(), CompactionKind.CHOP, filesToChop); - } + chopSelector.selectChopFiles(unchoppedFiles); } checkifChopComplete(tablet.getDatafiles().keySet()); } - private synchronized Set<StoredTabletFile> getFilesToChop(Set<StoredTabletFile> allFiles) { - Preconditions.checkState(chopStatus == FileSelectionStatus.SELECTED); - var copy = new HashSet<>(allFilesWhenChopStarted); - copy.retainAll(allFiles); - copy.removeAll(choppedFiles); - return copy; - } - private void checkifChopComplete(Set<StoredTabletFile> allFiles) { boolean completed = false; synchronized (this) { - if (chopStatus == FileSelectionStatus.SELECTED) { - if (getFilesToChop(allFiles).isEmpty()) { - chopStatus = FileSelectionStatus.NOT_ACTIVE; - completed = true; - } - } - - choppedFiles.retainAll(allFiles); + completed = fileMgr.finishChop(allFiles); } if (completed) { @@ -323,7 +715,7 @@ public class CompactableImpl implements Compactable { void filesAdded(boolean chopped, Collection<StoredTabletFile> files) { if (chopped) { synchronized (this) { - choppedFiles.addAll(files); + fileMgr.addChoppedFiles(files); } } @@ -345,17 +737,13 @@ public class CompactableImpl implements Compactable { initiateSelection(CompactionKind.SELECTOR, null, null); } - private boolean noneRunning(CompactionKind kind) { - return runnningJobs.stream().noneMatch(job -> job.getKind() == kind); - } - private void checkIfUserCompactionCanceled() { synchronized (this) { if (closed) return; - if (selectStatus != FileSelectionStatus.SELECTED || selectKind != CompactionKind.USER) { + if (!fileMgr.isSelected(CompactionKind.USER)) { return; } } @@ -365,16 +753,8 @@ public class CompactableImpl implements Compactable { lastSeenCompactionCancelId.getAndUpdate(prev -> Long.max(prev, cancelId)); synchronized (this) { - if (selectStatus == FileSelectionStatus.SELECTED && selectKind == CompactionKind.USER) { - if (cancelId >= compactionId) { - if (noneRunning(CompactionKind.USER)) { - selectStatus = FileSelectionStatus.NOT_ACTIVE; - log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); - } else { - selectStatus = FileSelectionStatus.CANCELED; - log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); - } - } + if (cancelId >= compactionId) { + fileMgr.userCompactionCanceled(); } } } @@ -387,7 +767,7 @@ public class CompactableImpl implements Compactable { * selected set must be initialized. Since the data is coming from persisted storage, lots of * sanity checks are done in this method rather than assuming the persisted data is just correct. */ - private void initializeSelection( + private Optional<SelectedInfo> initializeSelection( Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions, Tablet tablet, Map<ExternalCompactionId,String> externalCompactionsToRemove) { CompactionKind extKind = null; @@ -487,7 +867,7 @@ public class CompactableImpl implements Compactable { var kind = e.getValue().getKind(); return kind == CompactionKind.SELECTOR || kind == CompactionKind.USER; }).map(Entry::getKey).forEach(ecid -> externalCompactionsToRemove.putIfAbsent(ecid, reason)); - return; + return Optional.empty(); } if (extKind != null) { @@ -500,15 +880,10 @@ public class CompactableImpl implements Compactable { this.chelper = CompactableUtils.getHelper(extKind, tablet, null, null); } - this.selectedFiles.clear(); - this.selectedFiles.addAll(tmpSelectedFiles); - this.selectKind = extKind; - this.initiallySelectedAll = initiallySelAll; - this.selectStatus = FileSelectionStatus.SELECTED; - - log.debug("Selected compaction status initialized from external compactions {} {} {} {}", - getExtent(), selectStatus, initiallySelectedAll, asFileNames(selectedFiles)); + return Optional.of(new SelectedInfo(initiallySelAll, tmpSelectedFiles, extKind)); } + + return Optional.empty(); } private void initiateSelection(CompactionKind kind, Long compactionId, @@ -524,17 +899,12 @@ public class CompactableImpl implements Compactable { if (closed) return; - if (selectStatus == FileSelectionStatus.NOT_ACTIVE || (kind == CompactionKind.USER - && selectKind == CompactionKind.SELECTOR && noneRunning(CompactionKind.SELECTOR))) { - selectStatus = FileSelectionStatus.NEW; - selectKind = kind; - selectedFiles.clear(); - initiallySelectedAll = false; + if (fileMgr.initiateSelection(kind)) { this.chelper = localHelper; this.compactionId = compactionId; this.compactionConfig = compactionConfig; - log.trace("Selected compaction status changed {} {} {} {}", getExtent(), selectStatus, - compactionId, compactionConfig); + log.trace("Selected compaction status changed {} {} {} {}", getExtent(), + fileMgr.getSelectionStatus(), compactionId, compactionConfig); } else { return; } @@ -549,11 +919,8 @@ public class CompactableImpl implements Compactable { CompactionHelper localHelper; synchronized (this) { - if (selectStatus == FileSelectionStatus.NEW && allCompactingFiles.isEmpty()) { - selectedFiles.clear(); - selectStatus = FileSelectionStatus.SELECTING; + if (!closed && fileMgr.beginSelection()) { localHelper = this.chelper; - log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); } else { return; } @@ -565,67 +932,34 @@ public class CompactableImpl implements Compactable { if (selectingFiles.isEmpty()) { synchronized (this) { - Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTING); - selectStatus = FileSelectionStatus.NOT_ACTIVE; - log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); + fileMgr.cancelSelection(); } } else { var allSelected = allFiles.keySet().equals(Sets.union(selectingFiles, localHelper.getFilesToDrop())); synchronized (this) { - Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTING); - selectStatus = FileSelectionStatus.SELECTED; - selectedFiles.addAll(selectingFiles); - initiallySelectedAll = allSelected; - log.trace("Selected compaction status changed {} {} {} {}", getExtent(), selectStatus, - initiallySelectedAll, asFileNames(selectedFiles)); - TabletLogger.selected(getExtent(), selectKind, selectedFiles); + fileMgr.finishSelection(selectingFiles, allSelected); } manager.compactableChanged(this); } } catch (Exception e) { + log.error("Failed to select user compaction files {}", getExtent(), e); + } finally { synchronized (this) { - if (selectStatus == FileSelectionStatus.SELECTING) - selectStatus = FileSelectionStatus.NOT_ACTIVE; - log.error("Failed to select user compaction files {}", getExtent(), e); - log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); - selectedFiles.clear(); + if (fileMgr.getSelectionStatus() == FileSelectionStatus.SELECTING) { + fileMgr.cancelSelection(); + } } } } - private Collection<String> asFileNames(Set<StoredTabletFile> files) { + static Collection<String> asFileNames(Set<StoredTabletFile> files) { return Collections2.transform(files, StoredTabletFile::getFileName); } - private synchronized void selectedCompactionCompleted(CompactionJob job, - Set<StoredTabletFile> jobFiles, StoredTabletFile newFile) { - Preconditions.checkArgument( - job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR); - Preconditions.checkState(selectedFiles.containsAll(jobFiles)); - Preconditions.checkState((selectStatus == FileSelectionStatus.SELECTED - || selectStatus == FileSelectionStatus.CANCELED) && selectKind == job.getKind()); - - selectedFiles.removeAll(jobFiles); - - if (selectedFiles.isEmpty() - || (selectStatus == FileSelectionStatus.CANCELED && noneRunning(selectKind))) { - selectStatus = FileSelectionStatus.NOT_ACTIVE; - log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus); - } else if (selectStatus == FileSelectionStatus.SELECTED) { - selectedFiles.add(newFile); - log.trace("Compacted subset of selected files {} {} -> {}", getExtent(), - asFileNames(jobFiles), newFile.getFileName()); - } else { - log.debug("Canceled selected compaction completed {} but others still running ", getExtent()); - } - - TabletLogger.selected(getExtent(), selectKind, selectedFiles); - } - @Override public TableId getTableId() { return getExtent().tableId(); @@ -662,94 +996,16 @@ public class CompactableImpl implements Compactable { if (closed) return Optional.empty(); - if (!files.keySet().containsAll(allCompactingFiles)) { - log.trace("Ignoring because compacting not a subset {}", getExtent()); - - // A compaction finished, so things are out of date. This can happen because this class and - // tablet have separate locks, its ok. - return Optional.of(new Compactable.Files(files, Set.of(), Set.of())); - } - - var allCompactingCopy = Set.copyOf(allCompactingFiles); - var runningJobsCopy = Set.copyOf(runnningJobs); + var runningJobsCopy = Set.copyOf(runningJobs); - switch (kind) { - case SYSTEM: { - if (isCompactionStratConfigured()) - return Optional.of(new Compactable.Files(files, Set.of(), runningJobsCopy)); - - switch (selectStatus) { - case NOT_ACTIVE: - case CANCELED: - return Optional.of(new Compactable.Files(files, - Sets.difference(files.keySet(), allCompactingCopy), runningJobsCopy)); - case NEW: - case SELECTING: - return Optional.of(new Compactable.Files(files, Set.of(), runningJobsCopy)); - case SELECTED: { - Set<StoredTabletFile> candidates = new HashSet<>(files.keySet()); - candidates.removeAll(allCompactingCopy); - candidates.removeAll(selectedFiles); - candidates = Collections.unmodifiableSet(candidates); - return Optional.of(new Compactable.Files(files, candidates, runningJobsCopy)); - } - default: - throw new AssertionError(); - } - } - case SELECTOR: - // intentional fall through - case USER: - switch (selectStatus) { - case NOT_ACTIVE: - case NEW: - case SELECTING: - case CANCELED: - return Optional.of(new Compactable.Files(files, Set.of(), runningJobsCopy)); - case SELECTED: { - if (selectKind == kind) { - Set<StoredTabletFile> candidates = new HashSet<>(selectedFiles); - candidates.removeAll(allCompactingFiles); - candidates = Collections.unmodifiableSet(candidates); - Preconditions.checkState(files.keySet().containsAll(candidates), - "selected files not in all files %s %s", candidates, files.keySet()); - Map<String,String> hints = Map.of(); - if (kind == CompactionKind.USER) - hints = compactionConfig.getExecutionHints(); - return Optional - .of(new Compactable.Files(files, candidates, runningJobsCopy, hints)); - } else { - return Optional.of(new Compactable.Files(files, Set.of(), runningJobsCopy)); - } - } - default: - throw new AssertionError(); - } - case CHOP: { - switch (chopStatus) { - case NOT_ACTIVE: - case NEW: - case SELECTING: - return Optional.of(new Compactable.Files(files, Set.of(), runningJobsCopy)); - case SELECTED: { - if (selectStatus == FileSelectionStatus.NEW - || selectStatus == FileSelectionStatus.SELECTING) - return Optional.of(new Compactable.Files(files, Set.of(), runningJobsCopy)); + Set<StoredTabletFile> candidates = fileMgr.getCandidates( + Collections.unmodifiableSet(files.keySet()), kind, isCompactionStratConfigured()); - var filesToChop = getFilesToChop(files.keySet()); - filesToChop.removeAll(allCompactingFiles); - filesToChop = Collections.unmodifiableSet(filesToChop); - if (selectStatus == FileSelectionStatus.SELECTED) - filesToChop.removeAll(selectedFiles); - return Optional.of(new Compactable.Files(files, filesToChop, runningJobsCopy)); - } - case CANCELED: // intentional fall through, not expected status for chop - default: - throw new AssertionError(); - } - } - default: - throw new AssertionError(); + if (kind == CompactionKind.USER && !candidates.isEmpty()) { + Map<String,String> hints = compactionConfig.getExecutionHints(); + return Optional.of(new Compactable.Files(files, candidates, runningJobsCopy, hints)); + } else { + return Optional.of(new Compactable.Files(files, candidates, runningJobsCopy)); } } } @@ -806,70 +1062,31 @@ public class CompactableImpl implements Compactable { if (closed) return Optional.empty(); - if (!service.equals(getConfiguredService(job.getKind()))) + if (runningJobs.contains(job)) return Optional.empty(); - switch (selectStatus) { - case NEW: - case SELECTING: - log.trace( - "Ignoring compaction because files are being selected for user compaction {} {}", - getExtent(), job); - return Optional.empty(); - case SELECTED: { - if (job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR) { - if (selectKind == job.getKind()) { - if (!selectedFiles.containsAll(cInfo.jobFiles)) { - log.error("Ignoring {} compaction that does not contain selected files {} {} {}", - job.getKind(), getExtent(), asFileNames(selectedFiles), - asFileNames(cInfo.jobFiles)); - return Optional.empty(); - } - } else { - log.trace("Ingoring {} compaction because not selected kind {}", job.getKind(), - getExtent()); - return Optional.empty(); - } - } else if (!Collections.disjoint(selectedFiles, cInfo.jobFiles)) { - log.trace("Ingoring compaction that overlaps with selected files {} {} {}", getExtent(), - job.getKind(), asFileNames(Sets.intersection(selectedFiles, cInfo.jobFiles))); - return Optional.empty(); - } - break; - } - case CANCELED: - case NOT_ACTIVE: { - if (job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR) { - log.trace("Ignoring {} compaction because selectStatus is {} for {}", job.getKind(), - selectStatus, getExtent()); - return Optional.empty(); - } - break; - } - default: - throw new AssertionError(); - } + if (!service.equals(getConfiguredService(job.getKind()))) + return Optional.empty(); - if (Collections.disjoint(allCompactingFiles, cInfo.jobFiles)) { - allCompactingFiles.addAll(cInfo.jobFiles); - runnningJobs.add(job); - } else { + if (!fileMgr.reserveFiles(job, cInfo.jobFiles)) return Optional.empty(); - } - compactionRunning = !allCompactingFiles.isEmpty(); + if (!addJob(job)) { + throw new AssertionError(); + } switch (job.getKind()) { case SELECTOR: case USER: - Preconditions.checkState(selectStatus == FileSelectionStatus.SELECTED); - if (job.getKind() == selectKind && initiallySelectedAll - && cInfo.jobFiles.containsAll(selectedFiles)) { + var si = fileMgr.getSelectedInfo(); + + if (job.getKind() == si.selectKind && si.initiallySelectedAll + && cInfo.jobFiles.containsAll(si.selectedFiles)) { cInfo.propagateDeletes = false; } - cInfo.selectedFiles = Set.copyOf(selectedFiles); - cInfo.initiallySelectedAll = initiallySelectedAll; + cInfo.selectedFiles = si.selectedFiles; + cInfo.initiallySelectedAll = si.initiallySelectedAll; break; default: @@ -897,26 +1114,16 @@ public class CompactableImpl implements Compactable { private void completeCompaction(CompactionJob job, Set<StoredTabletFile> jobFiles, StoredTabletFile metaFile) { synchronized (this) { - Preconditions.checkState(allCompactingFiles.removeAll(jobFiles)); - Preconditions.checkState(runnningJobs.remove(job)); - compactionRunning = !allCompactingFiles.isEmpty(); + Preconditions.checkState(removeJob(job)); + fileMgr.completed(job, jobFiles, metaFile); - if (allCompactingFiles.isEmpty()) { + if (!compactionRunning) { notifyAll(); } - - if (metaFile != null) { - choppedFiles.add(metaFile); - } } checkifChopComplete(tablet.getDatafiles().keySet()); - - if ((job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR) - && metaFile != null) - selectedCompactionCompleted(job, jobFiles, metaFile); - else - selectFiles(); + selectFiles(); } @Override @@ -1112,9 +1319,9 @@ public class CompactableImpl implements Compactable { if (kind == CompactionKind.USER) { synchronized (this) { - if (selectStatus != FileSelectionStatus.NOT_ACTIVE - && selectStatus != FileSelectionStatus.CANCELED - && selectKind == CompactionKind.USER) { + if (fileMgr.getSelectionStatus() != FileSelectionStatus.NOT_ACTIVE + && fileMgr.getSelectionStatus() != FileSelectionStatus.CANCELED + && fileMgr.getSelectionKind() == CompactionKind.USER) { tmpHints = compactionConfig.getExecutionHints(); } } @@ -1183,7 +1390,7 @@ public class CompactableImpl implements Compactable { // wait while internal jobs are running or external compactions are committing, but do not // wait on external compactions that are running - while (runnningJobs.stream() + while (runningJobs.stream() .anyMatch(job -> !((CompactionExecutorIdImpl) job.getExecutor()).isExternalId()) || !externalCompactionsCommitting.isEmpty()) { try {