Merge branch '1.6'

Conflicts:
        test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
        test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
        test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ebe6eecc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ebe6eecc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ebe6eecc

Branch: refs/heads/master
Commit: ebe6eecc749e4456c7e53e232d61d7f9d73f86ed
Parents: eaaebdf 1635ad5
Author: Josh Elser <els...@apache.org>
Authored: Fri Oct 24 16:20:27 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Fri Oct 24 16:20:27 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/test/randomwalk/Module.java | 418 +++++++++++--------
 .../test/randomwalk/bulk/BulkImportTest.java    |  85 ++++
 .../test/randomwalk/bulk/BulkMinusOne.java      |   8 +-
 .../test/randomwalk/bulk/BulkPlusOne.java       |  20 +-
 .../accumulo/test/randomwalk/bulk/BulkTest.java |   8 +-
 .../accumulo/test/randomwalk/bulk/Compact.java  |   6 +-
 .../test/randomwalk/bulk/ConsistencyCheck.java  |   6 +-
 .../accumulo/test/randomwalk/bulk/Merge.java    |  12 +-
 .../test/randomwalk/bulk/SelectiveBulkTest.java |  42 ++
 .../test/randomwalk/bulk/SelectiveQueueing.java |  49 +++
 .../accumulo/test/randomwalk/bulk/Split.java    |   6 +-
 11 files changed, 450 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
index adf9b04,5756934..353a02b
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
@@@ -36,9 -41,10 +42,10 @@@ import javax.xml.parsers.DocumentBuilde
  import javax.xml.validation.Schema;
  import javax.xml.validation.SchemaFactory;
  
 -import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+ import org.apache.accumulo.core.util.SimpleThreadPool;
  import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
  import org.w3c.dom.Document;
  import org.w3c.dom.Element;
  import org.w3c.dom.NodeList;
