Repository: accumulo Updated Branches: refs/heads/master 6d8f41145 -> 3c57b662f
ACCUMULO-4572 Avoid blocking when looking for files w/o sample Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/95be9f3c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/95be9f3c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/95be9f3c Branch: refs/heads/master Commit: 95be9f3c420b28e9dfcc11bc05ef05c3cf47a74a Parents: 2a0375a Author: Keith Turner <ktur...@apache.org> Authored: Fri Jan 20 15:01:17 2017 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Fri Jan 20 15:01:17 2017 -0500 ---------------------------------------------------------------------- .../ConfigurableCompactionStrategy.java | 64 ++++++++++++++++---- 1 file changed, 52 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/95be9f3c/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java index 04915ef..5ec175b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java @@ -19,6 +19,8 @@ package org.apache.accumulo.tserver.compaction.strategies; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -39,27 +41,57 @@ import org.apache.hadoop.fs.Path; public class ConfigurableCompactionStrategy extends CompactionStrategy { - private static interface Test { - boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request); + private static abstract class Test { + // Do any work that blocks in this method. This method is not always called before shouldCompact(). See CompactionStrategy javadocs. + void gatherInformation(MajorCompactionRequest request) {} + + abstract boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request); } - private static class NoSampleTest implements Test { + private static class NoSampleTest extends Test { + + private Set<FileRef> filesWithSample = Collections.emptySet(); + private boolean samplingConfigured = true; + private boolean gatherCalled = false; + + @Override + void gatherInformation(MajorCompactionRequest request) { + gatherCalled = true; + + SamplerConfigurationImpl sc = SamplerConfigurationImpl.newSamplerConfig(new ConfigurationCopy(request.getTableProperties())); + if (sc == null) { + samplingConfigured = false; + } else { + filesWithSample = new HashSet<>(); + for (FileRef fref : request.getFiles().keySet()) { + try (FileSKVIterator reader = request.openReader(fref)) { + if (reader.getSample(sc) != null) { + filesWithSample.add(fref); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } @Override public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) { - try (FileSKVIterator reader = request.openReader(file.getKey())) { + + if (!gatherCalled) { SamplerConfigurationImpl sc = SamplerConfigurationImpl.newSamplerConfig(new ConfigurationCopy(request.getTableProperties())); - if (sc == null) { - return false; - } - return reader.getSample(sc) == null; - } catch (IOException e) { - throw new RuntimeException(e); + return sc != null; + } + + if (!samplingConfigured) { + return false; } + + return !filesWithSample.contains(file.getKey()); } } - private static abstract class FileSizeTest implements Test { + private static abstract class FileSizeTest extends Test { private final long esize; private FileSizeTest(String s) { @@ -74,7 +106,7 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy { public abstract boolean shouldCompact(long fsize, long esize); } - private static abstract class PatternPathTest implements Test { + private static abstract class PatternPathTest extends Test { private Pattern pattern; private PatternPathTest(String p) { @@ -188,6 +220,14 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy { } @Override + public void gatherInformation(MajorCompactionRequest request) throws IOException { + // Gather any information that requires blocking calls here. This is only called before getCompactionPlan() is called. + for (Test test : tests) { + test.gatherInformation(request); + } + } + + @Override public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { List<FileRef> filesToCompact = getFilesToCompact(request); if (filesToCompact.size() >= minFiles) {