Repository: accumulo
Updated Branches:
  refs/heads/master 6d8f41145 -> 3c57b662f


ACCUMULO-4572 Avoid blocking when looking for files w/o sample


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/95be9f3c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/95be9f3c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/95be9f3c

Branch: refs/heads/master
Commit: 95be9f3c420b28e9dfcc11bc05ef05c3cf47a74a
Parents: 2a0375a
Author: Keith Turner <ktur...@apache.org>
Authored: Fri Jan 20 15:01:17 2017 -0500
Committer: Keith Turner <ktur...@apache.org>
Committed: Fri Jan 20 15:01:17 2017 -0500

----------------------------------------------------------------------
 .../ConfigurableCompactionStrategy.java         | 64 ++++++++++++++++----
 1 file changed, 52 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/95be9f3c/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
index 04915ef..5ec175b 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
@@ -19,6 +19,8 @@ package org.apache.accumulo.tserver.compaction.strategies;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,27 +41,57 @@ import org.apache.hadoop.fs.Path;
 
 public class ConfigurableCompactionStrategy extends CompactionStrategy {
 
-  private static interface Test {
-    boolean shouldCompact(Entry<FileRef,DataFileValue> file, 
MajorCompactionRequest request);
+  private static abstract class Test {
+    // Do any work that blocks in this method. This method is not always 
called before shouldCompact(). See CompactionStrategy javadocs.
+    void gatherInformation(MajorCompactionRequest request) {}
+
+    abstract boolean shouldCompact(Entry<FileRef,DataFileValue> file, 
MajorCompactionRequest request);
   }
 
-  private static class NoSampleTest implements Test {
+  private static class NoSampleTest extends Test {
+
+    private Set<FileRef> filesWithSample = Collections.emptySet();
+    private boolean samplingConfigured = true;
+    private boolean gatherCalled = false;
+
+    @Override
+    void gatherInformation(MajorCompactionRequest request) {
+      gatherCalled = true;
+
+      SamplerConfigurationImpl sc = 
SamplerConfigurationImpl.newSamplerConfig(new 
ConfigurationCopy(request.getTableProperties()));
+      if (sc == null) {
+        samplingConfigured = false;
+      } else {
+        filesWithSample = new HashSet<>();
+        for (FileRef fref : request.getFiles().keySet()) {
+          try (FileSKVIterator reader = request.openReader(fref)) {
+            if (reader.getSample(sc) != null) {
+              filesWithSample.add(fref);
+            }
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
 
     @Override
     public boolean shouldCompact(Entry<FileRef,DataFileValue> file, 
MajorCompactionRequest request) {
-      try (FileSKVIterator reader = request.openReader(file.getKey())) {
+
+      if (!gatherCalled) {
         SamplerConfigurationImpl sc = 
SamplerConfigurationImpl.newSamplerConfig(new 
ConfigurationCopy(request.getTableProperties()));
-        if (sc == null) {
-          return false;
-        }
-        return reader.getSample(sc) == null;
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+        return sc != null;
+      }
+
+      if (!samplingConfigured) {
+        return false;
       }
+
+      return !filesWithSample.contains(file.getKey());
     }
   }
 
-  private static abstract class FileSizeTest implements Test {
+  private static abstract class FileSizeTest extends Test {
     private final long esize;
 
     private FileSizeTest(String s) {
@@ -74,7 +106,7 @@ public class ConfigurableCompactionStrategy extends 
CompactionStrategy {
     public abstract boolean shouldCompact(long fsize, long esize);
   }
 
-  private static abstract class PatternPathTest implements Test {
+  private static abstract class PatternPathTest extends Test {
     private Pattern pattern;
 
     private PatternPathTest(String p) {
@@ -188,6 +220,14 @@ public class ConfigurableCompactionStrategy extends 
CompactionStrategy {
   }
 
   @Override
+  public void gatherInformation(MajorCompactionRequest request) throws 
IOException {
+    // Gather any information that requires blocking calls here. This is only 
called before getCompactionPlan() is called.
+    for (Test test : tests) {
+      test.gatherInformation(request);
+    }
+  }
+
+  @Override
   public CompactionPlan getCompactionPlan(MajorCompactionRequest request) 
throws IOException {
     List<FileRef> filesToCompact = getFilesToCompact(request);
     if (filesToCompact.size() >= minFiles) {

Reply via email to