ACCUMULO-3259 Be a little smarter in Bulk.xml to let the backlog of tasks work 
itself down before adding more.

If the queue size on the threadpool is greater than 50 times the number of
tservers, wait 30 seconds before adding more tasks. This should still ensure
that we can keep Accumulo sufficiently busy without creating an absurd
number of tasks that the test client will never reasonably work through.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1477c130
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1477c130
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1477c130

Branch: refs/heads/1.6
Commit: 1477c130143bd2477fb8e29bc22ba4698aae3599
Parents: c4f3fc8
Author: Josh Elser <els...@apache.org>
Authored: Fri Oct 24 15:01:22 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Fri Oct 24 15:24:47 2014 -0400

----------------------------------------------------------------------
 .../test/randomwalk/bulk/BulkImportTest.java    | 84 ++++++++++++++++++++
 .../test/randomwalk/bulk/BulkMinusOne.java      |  8 +-
 .../test/randomwalk/bulk/BulkPlusOne.java       | 20 ++---
 .../accumulo/test/randomwalk/bulk/BulkTest.java |  8 +-
 .../accumulo/test/randomwalk/bulk/Compact.java  |  6 +-
 .../test/randomwalk/bulk/ConsistencyCheck.java  |  6 +-
 .../accumulo/test/randomwalk/bulk/Merge.java    | 12 +--
 .../test/randomwalk/bulk/SelectiveBulkTest.java | 41 ++++++++++
 .../test/randomwalk/bulk/SelectiveQueueing.java | 48 +++++++++++
 .../accumulo/test/randomwalk/bulk/Split.java    |  6 +-
 10 files changed, 206 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
