Updated Branches:
  refs/heads/master 9ac95f869 -> addacd0d6

ACCUMULO-1608 added stats collection for conditional writer


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/addacd0d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/addacd0d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/addacd0d

Branch: refs/heads/master
Commit: addacd0d64a11bf00e668024a08c84c791e90ff8
Parents: 9ac95f8
Author: Keith Turner <ktur...@apache.org>
Authored: Mon Sep 30 11:53:36 2013 -0400
Committer: Keith Turner <ktur...@apache.org>
Committed: Mon Sep 30 11:53:36 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/ConditionalWriterImpl.java |   5 +-
 .../tabletserver/ConditionalMutationSet.java    |   2 -
 .../server/tabletserver/TabletServer.java       | 167 ++++++++++++-------
 3 files changed, 110 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/addacd0d/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index d7eb144..bb5987d 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -75,6 +75,7 @@ import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
+import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.commons.collections.map.LRUMap;
@@ -306,7 +307,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
       serverQueue.queue.add(mutations);
       // never execute more than one task per server
       if (!serverQueue.taskQueued) {
-        threadPool.execute(new LoggingRunnable(log, new SendTask(location)));
+        threadPool.execute(new LoggingRunnable(log, Trace.wrap(new 
SendTask(location))));
         serverQueue.taskQueued = true;
       }
     }
@@ -323,7 +324,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     
     synchronized (serverQueue) {
       if (serverQueue.queue.size() > 0)
-        threadPool.execute(new LoggingRunnable(log, task));
+        threadPool.execute(new LoggingRunnable(log, Trace.wrap(task)));
       else
         serverQueue.taskQueued = false;
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/addacd0d/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
index c25e729..ffa6b77 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
@@ -78,8 +78,6 @@ public class ConditionalMutationSet {
 
   static void 
sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> 
updates) {
     for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : 
updates.entrySet()) {
-      // TODO check if its already in sorted order?
-      // TODO maybe the potential benefit of sorting is not worth the cost
       Collections.sort(entry.getValue(), new 
Comparator<ServerConditionalMutation>() {
         @Override
         public int compare(ServerConditionalMutation o1, 
ServerConditionalMutation o2) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/addacd0d/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 575d7e7..6ccd163 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -66,11 +66,11 @@ import 
org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.CompressedIterators;
 import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
-import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
-import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.TabletType;
 import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
+import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator;
 import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -1584,10 +1584,8 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       }
       
       long pt2 = System.currentTimeMillis();
-      long avgPrepareTime = (long) ((pt2 - pt1) / (double) 
us.queuedMutations.size());
       us.prepareTimes.addStat(pt2 - pt1);
-      if (updateMetrics.isEnabled())
-        updateMetrics.add(TabletServerUpdateMetrics.commitPrep, 
(avgPrepareTime));
+      updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
       
       if (error != null) {
         for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
@@ -1606,9 +1604,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
               
               long t2 = System.currentTimeMillis();
               us.walogTimes.addStat(t2 - t1);
-              if (updateMetrics.isEnabled())
-                updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, 
(t2 - t1));
-              
+              updateWalogWriteTime((t2 - t1));
               break;
             } catch (IOException ex) {
               log.warn("logging mutations failed, retrying");
@@ -1645,14 +1641,11 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
             }
           }
           long t2 = System.currentTimeMillis();
-          
-          long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size());
-          
+
           us.flushTime += (t2 - pt1);
           us.commitTimes.addStat(t2 - t1);
           
-          if (updateMetrics.isEnabled())
-            updateMetrics.add(TabletServerUpdateMetrics.commitTime, 
avgCommitTime);
+          updateAvgCommitTime(t2 - t1, sendables.size());
         } finally {
           commit.stop();
         }
@@ -1665,6 +1658,21 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       }
       us.totalUpdates += mutationCount;
     }
+
+    private void updateWalogWriteTime(long time) {
+      if (updateMetrics.isEnabled())
+        updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, time);
+    }
+    
+    private void updateAvgCommitTime(long time, int size) {
+      if (updateMetrics.isEnabled())
+        updateMetrics.add(TabletServerUpdateMetrics.commitTime, (long) ((time) 
/ (double) size));
+    }
+
+    private void updateAvgPrepTime(long time, int size) {
+      if (updateMetrics.isEnabled())
+        updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (long) ((time) 
/ (double) size));
+    }
     
     @Override
     public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws 
