Merge branch '1.6' Conflicts: test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ebe6eecc Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ebe6eecc Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ebe6eecc Branch: refs/heads/master Commit: ebe6eecc749e4456c7e53e232d61d7f9d73f86ed Parents: eaaebdf 1635ad5 Author: Josh Elser <els...@apache.org> Authored: Fri Oct 24 16:20:27 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Fri Oct 24 16:20:27 2014 -0400 ---------------------------------------------------------------------- .../apache/accumulo/test/randomwalk/Module.java | 418 +++++++++++-------- .../test/randomwalk/bulk/BulkImportTest.java | 85 ++++ .../test/randomwalk/bulk/BulkMinusOne.java | 8 +- .../test/randomwalk/bulk/BulkPlusOne.java | 20 +- .../accumulo/test/randomwalk/bulk/BulkTest.java | 8 +- .../accumulo/test/randomwalk/bulk/Compact.java | 6 +- .../test/randomwalk/bulk/ConsistencyCheck.java | 6 +- .../accumulo/test/randomwalk/bulk/Merge.java | 12 +- .../test/randomwalk/bulk/SelectiveBulkTest.java | 42 ++ .../test/randomwalk/bulk/SelectiveQueueing.java | 49 +++ .../accumulo/test/randomwalk/bulk/Split.java | 6 +- 11 files changed, 450 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java index adf9b04,5756934..353a02b --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java @@@ -36,9 -41,10 +42,10 @@@ import javax.xml.parsers.DocumentBuilde import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.security.tokens.PasswordToken; + import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; @@@ -47,19 -53,17 +54,20 @@@ * A module is directed graph of tests */ public class Module extends Node { - + + private static final Logger log = Logger.getLogger(Module.class); - ++ ++ private class Dummy extends Node { - + String name; - + Dummy(String name) { this.name = name; } - + @Override - public void visit(State state, Properties props) { + public void visit(State state, Environment env, Properties props) { String print; if ((print = props.getProperty("print")) != null) { Level level = Level.toLevel(print); @@@ -82,12 -87,13 +91,13 @@@ target = null; this.id = id; } - + @Override - public void visit(State state, Properties props) throws Exception { + public void visit(State state, Environment env, Properties props) throws Exception { throw new Exception("You don't visit aliases!"); } - + + @Override public String toString() { return id; } @@@ -168,12 -174,12 +178,12 @@@ this.xmlFile = xmlFile; loadFromXml(); } - + @Override - public void visit(State state, Environment env, Properties props) throws Exception { - public void visit(final State state, Properties props) throws Exception { ++ public void visit(final State state, final Environment env, Properties props) throws Exception { int maxHops, maxSec; boolean teardown; - + Properties initProps = getProps("_init"); initProps.putAll(props); String prop; @@@ -191,117 -197,173 +201,171 @@@ teardown = true; else teardown = false; - + if (fixture != null) { - fixture.setUp(state); + fixture.setUp(state, env); } - - Node initNode = getNode(initNodeId); - - boolean test = false; - if (initNode instanceof Test) { - startTimer(initNode); - test = true; - } - initNode.visit(state, env, getProps(initNodeId)); - if (test) - stopTimer(initNode); - - // update aliases - Set<String> aliases; - if ((aliases = aliasMap.get(initNodeId)) != null) - for (String alias : aliases) { - ((Alias) nodes.get(alias)).update(initNodeId); - } - - String curNodeId = initNodeId; - int numHops = 0; - long startTime = System.currentTimeMillis() / 1000; - while (true) { - // check if END state was reached - if (curNodeId.equalsIgnoreCase("END")) { - log.debug("reached END state"); - break; - } - // check if maxSec was reached - long curTime = System.currentTimeMillis() / 1000; - if ((curTime - startTime) > maxSec) { - log.debug("reached maxSec(" + maxSec + ")"); - break; - } - // check if maxHops was reached - if (numHops > maxHops) { - log.debug("reached maxHops(" + maxHops + ")"); - break; - } - numHops++; - - if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) { - throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")"); - } - AdjList adj = adjMap.get(curNodeId); - String nextNodeId = adj.randomNeighbor(); - Node nextNode = getNode(nextNodeId); - if (nextNode instanceof Alias) { - nextNodeId = ((Alias) nextNode).getTargetId(); - nextNode = ((Alias) nextNode).get(); + + ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner"); + + try { + Node initNode = getNode(initNodeId); + + boolean test = false; + if (initNode instanceof Test) { + startTimer(initNode); + test = true; } - Properties nodeProps = getProps(nextNodeId); - try { - test = false; - if (nextNode instanceof Test) { - startTimer(nextNode); - test = true; - initNode.visit(state, getProps(initNodeId)); ++ initNode.visit(state, env, getProps(initNodeId)); + if (test) + stopTimer(initNode); + - state.visitedNode(); + // update aliases + Set<String> aliases; + if ((aliases = aliasMap.get(initNodeId)) != null) + for (String alias : aliases) { + ((Alias) nodes.get(alias)).update(initNodeId); } - nextNode.visit(state, env, nodeProps); - if (test) - stopTimer(nextNode); - } catch (Exception e) { - log.debug("Connector belongs to user: " + env.getConnector().whoami()); - log.debug("Exception occured at: " + System.currentTimeMillis()); - log.debug("Properties for node: " + nextNodeId); - for (Entry<Object,Object> entry : nodeProps.entrySet()) { - log.debug(" " + entry.getKey() + ": " + entry.getValue()); + + String curNodeId = initNodeId; + int numHops = 0; + long startTime = System.currentTimeMillis() / 1000; + while (true) { + // check if END state was reached + if (curNodeId.equalsIgnoreCase("END")) { + log.debug("reached END state"); + break; } - log.debug("Overall Configuration Properties"); - for (Entry<Object,Object> entry : env.copyConfigProperties().entrySet()) { - log.debug(" " + entry.getKey() + ": " + entry.getValue()); + // check if maxSec was reached + long curTime = System.currentTimeMillis() / 1000; + if ((curTime - startTime) > maxSec) { + log.debug("reached maxSec(" + maxSec + ")"); + break; } - log.debug("State information"); - for (String key : new TreeSet<String>(state.getMap().keySet())) { - Object value = state.getMap().get(key); - String logMsg = " " + key + ": "; - if (value == null) - logMsg += "null"; - else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number) - logMsg += value; - else if (value instanceof byte[]) - logMsg += new String((byte[])value, StandardCharsets.UTF_8); - else if (value instanceof PasswordToken) - logMsg += new String(((PasswordToken) value).getPassword(), StandardCharsets.UTF_8); - else - logMsg += value.getClass()+ " - " + value; - - log.debug(logMsg); + + // The number of seconds before the test should exit + long secondsRemaining = maxSec - (curTime - startTime); + + // check if maxHops was reached + if (numHops > maxHops) { + log.debug("reached maxHops(" + maxHops + ")"); + break; } - throw new Exception("Error running node " + nextNodeId, e); - } - - // update aliases - if ((aliases = aliasMap.get(curNodeId)) != null) - for (String alias : aliases) { - ((Alias) nodes.get(alias)).update(curNodeId); + numHops++; + + if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) { + throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")"); + } + AdjList adj = adjMap.get(curNodeId); + String nextNodeId = adj.randomNeighbor(); + final Node nextNode; + Node nextNodeOrAlias = getNode(nextNodeId); + if (nextNodeOrAlias instanceof Alias) { + nextNodeId = ((Alias) nextNodeOrAlias).getTargetId(); + nextNode = ((Alias) nextNodeOrAlias).get(); + } else { + nextNode = nextNodeOrAlias; + } + final Properties nodeProps = getProps(nextNodeId); + try { + test = false; + if (nextNode instanceof Test) { + startTimer(nextNode); + test = true; + } + + // Wrap the visit of the next node in the module in a callable that returns a thrown exception + FutureTask<Exception> task = new FutureTask<Exception>(new Callable<Exception>() { + + @Override + public Exception call() throws Exception { + try { - nextNode.visit(state, nodeProps); ++ nextNode.visit(state, env, nodeProps); + return null; + } catch (Exception e) { + return e; + } + } + + }); + + // Run the task (should execute immediately) + service.submit(task); + + Exception nodeException; + try { + // Bound the time we'll wait for the node to complete + nodeException = task.get(secondsRemaining, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName() + " to complete. Exiting.", e); + break; + } catch (ExecutionException e) { + log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e); + throw e; + } catch (TimeoutException e) { + log.info("Timed out waiting for " + nextNode.getClass().getSimpleName() + " to complete (waited " + secondsRemaining + " seconds). Exiting.", e); + break; + } + + // The RandomWalk node throw an Exception that that Callable handed back + // Throw it and let the Module perform cleanup + if (null != nodeException) { + throw nodeException; + } + + if (test) + stopTimer(nextNode); + } catch (Exception e) { - log.debug("Connector belongs to user: " + state.getConnector().whoami()); ++ log.debug("Connector belongs to user: " + env.getConnector().whoami()); + log.debug("Exception occured at: " + System.currentTimeMillis()); + log.debug("Properties for node: " + nextNodeId); + for (Entry<Object,Object> entry : nodeProps.entrySet()) { + log.debug(" " + entry.getKey() + ": " + entry.getValue()); + } - log.debug("Overall Properties"); - for (Entry<Object,Object> entry : state.getProperties().entrySet()) { ++ log.debug("Overall Configuration Properties"); ++ for (Entry<Object,Object> entry : env.copyConfigProperties().entrySet()) { + log.debug(" " + entry.getKey() + ": " + entry.getValue()); + } + log.debug("State information"); + for (String key : new TreeSet<String>(state.getMap().keySet())) { + Object value = state.getMap().get(key); + String logMsg = " " + key + ": "; + if (value == null) + logMsg += "null"; + else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number) + logMsg += value; + else if (value instanceof byte[]) - logMsg += new String((byte[]) value, Constants.UTF8); ++ logMsg += new String((byte[]) value, StandardCharsets.UTF_8); + else if (value instanceof PasswordToken) - logMsg += new String(((PasswordToken) value).getPassword(), Constants.UTF8); ++ logMsg += new String(((PasswordToken) value).getPassword(), StandardCharsets.UTF_8); + else + logMsg += value.getClass() + " - " + value; + + log.debug(logMsg); + } + throw new Exception("Error running node " + nextNodeId, e); } - - curNodeId = nextNodeId; - state.visitedNode(); + + // update aliases + if ((aliases = aliasMap.get(curNodeId)) != null) + for (String alias : aliases) { + ((Alias) nodes.get(alias)).update(curNodeId); + } + + curNodeId = nextNodeId; + } + } finally { + if (null != service) { + service.shutdownNow(); + } } - + if (teardown && (fixture != null)) { log.debug("tearing down module"); - fixture.tearDown(state); + fixture.tearDown(state, env); } } - + Thread timer = null; final int time = 5 * 1000 * 60; AtomicBoolean runningLong = new AtomicBoolean(false); http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java index 0000000,aa44741..33d7701 mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java @@@ -1,0 -1,84 +1,85 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.test.randomwalk.bulk; + + import java.util.Properties; + ++import org.apache.accumulo.test.randomwalk.Environment; + import org.apache.accumulo.test.randomwalk.State; + + /** + * If we have a sufficient back-up of imports, let them work off before adding even more bulk-imports. Imports of PlusOne must always be balanced with imports + * of MinusOne. + */ + public abstract class BulkImportTest extends BulkTest { + + public static final String SKIPPED_IMPORT = "skipped.import", TRUE = Boolean.TRUE.toString(), FALSE = Boolean.FALSE.toString(); + + @Override - public void visit(final State state, Properties props) throws Exception { ++ public void visit(final State state, Environment env, Properties props) throws Exception { + /** + * Each visit() is performed sequentially and then submitted to the threadpool which will have async execution. As long as we're checking the state and + * making decisions about what to do before we submit something to the thread pool, we're fine. + */ + + String lastImportSkipped = state.getString(SKIPPED_IMPORT); + // We have a marker in the state for the previous insert, we have to balance skipping BulkPlusOne + // with skipping the new BulkMinusOne to make sure that we maintain consistency + if (null != lastImportSkipped) { + if (!getClass().equals(BulkMinusOne.class)) { + throw new IllegalStateException("Should not have a skipped import marker for a class other than " + BulkMinusOne.class.getName() + " but was " + + getClass().getName()); + } + + if (TRUE.equals(lastImportSkipped)) { + log.debug("Last import was skipped, skipping this import to ensure consistency"); + state.remove(SKIPPED_IMPORT); + + // Wait 30s to balance the skip of a BulkPlusOne/BulkMinusOne pair + log.debug("Waiting 30s before continuing"); + try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) {} + + return; + } else { + // last import was not skipped, remove the marker + state.remove(SKIPPED_IMPORT); + } + } + - if (shouldQueueMoreImports(state)) { - super.visit(state, props); ++ if (shouldQueueMoreImports(state, env)) { ++ super.visit(state, env, props); + } else { + log.debug("Not queuing more imports this round because too many are already queued"); + state.set(SKIPPED_IMPORT, TRUE); + // Don't sleep here, let the sleep happen when we skip the next BulkMinusOne + } + } + - private boolean shouldQueueMoreImports(State state) throws Exception { ++ private boolean shouldQueueMoreImports(State state, Environment env) throws Exception { + // Only selectively import when it's BulkPlusOne. If we did a BulkPlusOne, + // we must also do a BulkMinusOne to keep the table consistent + if (getClass().equals(BulkPlusOne.class)) { + // Only queue up more imports if the number of queued tasks already + // exceeds the number of tservers by 50x - return SelectiveQueueing.shouldQueueOperation(state); ++ return SelectiveQueueing.shouldQueueOperation(state, env); + } + + return true; + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java index f6f2e87,1704e49..6338d29 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java @@@ -16,20 -16,18 +16,20 @@@ */ package org.apache.accumulo.test.randomwalk.bulk; -import org.apache.accumulo.core.Constants; +import java.nio.charset.StandardCharsets; + import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.test.randomwalk.Environment; import org.apache.accumulo.test.randomwalk.State; - public class BulkMinusOne extends BulkTest { - + public class BulkMinusOne extends BulkImportTest { + - private static final Value negOne = new Value("-1".getBytes(Constants.UTF8)); + private static final Value negOne = new Value("-1".getBytes(StandardCharsets.UTF_8)); - + @Override - protected void runLater(State state) throws Exception { + protected void runLater(State state, Environment env) throws Exception { log.info("Decrementing"); - BulkPlusOne.bulkLoadLots(log, state, negOne); + BulkPlusOne.bulkLoadLots(log, state, env, negOne); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java index d605e8e,6d56f13..c54a8e7 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java @@@ -54,10 -53,10 +54,10 @@@ public class BulkPlusOne extends BulkIm } public static final Text MARKER_CF = new Text("marker"); static final AtomicLong counter = new AtomicLong(); - + private static final Value ONE = new Value("1".getBytes()); - static void bulkLoadLots(Logger log, State state, Value value) throws Exception { + static void bulkLoadLots(Logger log, State state, Environment env, Value value) throws Exception { final Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString()); final Path fail = new Path(dir.toString() + "_fail"); final DefaultConfiguration defaultConfiguration = AccumuloConfiguration.getDefaultConfiguration(); @@@ -106,11 -105,11 +106,11 @@@ fs.delete(fail, true); log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); } - + @Override - protected void runLater(State state) throws Exception { + protected void runLater(State state, Environment env) throws Exception { log.info("Incrementing"); - bulkLoadLots(log, state, ONE); + bulkLoadLots(log, state, env, ONE); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java index 6c0c68e,b24f61a..07d1f4c --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java @@@ -23,9 -22,9 +23,9 @@@ import org.apache.accumulo.test.randomw import org.apache.accumulo.test.randomwalk.Test; public abstract class BulkTest extends Test { - + @Override - public void visit(final State state, Properties props) throws Exception { + public void visit(final State state, final Environment env, Properties props) throws Exception { Setup.run(state, new Runnable() { @Override public void run() { @@@ -35,10 -34,10 +35,10 @@@ log.error(ex, ex); } } - + }); } - + - abstract protected void runLater(State state) throws Exception; + abstract protected void runLater(State state, Environment env) throws Exception; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java index 7561709,8b17256..c526ffa --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java @@@ -20,15 -19,15 +20,15 @@@ import org.apache.accumulo.test.randomw import org.apache.accumulo.test.randomwalk.State; import org.apache.hadoop.io.Text; - public class Compact extends BulkTest { - + public class Compact extends SelectiveBulkTest { + @Override - protected void runLater(State state) throws Exception { + protected void runLater(State state, Environment env) throws Exception { final Text[] points = Merge.getRandomTabletRange(state); final String rangeString = Merge.rangeToString(points); log.info("Compacting " + rangeString); - state.getConnector().tableOperations().compact(Setup.getTableName(), points[0], points[1], false, true); + env.getConnector().tableOperations().compact(Setup.getTableName(), points[0], points[1], false, true); log.info("Compaction " + rangeString + " finished"); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java index d1ff8cb,7e528a7..39ef3d8 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java @@@ -29,10 -28,10 +29,10 @@@ import org.apache.accumulo.test.randomw import org.apache.accumulo.test.randomwalk.State; import org.apache.hadoop.io.Text; - public class ConsistencyCheck extends BulkTest { - + public class ConsistencyCheck extends SelectiveBulkTest { + @Override - protected void runLater(State state) throws Exception { + protected void runLater(State state, Environment env) throws Exception { Random rand = (Random) state.get("rand"); Text row = Merge.getRandomRow(rand); log.info("Checking " + row); http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java index 04af6c5,9d66e0c..8508242 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java @@@ -23,16 -22,16 +23,16 @@@ import org.apache.accumulo.test.randomw import org.apache.accumulo.test.randomwalk.State; import org.apache.hadoop.io.Text; - public class Merge extends BulkTest { - + public class Merge extends SelectiveBulkTest { + @Override - protected void runLater(State state) throws Exception { + protected void runLater(State state, Environment env) throws Exception { Text[] points = getRandomTabletRange(state); log.info("merging " + rangeToString(points)); - state.getConnector().tableOperations().merge(Setup.getTableName(), points[0], points[1]); + env.getConnector().tableOperations().merge(Setup.getTableName(), points[0], points[1]); log.info("merging " + rangeToString(points) + " complete"); } - + public static String rangeToString(Text[] points) { return "(" + (points[0] == null ? "-inf" : points[0]) + " -> " + (points[1] == null ? "+inf" : points[1]) + "]"; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java index 0000000,ca66775..48bf86a mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java @@@ -1,0 -1,41 +1,42 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.test.randomwalk.bulk; + + import java.util.Properties; + ++import org.apache.accumulo.test.randomwalk.Environment; + import org.apache.accumulo.test.randomwalk.State; + + /** + * Selectively runs the actual {@link BulkTest} based on the number of active TServers and the number of queued operations. + */ + public abstract class SelectiveBulkTest extends BulkTest { + + @Override - public void visit(State state, Properties props) throws Exception { - if (SelectiveQueueing.shouldQueueOperation(state)) { - super.visit(state, props); ++ public void visit(State state, Environment env, Properties props) throws Exception { ++ if (SelectiveQueueing.shouldQueueOperation(state, env)) { ++ super.visit(state, env, props); + } else { + log.debug("Skipping queueing of " + getClass().getSimpleName() + " because of excessive queued tasks already"); + log.debug("Waiting 30 seconds before continuing"); + try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) {} + } + } + + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java index 0000000,e6dde4d..c7f490c mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java @@@ -1,0 -1,48 +1,49 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.test.randomwalk.bulk; + + import java.util.concurrent.ThreadPoolExecutor; + + import org.apache.accumulo.core.client.Connector; ++import org.apache.accumulo.test.randomwalk.Environment; + import org.apache.accumulo.test.randomwalk.State; + import org.apache.log4j.Logger; + + /** + * Chooses whether or not an operation should be queued based on the current thread pool queue length and the number of available TServers. + */ + public class SelectiveQueueing { + private static final Logger log = Logger.getLogger(SelectiveQueueing.class); + - public static boolean shouldQueueOperation(State state) throws Exception { ++ public static boolean shouldQueueOperation(State state, Environment env) throws Exception { + final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool"); + long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount(); - final Connector conn = state.getConnector(); ++ final Connector conn = env.getConnector(); + int numTservers = conn.instanceOperations().getTabletServers().size(); + + if (!shouldQueue(queuedThreads, numTservers)) { + log.info("Not queueing because of " + queuedThreads + " outstanding tasks"); + return false; + } + + return true; + } + + private static boolean shouldQueue(long queuedThreads, int numTservers) { + return queuedThreads < numTservers * 50; + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java index 21b453d,7a93321..b69805d --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java @@@ -24,18 -23,18 +24,18 @@@ import org.apache.accumulo.test.randomw import org.apache.accumulo.test.randomwalk.State; import org.apache.hadoop.io.Text; - public class Split extends BulkTest { - + public class Split extends SelectiveBulkTest { + @Override - protected void runLater(State state) throws Exception { + protected void runLater(State state, Environment env) throws Exception { SortedSet<Text> splits = new TreeSet<Text>(); Random rand = (Random) state.get("rand"); int count = rand.nextInt(20); for (int i = 0; i < count; i++) splits.add(new Text(String.format(BulkPlusOne.FMT, (rand.nextLong() & 0x7fffffffffffffffl) % BulkPlusOne.LOTS))); log.info("splitting " + splits); - state.getConnector().tableOperations().addSplits(Setup.getTableName(), splits); + env.getConnector().tableOperations().addSplits(Setup.getTableName(), splits); log.info("split for " + splits + " finished"); } - + }