new file mode 100644
index 0000000..aa44741
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.test.randomwalk.State;
+
+/**
+ * If we have a sufficient back-up of imports, let them be worked off before adding even more bulk-imports. Imports of PlusOne must always be balanced with imports
+ * of MinusOne.
+ */
+public abstract class BulkImportTest extends BulkTest {
+
+  public static final String SKIPPED_IMPORT = "skipped.import", TRUE = 
Boolean.TRUE.toString(), FALSE = Boolean.FALSE.toString();
+
+  @Override
+  public void visit(final State state, Properties props) throws Exception {
+    /**
+     * Each visit() is performed sequentially and then submitted to the threadpool which will have async execution. As long as we're checking the state and
+     * making decisions about what to do before we submit something to the thread pool, we're fine.
+     */
+
+    String lastImportSkipped = state.getString(SKIPPED_IMPORT);
+    // We have a marker in the state for the previous import; we have to balance skipping BulkPlusOne
+    // with skipping the next BulkMinusOne to make sure that we maintain consistency
+    if (null != lastImportSkipped) {
+      if (!getClass().equals(BulkMinusOne.class)) {
+        throw new IllegalStateException("Should not have a skipped import 
marker for a class other than " + BulkMinusOne.class.getName() + " but was "
+            + getClass().getName());
+      }
+
+      if (TRUE.equals(lastImportSkipped)) {
+        log.debug("Last import was skipped, skipping this import to ensure 
consistency");
+        state.remove(SKIPPED_IMPORT);
+
+        // Wait 30s to balance the skip of a BulkPlusOne/BulkMinusOne pair
+        log.debug("Waiting 30s before continuing");
+        try {
+          Thread.sleep(30 * 1000);
+        } catch (InterruptedException e) {}
+
+        return;
+      } else {
+        // last import was not skipped, remove the marker
+        state.remove(SKIPPED_IMPORT);
+      }
+    }
+
+    if (shouldQueueMoreImports(state)) {
+      super.visit(state, props);
+    } else {
+      log.debug("Not queuing more imports this round because too many are 
already queued");
+      state.set(SKIPPED_IMPORT, TRUE);
+      // Don't sleep here; let the sleep happen when we skip the next BulkMinusOne
+    }
+  }
+
+  private boolean shouldQueueMoreImports(State state) throws Exception {
+    // Only selectively import when it's BulkPlusOne. If we did a BulkPlusOne,
+    // we must also do a BulkMinusOne to keep the table consistent
+    if (getClass().equals(BulkPlusOne.class)) {
+      // Only queue up more imports while the number of queued tasks is still
+      // below 50x the number of tservers
+      return SelectiveQueueing.shouldQueueOperation(state);
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
index 4ebf23f..1704e49 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
@@ -20,14 +20,14 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.test.randomwalk.State;
 
-public class BulkMinusOne extends BulkTest {
-  
+public class BulkMinusOne extends BulkImportTest {
+
   private static final Value negOne = new Value("-1".getBytes(Constants.UTF8));
-  
+
   @Override
   protected void runLater(State state) throws Exception {
     log.info("Decrementing");
     BulkPlusOne.bulkLoadLots(log, state, negOne);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
index cdfbb36..6d56f13 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
-public class BulkPlusOne extends BulkTest {
-  
+public class BulkPlusOne extends BulkImportTest {
+
   public static final int LOTS = 100000;
   public static final int COLS = 10;
   public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / 
Math.log(16));
@@ -53,7 +53,7 @@ public class BulkPlusOne extends BulkTest {
   }
   public static final Text MARKER_CF = new Text("marker");
   static final AtomicLong counter = new AtomicLong();
-  
+
   private static final Value ONE = new Value("1".getBytes());
 
   static void bulkLoadLots(Logger log, State state, Value value) throws 
Exception {
@@ -64,22 +64,22 @@ public class BulkPlusOne extends BulkTest {
     final FileSystem fs = (FileSystem) state.get("fs");
     fs.mkdirs(fail);
     final int parts = rand.nextInt(10) + 1;
-    
+
     TreeSet<Integer> startRows = new TreeSet<Integer>();
     startRows.add(0);
     while (startRows.size() < parts)
       startRows.add(rand.nextInt(LOTS));
-    
+
     List<String> printRows = new ArrayList<String>(startRows.size());
     for (Integer row : startRows)
       printRows.add(String.format(FMT, row));
-    
+
     String markerColumnQualifier = String.format("%07d", 
counter.incrementAndGet());
     log.debug("preparing bulk files with start rows " + printRows + " last row 
" + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
-    
+
     List<Integer> rows = new ArrayList<Integer>(startRows);
     rows.add(LOTS);
-    
+
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.", i) + 
RFile.EXTENSION;
       FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, 
fs.getConf(), defaultConfiguration);
@@ -105,11 +105,11 @@ public class BulkPlusOne extends BulkTest {
     fs.delete(fail, true);
     log.debug("Finished bulk import, start rows " + printRows + " last row " + 
String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
   }
-  
+
   @Override
   protected void runLater(State state) throws Exception {
     log.info("Incrementing");
     bulkLoadLots(log, state, ONE);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
index 4afefd9..b24f61a 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.test.randomwalk.State;
 import org.apache.accumulo.test.randomwalk.Test;
 
 public abstract class BulkTest extends Test {
-  
+
   @Override
   public void visit(final State state, Properties props) throws Exception {
     Setup.run(state, new Runnable() {
@@ -34,10 +34,10 @@ public abstract class BulkTest extends Test {
           log.error(ex, ex);
         }
       }
-      
+
     });
   }
-  
+
   abstract protected void runLater(State state) throws Exception;
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
index 86dae5c..8b17256 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.test.randomwalk.bulk;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Compact extends BulkTest {
-  
+public class Compact extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     final Text[] points = Merge.getRandomTabletRange(state);
@@ -29,5 +29,5 @@ public class Compact extends BulkTest {
     state.getConnector().tableOperations().compact(Setup.getTableName(), 
points[0], points[1], false, true);
     log.info("Compaction " + rangeString + " finished");
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
index e60f8cf..7e528a7 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
@@ -28,8 +28,8 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class ConsistencyCheck extends BulkTest {
-  
+public class ConsistencyCheck extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     Random rand = (Random) state.get("rand");
@@ -52,5 +52,5 @@ public class ConsistencyCheck extends BulkTest {
         throw new RuntimeException("Inconsistent value at " + entry.getKey() + 
" was " + entry.getValue() + " should be " + v + " first read at " + first);
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
index 2dd0345..9d66e0c 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
@@ -22,8 +22,8 @@ import java.util.Random;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Merge extends BulkTest {
-  
+public class Merge extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     Text[] points = getRandomTabletRange(state);
@@ -31,15 +31,15 @@ public class Merge extends BulkTest {
     state.getConnector().tableOperations().merge(Setup.getTableName(), 
points[0], points[1]);
     log.info("merging " + rangeToString(points) + " complete");
   }
-  
+
   public static String rangeToString(Text[] points) {
     return "(" + (points[0] == null ? "-inf" : points[0]) + " -> " + 
(points[1] == null ? "+inf" : points[1]) + "]";
   }
-  
+
   public static Text getRandomRow(Random rand) {
     return new Text(String.format(BulkPlusOne.FMT, (rand.nextLong() & 
0x7fffffffffffffffl) % BulkPlusOne.LOTS));
   }
-  
+
   public static Text[] getRandomTabletRange(State state) {
     Random rand = (Random) state.get("rand");
     Text points[] = {getRandomRow(rand), getRandomRow(rand),};
@@ -56,5 +56,5 @@ public class Merge extends BulkTest {
     }
     return points;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
new file mode 100644
index 0000000..ca66775
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.test.randomwalk.State;
+
+/**
+ * Selectively runs the actual {@link BulkTest} based on the number of active 
TServers and the number of queued operations.
+ */
+public abstract class SelectiveBulkTest extends BulkTest {
+
+  @Override
+  public void visit(State state, Properties props) throws Exception {
+    if (SelectiveQueueing.shouldQueueOperation(state)) {
+      super.visit(state, props);
+    } else {
+      log.debug("Skipping queueing of " + getClass().getSimpleName() + " 
because of excessive queued tasks already");
+      log.debug("Waiting 30 seconds before continuing");
+      try {
+        Thread.sleep(30 * 1000);
+      } catch (InterruptedException e) {}
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
new file mode 100644
index 0000000..e6dde4d
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.test.randomwalk.State;
+import org.apache.log4j.Logger;
+
+/**
+ * Chooses whether or not an operation should be queued based on the current 
thread pool queue length and the number of available TServers.
+ */
+public class SelectiveQueueing {
+  private static final Logger log = Logger.getLogger(SelectiveQueueing.class);
+
+  public static boolean shouldQueueOperation(State state) throws Exception {
+    final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool");
+    long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - 
pool.getCompletedTaskCount();
+    final Connector conn = state.getConnector();
+    int numTservers = conn.instanceOperations().getTabletServers().size();
+
+    if (!shouldQueue(queuedThreads, numTservers)) {
+      log.info("Not queueing because of " + queuedThreads + " outstanding 
tasks");
+      return false;
+    }
+
+    return true;
+  }
+
+  private static boolean shouldQueue(long queuedThreads, int numTservers) {
+    return queuedThreads < numTservers * 50;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
index 157e2ab..7a93321 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
@@ -23,8 +23,8 @@ import java.util.TreeSet;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Split extends BulkTest {
-  
+public class Split extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     SortedSet<Text> splits = new TreeSet<Text>();
@@ -36,5 +36,5 @@ public class Split extends BulkTest {
     state.getConnector().tableOperations().addSplits(Setup.getTableName(), 
splits);
     log.info("split for " + splits + " finished");
   }
-  
+
 }

Reply via email to