ACCUMULO-3259 Be a little smarter in Bulk.xml to let the backlog of tasks work itself down before adding more.
If the queue size on the threadpool is greater than 50 times the number of tservers, wait 30 seconds before adding more tasks. This should still ensure that we can keep Accumulo sufficiently busy without creating an absurd number of tasks that the test client will never reasonably work through. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1477c130 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1477c130 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1477c130 Branch: refs/heads/1.6 Commit: 1477c130143bd2477fb8e29bc22ba4698aae3599 Parents: c4f3fc8 Author: Josh Elser <els...@apache.org> Authored: Fri Oct 24 15:01:22 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Fri Oct 24 15:24:47 2014 -0400 ---------------------------------------------------------------------- .../test/randomwalk/bulk/BulkImportTest.java | 84 ++++++++++++++++++++ .../test/randomwalk/bulk/BulkMinusOne.java | 8 +- .../test/randomwalk/bulk/BulkPlusOne.java | 20 ++--- .../accumulo/test/randomwalk/bulk/BulkTest.java | 8 +- .../accumulo/test/randomwalk/bulk/Compact.java | 6 +- .../test/randomwalk/bulk/ConsistencyCheck.java | 6 +- .../accumulo/test/randomwalk/bulk/Merge.java | 12 +-- .../test/randomwalk/bulk/SelectiveBulkTest.java | 41 ++++++++++ .../test/randomwalk/bulk/SelectiveQueueing.java | 48 +++++++++++ .../accumulo/test/randomwalk/bulk/Split.java | 6 +- 10 files changed, 206 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java new file mode 100644 
index 0000000..aa44741 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.randomwalk.bulk; + +import java.util.Properties; + +import org.apache.accumulo.test.randomwalk.State; + +/** + * If we have a sufficient back-up of imports, let them work off before adding even more bulk-imports. Imports of PlusOne must always be balanced with imports + * of MinusOne. + */ +public abstract class BulkImportTest extends BulkTest { + + public static final String SKIPPED_IMPORT = "skipped.import", TRUE = Boolean.TRUE.toString(), FALSE = Boolean.FALSE.toString(); + + @Override + public void visit(final State state, Properties props) throws Exception { + /** + * Each visit() is performed sequentially and then submitted to the threadpool which will have async execution. As long as we're checking the state and + * making decisions about what to do before we submit something to the thread pool, we're fine. 
+ */ + + String lastImportSkipped = state.getString(SKIPPED_IMPORT); + // We have a marker in the state for the previous insert, we have to balance skipping BulkPlusOne + // with skipping the new BulkMinusOne to make sure that we maintain consistency + if (null != lastImportSkipped) { + if (!getClass().equals(BulkMinusOne.class)) { + throw new IllegalStateException("Should not have a skipped import marker for a class other than " + BulkMinusOne.class.getName() + " but was " + + getClass().getName()); + } + + if (TRUE.equals(lastImportSkipped)) { + log.debug("Last import was skipped, skipping this import to ensure consistency"); + state.remove(SKIPPED_IMPORT); + + // Wait 30s to balance the skip of a BulkPlusOne/BulkMinusOne pair + log.debug("Waiting 30s before continuing"); + try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) {} + + return; + } else { + // last import was not skipped, remove the marker + state.remove(SKIPPED_IMPORT); + } + } + + if (shouldQueueMoreImports(state)) { + super.visit(state, props); + } else { + log.debug("Not queuing more imports this round because too many are already queued"); + state.set(SKIPPED_IMPORT, TRUE); + // Don't sleep here, let the sleep happen when we skip the next BulkMinusOne + } + } + + private boolean shouldQueueMoreImports(State state) throws Exception { + // Only selectively import when it's BulkPlusOne. 
If we did a BulkPlusOne, + // we must also do a BulkMinusOne to keep the table consistent + if (getClass().equals(BulkPlusOne.class)) { + // Only queue up more imports if the number of queued tasks is still + // below 50x the number of tservers + return SelectiveQueueing.shouldQueueOperation(state); + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java index 4ebf23f..1704e49 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java @@ -20,14 +20,14 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.test.randomwalk.State; -public class BulkMinusOne extends BulkTest { - +public class BulkMinusOne extends BulkImportTest { + private static final Value negOne = new Value("-1".getBytes(Constants.UTF8)); - + @Override protected void runLater(State state) throws Exception { log.info("Decrementing"); BulkPlusOne.bulkLoadLots(log, state, negOne); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java index cdfbb36..6d56f13 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java @@ -38,8 +38,8 @@ import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; -public class BulkPlusOne extends BulkTest { - +public class BulkPlusOne extends BulkImportTest { + public static final int LOTS = 100000; public static final int COLS = 10; public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / Math.log(16)); @@ -53,7 +53,7 @@ public class BulkPlusOne extends BulkTest { } public static final Text MARKER_CF = new Text("marker"); static final AtomicLong counter = new AtomicLong(); - + private static final Value ONE = new Value("1".getBytes()); static void bulkLoadLots(Logger log, State state, Value value) throws Exception { @@ -64,22 +64,22 @@ public class BulkPlusOne extends BulkTest { final FileSystem fs = (FileSystem) state.get("fs"); fs.mkdirs(fail); final int parts = rand.nextInt(10) + 1; - + TreeSet<Integer> startRows = new TreeSet<Integer>(); startRows.add(0); while (startRows.size() < parts) startRows.add(rand.nextInt(LOTS)); - + List<String> printRows = new ArrayList<String>(startRows.size()); for (Integer row : startRows) printRows.add(String.format(FMT, row)); - + String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); - + List<Integer> rows = new ArrayList<Integer>(startRows); rows.add(LOTS); - + for (int i = 0; i < parts; i++) { String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION; FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), defaultConfiguration); @@ -105,11 +105,11 @@ public class BulkPlusOne extends BulkTest { fs.delete(fail, true); log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); } - + @Override protected void runLater(State state) throws Exception { log.info("Incrementing"); 
bulkLoadLots(log, state, ONE); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java index 4afefd9..b24f61a 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java @@ -22,7 +22,7 @@ import org.apache.accumulo.test.randomwalk.State; import org.apache.accumulo.test.randomwalk.Test; public abstract class BulkTest extends Test { - + @Override public void visit(final State state, Properties props) throws Exception { Setup.run(state, new Runnable() { @@ -34,10 +34,10 @@ public abstract class BulkTest extends Test { log.error(ex, ex); } } - + }); } - + abstract protected void runLater(State state) throws Exception; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java index 86dae5c..8b17256 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java @@ -19,8 +19,8 @@ package org.apache.accumulo.test.randomwalk.bulk; import org.apache.accumulo.test.randomwalk.State; import org.apache.hadoop.io.Text; -public class Compact extends BulkTest { - +public class Compact extends SelectiveBulkTest { + @Override protected void runLater(State state) throws Exception { final Text[] points = Merge.getRandomTabletRange(state); @@ -29,5 +29,5 @@ 
public class Compact extends BulkTest { state.getConnector().tableOperations().compact(Setup.getTableName(), points[0], points[1], false, true); log.info("Compaction " + rangeString + " finished"); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java index e60f8cf..7e528a7 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java @@ -28,8 +28,8 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.test.randomwalk.State; import org.apache.hadoop.io.Text; -public class ConsistencyCheck extends BulkTest { - +public class ConsistencyCheck extends SelectiveBulkTest { + @Override protected void runLater(State state) throws Exception { Random rand = (Random) state.get("rand"); @@ -52,5 +52,5 @@ public class ConsistencyCheck extends BulkTest { throw new RuntimeException("Inconsistent value at " + entry.getKey() + " was " + entry.getValue() + " should be " + v + " first read at " + first); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java index 2dd0345..9d66e0c 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java @@ -22,8 +22,8 @@ import java.util.Random; 
import org.apache.accumulo.test.randomwalk.State; import org.apache.hadoop.io.Text; -public class Merge extends BulkTest { - +public class Merge extends SelectiveBulkTest { + @Override protected void runLater(State state) throws Exception { Text[] points = getRandomTabletRange(state); @@ -31,15 +31,15 @@ public class Merge extends BulkTest { state.getConnector().tableOperations().merge(Setup.getTableName(), points[0], points[1]); log.info("merging " + rangeToString(points) + " complete"); } - + public static String rangeToString(Text[] points) { return "(" + (points[0] == null ? "-inf" : points[0]) + " -> " + (points[1] == null ? "+inf" : points[1]) + "]"; } - + public static Text getRandomRow(Random rand) { return new Text(String.format(BulkPlusOne.FMT, (rand.nextLong() & 0x7fffffffffffffffl) % BulkPlusOne.LOTS)); } - + public static Text[] getRandomTabletRange(State state) { Random rand = (Random) state.get("rand"); Text points[] = {getRandomRow(rand), getRandomRow(rand),}; @@ -56,5 +56,5 @@ public class Merge extends BulkTest { } return points; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java new file mode 100644 index 0000000..ca66775 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.randomwalk.bulk; + +import java.util.Properties; + +import org.apache.accumulo.test.randomwalk.State; + +/** + * Selectively runs the actual {@link BulkTest} based on the number of active TServers and the number of queued operations. + */ +public abstract class SelectiveBulkTest extends BulkTest { + + @Override + public void visit(State state, Properties props) throws Exception { + if (SelectiveQueueing.shouldQueueOperation(state)) { + super.visit(state, props); + } else { + log.debug("Skipping queueing of " + getClass().getSimpleName() + " because of excessive queued tasks already"); + log.debug("Waiting 30 seconds before continuing"); + try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) {} + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java new file mode 100644 index 0000000..e6dde4d --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.randomwalk.bulk; + +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.test.randomwalk.State; +import org.apache.log4j.Logger; + +/** + * Chooses whether or not an operation should be queued based on the current thread pool queue length and the number of available TServers. 
+ */ +public class SelectiveQueueing { + private static final Logger log = Logger.getLogger(SelectiveQueueing.class); + + public static boolean shouldQueueOperation(State state) throws Exception { + final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool"); + long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount(); + final Connector conn = state.getConnector(); + int numTservers = conn.instanceOperations().getTabletServers().size(); + + if (!shouldQueue(queuedThreads, numTservers)) { + log.info("Not queueing because of " + queuedThreads + " outstanding tasks"); + return false; + } + + return true; + } + + private static boolean shouldQueue(long queuedThreads, int numTservers) { + return queuedThreads < numTservers * 50; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java index 157e2ab..7a93321 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java @@ -23,8 +23,8 @@ import java.util.TreeSet; import org.apache.accumulo.test.randomwalk.State; import org.apache.hadoop.io.Text; -public class Split extends BulkTest { - +public class Split extends SelectiveBulkTest { + @Override protected void runLater(State state) throws Exception { SortedSet<Text> splits = new TreeSet<Text>(); @@ -36,5 +36,5 @@ public class Split extends BulkTest { state.getConnector().tableOperations().addSplits(Setup.getTableName(), splits); log.info("split for " + splits + " finished"); } - + }