Updated Branches: refs/heads/master 9ac95f869 -> addacd0d6
ACCUMULO-1608 added stats collection for conditional writer Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/addacd0d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/addacd0d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/addacd0d Branch: refs/heads/master Commit: addacd0d64a11bf00e668024a08c84c791e90ff8 Parents: 9ac95f8 Author: Keith Turner <ktur...@apache.org> Authored: Mon Sep 30 11:53:36 2013 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Mon Sep 30 11:53:36 2013 -0400 ---------------------------------------------------------------------- .../core/client/impl/ConditionalWriterImpl.java | 5 +- .../tabletserver/ConditionalMutationSet.java | 2 - .../server/tabletserver/TabletServer.java | 167 ++++++++++++------- 3 files changed, 110 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/addacd0d/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index d7eb144..bb5987d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -75,6 +75,7 @@ import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID; +import org.apache.accumulo.trace.instrument.Trace; import org.apache.accumulo.trace.instrument.Tracer; import org.apache.accumulo.trace.thrift.TInfo; import org.apache.commons.collections.map.LRUMap; @@ -306,7 +307,7 @@ class ConditionalWriterImpl implements ConditionalWriter { serverQueue.queue.add(mutations); // never execute more than one task per server if (!serverQueue.taskQueued) { - threadPool.execute(new LoggingRunnable(log, new SendTask(location))); + threadPool.execute(new LoggingRunnable(log, Trace.wrap(new SendTask(location)))); serverQueue.taskQueued = true; } } @@ -323,7 +324,7 @@ class ConditionalWriterImpl implements ConditionalWriter { synchronized (serverQueue) { if (serverQueue.queue.size() > 0) - threadPool.execute(new LoggingRunnable(log, task)); + threadPool.execute(new LoggingRunnable(log, Trace.wrap(task))); else serverQueue.taskQueued = false; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/addacd0d/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java index c25e729..ffa6b77 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java @@ -78,8 +78,6 @@ public class ConditionalMutationSet { static void sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) { for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) { - // TODO check if its already in sorted order? - // TODO maybe the potential benefit of sorting is not worth the cost Collections.sort(entry.getValue(), new Comparator<ServerConditionalMutation>() { @Override public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/addacd0d/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index 575d7e7..6ccd163 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -66,11 +66,11 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.CompressedIterators; import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig; -import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator; -import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator; import org.apache.accumulo.core.client.impl.ScannerImpl; import org.apache.accumulo.core.client.impl.TabletType; import org.apache.accumulo.core.client.impl.Translator; +import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator; +import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator; import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -1584,10 +1584,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } long pt2 = System.currentTimeMillis(); - long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size()); us.prepareTimes.addStat(pt2 - pt1); - if (updateMetrics.isEnabled()) - updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (avgPrepareTime)); + updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size()); if (error != null) { for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) { @@ -1606,9 +1604,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu long t2 = System.currentTimeMillis(); us.walogTimes.addStat(t2 - t1); - if (updateMetrics.isEnabled()) - updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1)); - + updateWalogWriteTime((t2 - t1)); break; } catch (IOException ex) { log.warn("logging mutations failed, retrying"); @@ -1645,14 +1641,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } } long t2 = System.currentTimeMillis(); - - long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size()); - + us.flushTime += (t2 - pt1); us.commitTimes.addStat(t2 - t1); - if (updateMetrics.isEnabled()) - updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime); + updateAvgCommitTime(t2 - t1, sendables.size()); } finally { commit.stop(); } @@ -1665,6 +1658,21 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } us.totalUpdates += mutationCount; } + + private void updateWalogWriteTime(long time) { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, time); + } + + private void updateAvgCommitTime(long time, int size) { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.commitTime, (long) ((time) / (double) size)); + } + + private void updateAvgPrepTime(long time, int size) { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (long) ((time) / (double) size)); + } @Override public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException { @@ -1850,61 +1858,86 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu boolean sessionCanceled = sess.interruptFlag.get(); - for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) { - Tablet tablet = onlineTablets.get(entry.getKey()); - if (tablet == null || tablet.isClosed() || sessionCanceled) { - for (ServerConditionalMutation scm : entry.getValue()) - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - } else { - try { - - @SuppressWarnings("unchecked") - List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue(); - if (mutations.size() > 0) { - - CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations); + Span prepSpan = Trace.start("prep"); + try { + long t1 = System.currentTimeMillis(); + for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) { + Tablet tablet = onlineTablets.get(entry.getKey()); + if (tablet == null || tablet.isClosed() || sessionCanceled) { + for (ServerConditionalMutation scm : entry.getValue()) + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + } else { + try { - if (cs == null) { - for (ServerConditionalMutation scm : entry.getValue()) - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - } else { - for (ServerConditionalMutation scm : entry.getValue()) - results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED)); - sendables.put(cs, mutations); + @SuppressWarnings("unchecked") + List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue(); + if (mutations.size() > 0) { + + CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations); + + if (cs == null) { + for (ServerConditionalMutation scm : entry.getValue()) + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + } else { + for (ServerConditionalMutation scm : entry.getValue()) + results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED)); + sendables.put(cs, mutations); + } } + } catch (TConstraintViolationException e) { + if (e.getNonViolators().size() > 0) { + sendables.put(e.getCommitSession(), e.getNonViolators()); + for (Mutation m : e.getNonViolators()) + results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED)); + } + + for (Mutation m : e.getViolators()) + results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED)); } - } catch (TConstraintViolationException e) { - if (e.getNonViolators().size() > 0) { - sendables.put(e.getCommitSession(), e.getNonViolators()); - for (Mutation m : e.getNonViolators()) - results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED)); - } - - for (Mutation m : e.getViolators()) - results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED)); } } + + long t2 = System.currentTimeMillis(); + updateAvgPrepTime(t2 - t1, es.size()); + } finally { + prepSpan.stop(); } - while (true && sendables.size() > 0) { - try { - logger.logManyTablets(sendables); - break; - } catch (IOException ex) { - log.warn("logging mutations failed, retrying"); - } catch (FSError ex) { // happens when DFS is localFS - log.warn("logging mutations failed, retrying"); - } catch (Throwable t) { - log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t); - throw new RuntimeException(t); + Span walSpan = Trace.start("wal"); + try { + while (true && sendables.size() > 0) { + try { + long t1 = System.currentTimeMillis(); + logger.logManyTablets(sendables); + long t2 = System.currentTimeMillis(); + updateWalogWriteTime(t2 - t1); + break; + } catch (IOException ex) { + log.warn("logging mutations failed, retrying"); + } catch (FSError ex) { // happens when DFS is localFS + log.warn("logging mutations failed, retrying"); + } catch (Throwable t) { + log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t); + throw new RuntimeException(t); + } } + } finally { + walSpan.stop(); } - for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) { - CommitSession commitSession = entry.getKey(); - List<Mutation> mutations = entry.getValue(); - - commitSession.commit(mutations); + Span commitSpan = Trace.start("commit"); + try { + long t1 = System.currentTimeMillis(); + for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) { + CommitSession commitSession = entry.getKey(); + List<Mutation> mutations = entry.getValue(); + + commitSession.commit(mutations); + } + long t2 = System.currentTimeMillis(); + updateAvgCommitTime(t2 - t1, sendables.size()); + } finally { + commitSpan.stop(); } } @@ -1922,8 +1955,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu // get as many locks as possible w/o blocking... defer any rows that are locked List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred); try { - checkConditions(updates, results, cs, symbols); - writeConditionalMutations(updates, results, cs); + Span checkSpan = Trace.start("Check conditions"); + try { + checkConditions(updates, results, cs, symbols); + } finally { + checkSpan.stop(); + } + + Span updateSpan = Trace.start("apply conditional mutations"); + try { + writeConditionalMutations(updates, results, cs); + } finally { + updateSpan.stop(); + } } finally { rowLocks.releaseRowLocks(locks); } @@ -1962,6 +2006,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu if (cs == null || cs.interruptFlag.get()) throw new NoSuchScanIDException(); + if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) + TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); + Text tid = new Text(cs.tableId); long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));