Repository: accumulo Updated Branches: refs/heads/1.6 b033b04fd -> f446b9007
ACCUMULO-1755: Removed synchronization of binning mutations in TabletServerBatchWriter The TabletServerBatchWriter will attempt to bin mutations in a background thread. If that thread is busy then the binning will occur in the client thread. Previously, if binning were to occur in one client thread, it would block all client threads from adding mutations. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f446b900 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f446b900 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f446b900 Branch: refs/heads/1.6 Commit: f446b9007ce8a4f0820e89c9e8e41a866ee8d548 Parents: b033b04 Author: Dave Marion <dlmar...@apache.org> Authored: Wed Mar 2 15:08:40 2016 -0500 Committer: Dave Marion <dlmar...@apache.org> Committed: Wed Mar 2 15:08:40 2016 -0500 ---------------------------------------------------------------------- .../client/impl/TabletServerBatchWriter.java | 135 +++++++++++++------ .../accumulo/core/util/SimpleThreadPool.java | 6 + .../test/functional/BatchWriterFlushIT.java | 88 +++++++++++- 3 files changed, 185 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f446b900/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java index 404b494..491bcc1 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java @@ -32,7 +32,10 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.AccumuloException; @@ -137,13 +140,13 @@ public class TabletServerBatchWriter { private long initialCompileTimes; private double initialSystemLoad; - private int tabletServersBatchSum = 0; - private int tabletBatchSum = 0; - private int numBatches = 0; - private int maxTabletBatch = Integer.MIN_VALUE; - private int minTabletBatch = Integer.MAX_VALUE; - private int minTabletServersBatch = Integer.MAX_VALUE; - private int maxTabletServersBatch = Integer.MIN_VALUE; + private AtomicInteger tabletServersBatchSum = new AtomicInteger(0); + private AtomicInteger tabletBatchSum = new AtomicInteger(0); + private AtomicInteger numBatches = new AtomicInteger(0); + private AtomicInteger maxTabletBatch = new AtomicInteger(Integer.MIN_VALUE); + private AtomicInteger minTabletBatch = new AtomicInteger(Integer.MAX_VALUE); + private AtomicInteger minTabletServersBatch = new AtomicInteger(Integer.MAX_VALUE); + private AtomicInteger maxTabletServersBatch = new AtomicInteger(Integer.MIN_VALUE); private Throwable lastUnknownError = null; @@ -230,7 +233,12 @@ public class TabletServerBatchWriter { if (mutations.getMemoryUsed() == 0) return; lastProcessingStartTime = System.currentTimeMillis(); - writer.addMutations(mutations); + try { + writer.queueMutations(mutations); + } catch (InterruptedException e) { + log.warn("Mutations rejected from binning thread, retrying..."); + failedMutations.add(mutations); + } mutations = new MutationSet(); } @@ -354,6 +362,7 @@ public class TabletServerBatchWriter { checkForFailures(); } finally { // make a best effort to release these resources + writer.binningThreadPool.shutdownNow(); writer.sendThreadPool.shutdownNow(); jtimer.cancel(); span.stop(); @@ -361,26 +370,26 @@ public class TabletServerBatchWriter { } private void logStats() { - long finishTime = System.currentTimeMillis(); + if (log.isTraceEnabled()) { + long finishTime = System.currentTimeMillis(); - long finalGCTimes = 0; - List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); - for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) { - finalGCTimes += garbageCollectorMXBean.getCollectionTime(); - } + long finalGCTimes = 0; + List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) { + finalGCTimes += garbageCollectorMXBean.getCollectionTime(); + } - CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean(); - long finalCompileTimes = 0; - if (compMxBean.isCompilationTimeMonitoringSupported()) { - finalCompileTimes = compMxBean.getTotalCompilationTime(); - } + CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean(); + long finalCompileTimes = 0; + if (compMxBean.isCompilationTimeMonitoringSupported()) { + finalCompileTimes = compMxBean.getTotalCompilationTime(); + } - double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0); - double overallRate = totalAdded / ((finishTime - startTime) / 1000.0); + double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0); + double overallRate = totalAdded / ((finishTime - startTime) / 1000.0); - double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); + double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); - if (log.isTraceEnabled()) { log.trace(""); log.trace("TABLET SERVER BATCH WRITER STATISTICS"); log.trace(String.format("Added : %,10d mutations", totalAdded)); @@ -397,9 +406,10 @@ public class TabletServerBatchWriter { log.trace(String.format("Total bin time : %,10.2f secs %6.2f%s", totalBinTime.get() / 1000.0, 100.0 * totalBinTime.get() / (finishTime - startTime), "%")); log.trace(String.format("Average bin rate : %,10.2f mutations/sec", totalBinned.get() / (totalBinTime.get() / 1000.0))); - log.trace(String.format("tservers per batch : %,8.2f avg %,6d min %,6d max", tabletServersBatchSum / (double) numBatches, minTabletServersBatch, - maxTabletServersBatch)); - log.trace(String.format("tablets per batch : %,8.2f avg %,6d min %,6d max", tabletBatchSum / (double) numBatches, minTabletBatch, maxTabletBatch)); + log.trace(String.format("tservers per batch : %,8.2f avg %,6d min %,6d max", (float) (tabletServersBatchSum.get() / numBatches.get()), + minTabletServersBatch.get(), maxTabletServersBatch.get())); + log.trace(String.format("tablets per batch : %,8.2f avg %,6d min %,6d max", (float) (tabletBatchSum.get() / numBatches.get()), minTabletBatch.get(), + maxTabletBatch.get())); log.trace(""); log.trace("SYSTEM STATISTICS"); log.trace(String.format("JVM GC Time : %,10.2f secs", ((finalGCTimes - initialGCTimes) / 1000.0))); @@ -416,16 +426,32 @@ public class TabletServerBatchWriter { } public void updateBinningStats(int count, long time, Map<String,TabletServerMutations<Mutation>> binnedMutations) { - totalBinTime.addAndGet(time); - totalBinned.addAndGet(count); - updateBatchStats(binnedMutations); + if (log.isTraceEnabled()) { + totalBinTime.addAndGet(time); + totalBinned.addAndGet(count); + updateBatchStats(binnedMutations); + } } - private synchronized void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) { - tabletServersBatchSum += binnedMutations.size(); + private static void computeMin(AtomicInteger stat, int update) { + int old = stat.get(); + while (!stat.compareAndSet(old, Math.min(old, update))) { + old = stat.get(); + } + } + + private static void computeMax(AtomicInteger stat, int update) { + int old = stat.get(); + while (!stat.compareAndSet(old, Math.max(old, update))) { + old = stat.get(); + } + } - minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size()); - maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size()); + private void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) { + tabletServersBatchSum.addAndGet(binnedMutations.size()); + + computeMin(minTabletServersBatch, binnedMutations.size()); + computeMax(maxTabletServersBatch, binnedMutations.size()); int numTablets = 0; @@ -434,12 +460,12 @@ public class TabletServerBatchWriter { numTablets += tsm.getMutations().size(); } - tabletBatchSum += numTablets; + tabletBatchSum.addAndGet(numTablets); - minTabletBatch = Math.min(minTabletBatch, numTablets); - maxTabletBatch = Math.max(maxTabletBatch, numTablets); + computeMin(minTabletBatch, numTablets); + computeMax(maxTabletBatch, numTablets); - numBatches++; + numBatches.incrementAndGet(); } private void waitRTE() { @@ -616,19 +642,22 @@ public class TabletServerBatchWriter { private class MutationWriter { private static final int MUTATION_BATCH_SIZE = 1 << 17; - private ExecutorService sendThreadPool; - private Map<String,TabletServerMutations<Mutation>> serversMutations; - private Set<String> queued; - private Map<String,TabletLocator> locators; + private final ExecutorService sendThreadPool; + private final SimpleThreadPool binningThreadPool; + private final Map<String,TabletServerMutations<Mutation>> serversMutations; + private final Set<String> queued; + private final Map<String,TabletLocator> locators; public MutationWriter(int numSendThreads) { serversMutations = new HashMap<String,TabletServerMutations<Mutation>>(); queued = new HashSet<String>(); sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName()); locators = new HashMap<String,TabletLocator>(); + binningThreadPool = new SimpleThreadPool(1, "BinMutations", new SynchronousQueue<Runnable>()); + binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } - private TabletLocator getLocator(String tableId) { + private synchronized TabletLocator getLocator(String tableId) { TabletLocator ret = locators.get(tableId); if (ret == null) { ret = TabletLocator.getLocator(instance, new Text(tableId)); @@ -686,7 +715,27 @@ public class TabletServerBatchWriter { } - void addMutations(MutationSet mutationsToSend) { + void queueMutations(final MutationSet mutationsToSend) throws InterruptedException { + if (null == mutationsToSend) + return; + binningThreadPool.execute(new Runnable() { + + @Override + public void run() { + if (null != mutationsToSend) { + try { + if (log.isTraceEnabled()) + log.trace(Thread.currentThread().getName() + " - binning " + mutationsToSend.size() + " mutations"); + addMutations(mutationsToSend); + } catch (Exception e) { + updateUnknownErrors("Error processing mutation set", e); + } + } + } + }); + } + + private void addMutations(MutationSet mutationsToSend) { Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>(); Span span = Trace.start("binMutations"); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/f446b900/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java b/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java index a406233..8991991 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java +++ b/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.core.util; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -30,4 +31,9 @@ public class SimpleThreadPool extends ThreadPoolExecutor { allowCoreThreadTimeOut(true); } + public SimpleThreadPool(int max, final String name, BlockingQueue<Runnable> queue) { + super(max, max, 4l, TimeUnit.SECONDS, queue, new NamingThreadFactory(name)); + allowCoreThreadTimeOut(true); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f446b900/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java index 52d9c93..e2277a3 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java @@ -18,9 +18,16 @@ package org.apache.accumulo.test.functional; import static com.google.common.base.Charsets.UTF_8; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; @@ -36,14 +43,17 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterIT; import org.apache.hadoop.io.Text; +import org.junit.Assert; import org.junit.Test; public class BatchWriterFlushIT extends AccumuloClusterIT { private static final int NUM_TO_FLUSH = 100000; + private static final int NUM_THREADS = 3; @Override protected int defaultTimeoutSeconds() { @@ -60,7 +70,6 @@ public class BatchWriterFlushIT extends AccumuloClusterIT { c.tableOperations().create(bwlt); runFlushTest(bwft); runLatencyTest(bwlt); - } private void runLatencyTest(String tableName) throws Exception { @@ -170,6 +179,83 @@ public class BatchWriterFlushIT extends AccumuloClusterIT { } } + @Test + public void runMultiThreadedBinningTest() throws Exception { + Connector c = getConnector(); + String[] tableNames = getUniqueNames(1); + String tableName = tableNames[0]; + c.tableOperations().create(tableName); + for (int x = 0; x < NUM_THREADS; x++) { + c.tableOperations().addSplits(tableName, new TreeSet<Text>(Collections.singleton(new Text(Integer.toString(x * NUM_TO_FLUSH))))); + } + + // Logger.getLogger(TabletServerBatchWriter.class).setLevel(Level.TRACE); + final List<Set<Mutation>> allMuts = new LinkedList<Set<Mutation>>(); + List<Mutation> data = new ArrayList<Mutation>(); + for (int i = 0; i < NUM_THREADS; i++) { + final int thread = i; + for (int j = 0; j < NUM_TO_FLUSH; j++) { + int row = thread * NUM_TO_FLUSH + j; + Mutation m = new Mutation(new Text(String.format("%10d", row))); + m.put(new Text("cf" + thread), new Text("cq"), new Value(("" + row).getBytes())); + data.add(m); + } + } + Assert.assertEquals(NUM_THREADS * NUM_TO_FLUSH, data.size()); + Collections.shuffle(data); + for (int n = 0; n < (NUM_THREADS * NUM_TO_FLUSH); n += NUM_TO_FLUSH) { + Set<Mutation> muts = new HashSet<Mutation>(data.subList(n, n + NUM_TO_FLUSH)); + allMuts.add(muts); + } + + SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, "ClientThreads"); + threads.allowCoreThreadTimeOut(false); + threads.prestartAllCoreThreads(); + + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setMaxLatency(10, TimeUnit.SECONDS); + cfg.setMaxMemory(1 * 1024 * 1024); + cfg.setMaxWriteThreads(NUM_THREADS); + final BatchWriter bw = getConnector().createBatchWriter(tableName, cfg); + + for (int k = 0; k < NUM_THREADS; k++) { + final int idx = k; + threads.execute(new Runnable() { + @Override + public void run() { + try { + bw.addMutations(allMuts.get(idx)); + bw.flush(); + } catch (MutationsRejectedException e) { + Assert.fail("Error adding mutations to batch writer"); + } + } + }); + } + threads.shutdown(); + threads.awaitTermination(3, TimeUnit.MINUTES); + bw.close(); + Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); + for (Entry<Key,Value> e : scanner) { + Mutation m = new Mutation(e.getKey().getRow()); + m.put(e.getKey().getColumnFamily(), e.getKey().getColumnQualifier(), e.getValue()); + boolean found = false; + for (int l = 0; l < NUM_THREADS; l++) { + if (allMuts.get(l).contains(m)) { + found = true; + allMuts.get(l).remove(m); + break; + } + } + Assert.assertTrue("Mutation not found: " + m.toString(), found); + } + + for (int m = 0; m < NUM_THREADS; m++) { + Assert.assertEquals(0, allMuts.get(m).size()); + } + + } + private void verifyEntry(int row, Entry<Key,Value> entry) throws Exception { if (!entry.getKey().getRow().toString().equals(String.format("r_%10d", row))) { throw new Exception("Unexpected key returned, expected " + row + " got " + entry.getKey());