NoSuchScanIDException {
@@ -1850,61 +1858,86 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       
       boolean sessionCanceled = sess.interruptFlag.get();
       
-      for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
-        Tablet tablet = onlineTablets.get(entry.getKey());
-        if (tablet == null || tablet.isClosed() || sessionCanceled) {
-          for (ServerConditionalMutation scm : entry.getValue())
-            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-        } else {
-          try {
-            
-            @SuppressWarnings("unchecked")
-            List<Mutation> mutations = (List<Mutation>) (List<? extends 
Mutation>) entry.getValue();
-            if (mutations.size() > 0) {
-              
-              CommitSession cs = tablet.prepareMutationsForCommit(new 
TservConstraintEnv(security, sess.credentials), mutations);
+      Span prepSpan = Trace.start("prep");
+      try {
+        long t1 = System.currentTimeMillis();
+        for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+          Tablet tablet = onlineTablets.get(entry.getKey());
+          if (tablet == null || tablet.isClosed() || sessionCanceled) {
+            for (ServerConditionalMutation scm : entry.getValue())
+              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          } else {
+            try {
               
-              if (cs == null) {
-                for (ServerConditionalMutation scm : entry.getValue())
-                  results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-              } else {
-                for (ServerConditionalMutation scm : entry.getValue())
-                  results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
-                sendables.put(cs, mutations);
+              @SuppressWarnings("unchecked")
+              List<Mutation> mutations = (List<Mutation>) (List<? extends 
Mutation>) entry.getValue();
+              if (mutations.size() > 0) {
+                
+                CommitSession cs = tablet.prepareMutationsForCommit(new 
TservConstraintEnv(security, sess.credentials), mutations);
+                
+                if (cs == null) {
+                  for (ServerConditionalMutation scm : entry.getValue())
+                    results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+                } else {
+                  for (ServerConditionalMutation scm : entry.getValue())
+                    results.add(new TCMResult(scm.getID(), 
TCMStatus.ACCEPTED));
+                  sendables.put(cs, mutations);
+                }
               }
+            } catch (TConstraintViolationException e) {
+              if (e.getNonViolators().size() > 0) {
+                sendables.put(e.getCommitSession(), e.getNonViolators());
+                for (Mutation m : e.getNonViolators())
+                  results.add(new TCMResult(((ServerConditionalMutation) 
m).getID(), TCMStatus.ACCEPTED));
+              }
+              
+              for (Mutation m : e.getViolators())
+                results.add(new TCMResult(((ServerConditionalMutation) 
m).getID(), TCMStatus.VIOLATED));
             }
-          } catch (TConstraintViolationException e) {
-            if (e.getNonViolators().size() > 0) {
-              sendables.put(e.getCommitSession(), e.getNonViolators());
-              for (Mutation m : e.getNonViolators())
-                results.add(new TCMResult(((ServerConditionalMutation) 
m).getID(), TCMStatus.ACCEPTED));
-            }
-            
-            for (Mutation m : e.getViolators())
-              results.add(new TCMResult(((ServerConditionalMutation) 
m).getID(), TCMStatus.VIOLATED));
           }
         }
+        
+        long t2 = System.currentTimeMillis();
+        updateAvgPrepTime(t2 - t1, es.size());
+      } finally {
+        prepSpan.stop();
       }
       
-      while (true && sendables.size() > 0) {
-        try {
-          logger.logManyTablets(sendables);
-          break;
-        } catch (IOException ex) {
-          log.warn("logging mutations failed, retrying");
-        } catch (FSError ex) { // happens when DFS is localFS
-          log.warn("logging mutations failed, retrying");
-        } catch (Throwable t) {
-          log.error("Unknown exception logging mutations, counts for mutations 
in flight not decremented!", t);
-          throw new RuntimeException(t);
+      Span walSpan = Trace.start("wal");
+      try {
+        while (true && sendables.size() > 0) {
+          try {
+            long t1 = System.currentTimeMillis();
+            logger.logManyTablets(sendables);
+            long t2 = System.currentTimeMillis();
+            updateWalogWriteTime(t2 - t1);
+            break;
+          } catch (IOException ex) {
+            log.warn("logging mutations failed, retrying");
+          } catch (FSError ex) { // happens when DFS is localFS
+            log.warn("logging mutations failed, retrying");
+          } catch (Throwable t) {
+            log.error("Unknown exception logging mutations, counts for 
mutations in flight not decremented!", t);
+            throw new RuntimeException(t);
+          }
         }
+      } finally {
+        walSpan.stop();
       }
       
-      for (Entry<CommitSession,? extends List<Mutation>> entry : 
sendables.entrySet()) {
-        CommitSession commitSession = entry.getKey();
-        List<Mutation> mutations = entry.getValue();
-        
-        commitSession.commit(mutations);
+      Span commitSpan = Trace.start("commit");
+      try {
+        long t1 = System.currentTimeMillis();
+        for (Entry<CommitSession,? extends List<Mutation>> entry : 
sendables.entrySet()) {
+          CommitSession commitSession = entry.getKey();
+          List<Mutation> mutations = entry.getValue();
+          
+          commitSession.commit(mutations);
+        }
+        long t2 = System.currentTimeMillis();
+        updateAvgCommitTime(t2 - t1, sendables.size());
+      } finally {
+        commitSpan.stop();
       }
       
     }
@@ -1922,8 +1955,19 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       // get as many locks as possible w/o blocking... defer any rows that are 
locked
       List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
       try {
-        checkConditions(updates, results, cs, symbols);
-        writeConditionalMutations(updates, results, cs);
+        Span checkSpan = Trace.start("Check conditions");
+        try {
+          checkConditions(updates, results, cs, symbols);
+        } finally {
+          checkSpan.stop();
+        }
+        
+        Span updateSpan = Trace.start("apply conditional mutations");
+        try {
+          writeConditionalMutations(updates, results, cs);
+        } finally {
+          updateSpan.stop();
+        }
       } finally {
         rowLocks.releaseRowLocks(locks);
       }
@@ -1962,6 +2006,9 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       if (cs == null || cs.interruptFlag.get())
         throw new NoSuchScanIDException();
       
+      if (!cs.tableId.equals(MetadataTable.ID) && 
!cs.tableId.equals(RootTable.ID))
+        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+
       Text tid = new Text(cs.tableId);
       long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, 
null, null)));
       

Reply via email to