@@@ -47,19 -53,17 +54,20 @@@
   * A module is directed graph of tests
   */
  public class Module extends Node {
-   
+ 
 +  private static final Logger log = Logger.getLogger(Module.class);
-   
++
++
    private class Dummy extends Node {
-     
+ 
      String name;
-     
+ 
      Dummy(String name) {
        this.name = name;
      }
-     
+ 
      @Override
 -    public void visit(State state, Properties props) {
 +    public void visit(State state, Environment env, Properties props) {
        String print;
        if ((print = props.getProperty("print")) != null) {
          Level level = Level.toLevel(print);
@@@ -82,12 -87,13 +91,13 @@@
        target = null;
        this.id = id;
      }
-     
+ 
      @Override
 -    public void visit(State state, Properties props) throws Exception {
 +    public void visit(State state, Environment env, Properties props) throws 
Exception {
        throw new Exception("You don't visit aliases!");
      }
-     
+ 
+     @Override
      public String toString() {
        return id;
      }
@@@ -168,12 -174,12 +178,12 @@@
      this.xmlFile = xmlFile;
      loadFromXml();
    }
-   
+ 
    @Override
-   public void visit(State state, Environment env, Properties props) throws 
Exception {
 -  public void visit(final State state, Properties props) throws Exception {
++  public void visit(final State state, final Environment env, Properties 
props) throws Exception {
      int maxHops, maxSec;
      boolean teardown;
-     
+ 
      Properties initProps = getProps("_init");
      initProps.putAll(props);
      String prop;
@@@ -191,117 -197,173 +201,171 @@@
        teardown = true;
      else
        teardown = false;
-     
+ 
      if (fixture != null) {
 -      fixture.setUp(state);
 +      fixture.setUp(state, env);
      }
-     
-     Node initNode = getNode(initNodeId);
-     
-     boolean test = false;
-     if (initNode instanceof Test) {
-       startTimer(initNode);
-       test = true;
-     }
-     initNode.visit(state, env, getProps(initNodeId));
-     if (test)
-       stopTimer(initNode);
-     
-     // update aliases
-     Set<String> aliases;
-     if ((aliases = aliasMap.get(initNodeId)) != null)
-       for (String alias : aliases) {
-         ((Alias) nodes.get(alias)).update(initNodeId);
-       }
-     
-     String curNodeId = initNodeId;
-     int numHops = 0;
-     long startTime = System.currentTimeMillis() / 1000;
-     while (true) {
-       // check if END state was reached
-       if (curNodeId.equalsIgnoreCase("END")) {
-         log.debug("reached END state");
-         break;
-       }
-       // check if maxSec was reached
-       long curTime = System.currentTimeMillis() / 1000;
-       if ((curTime - startTime) > maxSec) {
-         log.debug("reached maxSec(" + maxSec + ")");
-         break;
-       }
-       // check if maxHops was reached
-       if (numHops > maxHops) {
-         log.debug("reached maxHops(" + maxHops + ")");
-         break;
-       }
-       numHops++;
-       
-       if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
-         throw new Exception("Reached node(" + curNodeId + ") without outgoing 
edges in module(" + this + ")");
-       }
-       AdjList adj = adjMap.get(curNodeId);
-       String nextNodeId = adj.randomNeighbor();
-       Node nextNode = getNode(nextNodeId);
-       if (nextNode instanceof Alias) {
-         nextNodeId = ((Alias) nextNode).getTargetId();
-         nextNode = ((Alias) nextNode).get();
+ 
+     ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner");
+ 
+     try {
+       Node initNode = getNode(initNodeId);
+ 
+       boolean test = false;
+       if (initNode instanceof Test) {
+         startTimer(initNode);
+         test = true;
        }
-       Properties nodeProps = getProps(nextNodeId);
-       try {
-         test = false;
-         if (nextNode instanceof Test) {
-           startTimer(nextNode);
-           test = true;
 -      initNode.visit(state, getProps(initNodeId));
++      initNode.visit(state, env, getProps(initNodeId));
+       if (test)
+         stopTimer(initNode);
+ 
 -      state.visitedNode();
+       // update aliases
+       Set<String> aliases;
+       if ((aliases = aliasMap.get(initNodeId)) != null)
+         for (String alias : aliases) {
+           ((Alias) nodes.get(alias)).update(initNodeId);
          }
-         nextNode.visit(state, env, nodeProps);
-         if (test)
-           stopTimer(nextNode);
-       } catch (Exception e) {
-         log.debug("Connector belongs to user: " + 
env.getConnector().whoami());
-         log.debug("Exception occured at: " + System.currentTimeMillis());
-         log.debug("Properties for node: " + nextNodeId);
-         for (Entry<Object,Object> entry : nodeProps.entrySet()) {
-           log.debug("  " + entry.getKey() + ": " + entry.getValue());
+ 
+       String curNodeId = initNodeId;
+       int numHops = 0;
+       long startTime = System.currentTimeMillis() / 1000;
+       while (true) {
+         // check if END state was reached
+         if (curNodeId.equalsIgnoreCase("END")) {
+           log.debug("reached END state");
+           break;
          }
-         log.debug("Overall Configuration Properties");
-         for (Entry<Object,Object> entry : 
env.copyConfigProperties().entrySet()) {
-           log.debug("  " + entry.getKey() + ": " + entry.getValue());
+         // check if maxSec was reached
+         long curTime = System.currentTimeMillis() / 1000;
+         if ((curTime - startTime) > maxSec) {
+           log.debug("reached maxSec(" + maxSec + ")");
+           break;
          }
-         log.debug("State information");
-         for (String key : new TreeSet<String>(state.getMap().keySet()))  {
-           Object value = state.getMap().get(key);
-           String logMsg = "  " + key + ": ";
-           if (value == null)
-             logMsg += "null";
-           else if (value instanceof String || value instanceof Map || value 
instanceof Collection || value instanceof Number)
-             logMsg += value;
-           else if (value instanceof byte[])
-             logMsg += new String((byte[])value, StandardCharsets.UTF_8);
-           else if (value instanceof PasswordToken)
-             logMsg += new String(((PasswordToken) value).getPassword(), 
StandardCharsets.UTF_8);
-           else
-             logMsg += value.getClass()+ " - " + value;
-           
-           log.debug(logMsg);
+ 
+         // The number of seconds before the test should exit
+         long secondsRemaining = maxSec - (curTime - startTime);
+ 
+         // check if maxHops was reached
+         if (numHops > maxHops) {
+           log.debug("reached maxHops(" + maxHops + ")");
+           break;
          }
-         throw new Exception("Error running node " + nextNodeId, e);
-       }
-       
-       // update aliases
-       if ((aliases = aliasMap.get(curNodeId)) != null)
-         for (String alias : aliases) {
-           ((Alias) nodes.get(alias)).update(curNodeId);
+         numHops++;
+ 
+         if (!adjMap.containsKey(curNodeId) && 
!curNodeId.startsWith("alias.")) {
+           throw new Exception("Reached node(" + curNodeId + ") without 
outgoing edges in module(" + this + ")");
+         }
+         AdjList adj = adjMap.get(curNodeId);
+         String nextNodeId = adj.randomNeighbor();
+         final Node nextNode;
+         Node nextNodeOrAlias = getNode(nextNodeId);
+         if (nextNodeOrAlias instanceof Alias) {
+           nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
+           nextNode = ((Alias) nextNodeOrAlias).get();
+         } else {
+           nextNode = nextNodeOrAlias;
+         }
+         final Properties nodeProps = getProps(nextNodeId);
+         try {
+           test = false;
+           if (nextNode instanceof Test) {
+             startTimer(nextNode);
+             test = true;
+           }
+ 
+           // Wrap the visit of the next node in the module in a callable that 
returns a thrown exception
+           FutureTask<Exception> task = new FutureTask<Exception>(new 
Callable<Exception>() {
+ 
+             @Override
+             public Exception call() throws Exception {
+               try {
 -                nextNode.visit(state, nodeProps);
++                nextNode.visit(state, env, nodeProps);
+                 return null;
+               } catch (Exception e) {
+                 return e;
+               }
+             }
+ 
+           });
+ 
+           // Run the task (should execute immediately)
+           service.submit(task);
+ 
+           Exception nodeException;
+           try {
+             // Bound the time we'll wait for the node to complete
+             nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
+           } catch (InterruptedException e) {
+             log.warn("Interrupted waiting for " + 
nextNode.getClass().getSimpleName() + " to complete. Exiting.", e);
+             break;
+           } catch (ExecutionException e) {
+             log.error("Caught error executing " + 
nextNode.getClass().getSimpleName(), e);
+             throw e;
+           } catch (TimeoutException e) {
+             log.info("Timed out waiting for " + 
nextNode.getClass().getSimpleName() + " to complete (waited " + 
secondsRemaining + " seconds). Exiting.", e);
+             break;
+           }
+ 
+           // The RandomWalk node throw an Exception that that Callable handed 
back
+           // Throw it and let the Module perform cleanup
+           if (null != nodeException) {
+             throw nodeException;
+           }
+ 
+           if (test)
+             stopTimer(nextNode);
+         } catch (Exception e) {
 -          log.debug("Connector belongs to user: " + 
state.getConnector().whoami());
++          log.debug("Connector belongs to user: " + 
env.getConnector().whoami());
+           log.debug("Exception occured at: " + System.currentTimeMillis());
+           log.debug("Properties for node: " + nextNodeId);
+           for (Entry<Object,Object> entry : nodeProps.entrySet()) {
+             log.debug("  " + entry.getKey() + ": " + entry.getValue());
+           }
 -          log.debug("Overall Properties");
 -          for (Entry<Object,Object> entry : state.getProperties().entrySet()) 
{
++          log.debug("Overall Configuration Properties");
++          for (Entry<Object,Object> entry : 
env.copyConfigProperties().entrySet()) {
+             log.debug("  " + entry.getKey() + ": " + entry.getValue());
+           }
+           log.debug("State information");
+           for (String key : new TreeSet<String>(state.getMap().keySet())) {
+             Object value = state.getMap().get(key);
+             String logMsg = "  " + key + ": ";
+             if (value == null)
+               logMsg += "null";
+             else if (value instanceof String || value instanceof Map || value 
instanceof Collection || value instanceof Number)
+               logMsg += value;
+             else if (value instanceof byte[])
 -              logMsg += new String((byte[]) value, Constants.UTF8);
++              logMsg += new String((byte[]) value, StandardCharsets.UTF_8);
+             else if (value instanceof PasswordToken)
 -              logMsg += new String(((PasswordToken) value).getPassword(), 
Constants.UTF8);
++              logMsg += new String(((PasswordToken) value).getPassword(), 
StandardCharsets.UTF_8);
+             else
+               logMsg += value.getClass() + " - " + value;
+ 
+             log.debug(logMsg);
+           }
+           throw new Exception("Error running node " + nextNodeId, e);
          }
-       
-       curNodeId = nextNodeId;
 -        state.visitedNode();
+ 
+         // update aliases
+         if ((aliases = aliasMap.get(curNodeId)) != null)
+           for (String alias : aliases) {
+             ((Alias) nodes.get(alias)).update(curNodeId);
+           }
+ 
+         curNodeId = nextNodeId;
+       }
+     } finally {
+       if (null != service) {
+         service.shutdownNow();
+       }
      }
-     
+ 
      if (teardown && (fixture != null)) {
        log.debug("tearing down module");
 -      fixture.tearDown(state);
 +      fixture.tearDown(state, env);
      }
    }
-   
+ 
    Thread timer = null;
    final int time = 5 * 1000 * 60;
    AtomicBoolean runningLong = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
index 0000000,aa44741..33d7701
mode 000000,100644..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
@@@ -1,0 -1,84 +1,85 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.randomwalk.bulk;
+ 
+ import java.util.Properties;
+ 
++import org.apache.accumulo.test.randomwalk.Environment;
+ import org.apache.accumulo.test.randomwalk.State;
+ 
+ /**
+  * If we have a sufficient back-up of imports, let them work off before 
adding even more bulk-imports. Imports of PlusOne must always be balanced with 
imports
+  * of MinusOne.
+  */
+ public abstract class BulkImportTest extends BulkTest {
+ 
+   public static final String SKIPPED_IMPORT = "skipped.import", TRUE = 
Boolean.TRUE.toString(), FALSE = Boolean.FALSE.toString();
+ 
+   @Override
 -  public void visit(final State state, Properties props) throws Exception {
++  public void visit(final State state, Environment env, Properties props) 
throws Exception {
+     /**
+      * Each visit() is performed sequentially and then submitted to the 
threadpool which will have async execution. As long as we're checking the state 
and
+      * making decisions about what to do before we submit something to the 
thread pool, we're fine.
+      */
+ 
+     String lastImportSkipped = state.getString(SKIPPED_IMPORT);
+     // We have a marker in the state for the previous insert, we have to 
balance skipping BulkPlusOne
+     // with skipping the new BulkMinusOne to make sure that we maintain 
consistency
+     if (null != lastImportSkipped) {
+       if (!getClass().equals(BulkMinusOne.class)) {
+         throw new IllegalStateException("Should not have a skipped import 
marker for a class other than " + BulkMinusOne.class.getName() + " but was "
+             + getClass().getName());
+       }
+ 
+       if (TRUE.equals(lastImportSkipped)) {
+         log.debug("Last import was skipped, skipping this import to ensure 
consistency");
+         state.remove(SKIPPED_IMPORT);
+ 
+         // Wait 30s to balance the skip of a BulkPlusOne/BulkMinusOne pair
+         log.debug("Waiting 30s before continuing");
+         try {
+           Thread.sleep(30 * 1000);
+         } catch (InterruptedException e) {}
+ 
+         return;
+       } else {
+         // last import was not skipped, remove the marker
+         state.remove(SKIPPED_IMPORT);
+       }
+     }
+ 
 -    if (shouldQueueMoreImports(state)) {
 -      super.visit(state, props);
++    if (shouldQueueMoreImports(state, env)) {
++      super.visit(state, env, props);
+     } else {
+       log.debug("Not queuing more imports this round because too many are 
already queued");
+       state.set(SKIPPED_IMPORT, TRUE);
+       // Don't sleep here, let the sleep happen when we skip the next 
BulkMinusOne
+     }
+   }
+ 
 -  private boolean shouldQueueMoreImports(State state) throws Exception {
++  private boolean shouldQueueMoreImports(State state, Environment env) throws 
Exception {
+     // Only selectively import when it's BulkPlusOne. If we did a BulkPlusOne,
+     // we must also do a BulkMinusOne to keep the table consistent
+     if (getClass().equals(BulkPlusOne.class)) {
+       // Only queue up more imports if the number of queued tasks already
+       // exceeds the number of tservers by 50x
 -      return SelectiveQueueing.shouldQueueOperation(state);
++      return SelectiveQueueing.shouldQueueOperation(state, env);
+     }
+ 
+     return true;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
index f6f2e87,1704e49..6338d29
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
@@@ -16,20 -16,18 +16,20 @@@
   */
  package org.apache.accumulo.test.randomwalk.bulk;
  
 -import org.apache.accumulo.core.Constants;
 +import java.nio.charset.StandardCharsets;
 +
  import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.test.randomwalk.Environment;
  import org.apache.accumulo.test.randomwalk.State;
  
- public class BulkMinusOne extends BulkTest {
-   
+ public class BulkMinusOne extends BulkImportTest {
+ 
 -  private static final Value negOne = new 
Value("-1".getBytes(Constants.UTF8));
 +  private static final Value negOne = new 
Value("-1".getBytes(StandardCharsets.UTF_8));
-   
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      log.info("Decrementing");
 -    BulkPlusOne.bulkLoadLots(log, state, negOne);
 +    BulkPlusOne.bulkLoadLots(log, state, env, negOne);
    }
-   
+ 
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
index d605e8e,6d56f13..c54a8e7
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
@@@ -54,10 -53,10 +54,10 @@@ public class BulkPlusOne extends BulkIm
    }
    public static final Text MARKER_CF = new Text("marker");
    static final AtomicLong counter = new AtomicLong();
-   
+ 
    private static final Value ONE = new Value("1".getBytes());
  
 -  static void bulkLoadLots(Logger log, State state, Value value) throws 
Exception {
 +  static void bulkLoadLots(Logger log, State state, Environment env, Value 
value) throws Exception {
      final Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
      final Path fail = new Path(dir.toString() + "_fail");
      final DefaultConfiguration defaultConfiguration = 
AccumuloConfiguration.getDefaultConfiguration();
@@@ -106,11 -105,11 +106,11 @@@
      fs.delete(fail, true);
      log.debug("Finished bulk import, start rows " + printRows + " last row " 
+ String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
    }
-   
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      log.info("Incrementing");
 -    bulkLoadLots(log, state, ONE);
 +    bulkLoadLots(log, state, env, ONE);
    }
-   
+ 
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
index 6c0c68e,b24f61a..07d1f4c
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
@@@ -23,9 -22,9 +23,9 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.Test;
  
  public abstract class BulkTest extends Test {
-   
+ 
    @Override
 -  public void visit(final State state, Properties props) throws Exception {
 +  public void visit(final State state, final Environment env, Properties 
props) throws Exception {
      Setup.run(state, new Runnable() {
        @Override
        public void run() {
@@@ -35,10 -34,10 +35,10 @@@
            log.error(ex, ex);
          }
        }
-       
+ 
      });
    }
-   
+ 
 -  abstract protected void runLater(State state) throws Exception;
 +  abstract protected void runLater(State state, Environment env) throws 
Exception;
-   
+ 
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
index 7561709,8b17256..c526ffa
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
@@@ -20,15 -19,15 +20,15 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.State;
  import org.apache.hadoop.io.Text;
  
- public class Compact extends BulkTest {
-   
+ public class Compact extends SelectiveBulkTest {
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      final Text[] points = Merge.getRandomTabletRange(state);
      final String rangeString = Merge.rangeToString(points);
      log.info("Compacting " + rangeString);
 -    state.getConnector().tableOperations().compact(Setup.getTableName(), 
points[0], points[1], false, true);
 +    env.getConnector().tableOperations().compact(Setup.getTableName(), 
points[0], points[1], false, true);
      log.info("Compaction " + rangeString + " finished");
    }
-   
+ 
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
index d1ff8cb,7e528a7..39ef3d8
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
@@@ -29,10 -28,10 +29,10 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.State;
  import org.apache.hadoop.io.Text;
  
- public class ConsistencyCheck extends BulkTest {
-   
+ public class ConsistencyCheck extends SelectiveBulkTest {
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      Random rand = (Random) state.get("rand");
      Text row = Merge.getRandomRow(rand);
      log.info("Checking " + row);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
index 04af6c5,9d66e0c..8508242
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
@@@ -23,16 -22,16 +23,16 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.State;
  import org.apache.hadoop.io.Text;
  
- public class Merge extends BulkTest {
-   
+ public class Merge extends SelectiveBulkTest {
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      Text[] points = getRandomTabletRange(state);
      log.info("merging " + rangeToString(points));
 -    state.getConnector().tableOperations().merge(Setup.getTableName(), 
points[0], points[1]);
 +    env.getConnector().tableOperations().merge(Setup.getTableName(), 
points[0], points[1]);
      log.info("merging " + rangeToString(points) + " complete");
    }
-   
+ 
    public static String rangeToString(Text[] points) {
      return "(" + (points[0] == null ? "-inf" : points[0]) + " -> " + 
(points[1] == null ? "+inf" : points[1]) + "]";
    }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
index 0000000,ca66775..48bf86a
mode 000000,100644..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
@@@ -1,0 -1,41 +1,42 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.randomwalk.bulk;
+ 
+ import java.util.Properties;
+ 
++import org.apache.accumulo.test.randomwalk.Environment;
+ import org.apache.accumulo.test.randomwalk.State;
+ 
+ /**
+  * Selectively runs the actual {@link BulkTest} based on the number of active 
TServers and the number of queued operations.
+  */
+ public abstract class SelectiveBulkTest extends BulkTest {
+ 
+   @Override
 -  public void visit(State state, Properties props) throws Exception {
 -    if (SelectiveQueueing.shouldQueueOperation(state)) {
 -      super.visit(state, props);
++  public void visit(State state, Environment env, Properties props) throws 
Exception {
++    if (SelectiveQueueing.shouldQueueOperation(state, env)) {
++      super.visit(state, env, props);
+     } else {
+       log.debug("Skipping queueing of " + getClass().getSimpleName() + " 
because of excessive queued tasks already");
+       log.debug("Waiting 30 seconds before continuing");
+       try {
+         Thread.sleep(30 * 1000);
+       } catch (InterruptedException e) {}
+     }
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
index 0000000,e6dde4d..c7f490c
mode 000000,100644..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
@@@ -1,0 -1,48 +1,49 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.randomwalk.bulk;
+ 
+ import java.util.concurrent.ThreadPoolExecutor;
+ 
+ import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.test.randomwalk.Environment;
+ import org.apache.accumulo.test.randomwalk.State;
+ import org.apache.log4j.Logger;
+ 
+ /**
+  * Chooses whether or not an operation should be queued based on the current 
thread pool queue length and the number of available TServers.
+  */
+ public class SelectiveQueueing {
+   private static final Logger log = Logger.getLogger(SelectiveQueueing.class);
+ 
 -  public static boolean shouldQueueOperation(State state) throws Exception {
++  public static boolean shouldQueueOperation(State state, Environment env) 
throws Exception {
+     final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool");
+     long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - 
pool.getCompletedTaskCount();
 -    final Connector conn = state.getConnector();
++    final Connector conn = env.getConnector();
+     int numTservers = conn.instanceOperations().getTabletServers().size();
+ 
+     if (!shouldQueue(queuedThreads, numTservers)) {
+       log.info("Not queueing because of " + queuedThreads + " outstanding 
tasks");
+       return false;
+     }
+ 
+     return true;
+   }
+ 
+   private static boolean shouldQueue(long queuedThreads, int numTservers) {
+     return queuedThreads < numTservers * 50;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
index 21b453d,7a93321..b69805d
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
@@@ -24,18 -23,18 +24,18 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.State;
  import org.apache.hadoop.io.Text;
  
- public class Split extends BulkTest {
-   
+ public class Split extends SelectiveBulkTest {
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      SortedSet<Text> splits = new TreeSet<Text>();
      Random rand = (Random) state.get("rand");
      int count = rand.nextInt(20);
      for (int i = 0; i < count; i++)
        splits.add(new Text(String.format(BulkPlusOne.FMT, (rand.nextLong() & 
0x7fffffffffffffffl) % BulkPlusOne.LOTS)));
      log.info("splitting " + splits);
 -    state.getConnector().tableOperations().addSplits(Setup.getTableName(), 
splits);
 +    env.getConnector().tableOperations().addSplits(Setup.getTableName(), 
splits);
      log.info("split for " + splits + " finished");
    }
-   
+ 
  }

Reply via email to