Updated Branches:
  refs/heads/1.5.1-SNAPSHOT 4822b1318 -> 3b41d37ed

ACCUMULO-2172 wait for minor compaction flags to be flushed to the WAL


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3b41d37e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3b41d37e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3b41d37e

Branch: refs/heads/1.5.1-SNAPSHOT
Commit: 3b41d37ed27b5dc8724b427ed6970b083464bd94
Parents: 4822b13
Author: Eric Newton <eric.new...@gmail.com>
Authored: Fri Jan 10 16:27:01 2014 -0500
Committer: Eric Newton <eric.new...@gmail.com>
Committed: Fri Jan 10 16:27:01 2014 -0500

----------------------------------------------------------------------
 .../server/tabletserver/log/DfsLogger.java      | 58 +++++++++-----------
 .../tabletserver/log/TabletServerLogger.java    |  4 +-
 2 files changed, 29 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3b41d37e/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
index 213c885..ded2962 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
@@ -89,7 +90,7 @@ public class DfsLogger {
 
   private final Object closeLock = new Object();
 
-  private static final DfsLogger.LogWork CLOSED_MARKER = new 
DfsLogger.LogWork(null, null);
+  private static final DfsLogger.LogWork CLOSED_MARKER = new 
DfsLogger.LogWork(null);
 
   private static final LogFileValue EMPTY = new LogFileValue();
 
@@ -145,12 +146,10 @@ public class DfsLogger {
   }
 
   static class LogWork {
-    List<TabletMutations> mutations;
     CountDownLatch latch;
     volatile Exception exception;
 
-    public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
-      this.mutations = mutations;
+    public LogWork(CountDownLatch latch) {
       this.latch = latch;
     }
   }
@@ -439,27 +438,20 @@ public class DfsLogger {
   public LoggerOperation log(int seq, int tid, Mutation mutation) throws 
IOException {
     return logManyTablets(Collections.singletonList(new TabletMutations(tid, 
seq, Collections.singletonList(mutation))));
   }
-
-  public LoggerOperation logManyTablets(List<TabletMutations> mutations) 
throws IOException {
-    DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new 
CountDownLatch(1));
-
+  
+  private LoggerOperation logFileData(List<Pair<LogFileKey, LogFileValue>> 
keys) throws IOException {
+    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
     synchronized (DfsLogger.this) {
       try {
-        for (TabletMutations tabletMutations : mutations) {
-          LogFileKey key = new LogFileKey();
-          key.event = MANY_MUTATIONS;
-          key.seq = tabletMutations.getSeq();
-          key.tid = tabletMutations.getTid();
-          LogFileValue value = new LogFileValue();
-          value.mutations = tabletMutations.getMutations();
-          write(key, value);
+        for (Pair<LogFileKey,LogFileValue> pair : keys) {
+          write(pair.getFirst(), pair.getSecond());
         }
       } catch (Exception e) {
         log.error(e, e);
         work.exception = e;
       }
     }
-
+    
     synchronized (closeLock) {
       // use a different lock for close check so that adding to work queue 
does not need
       // to wait on walog I/O operations
@@ -472,31 +464,35 @@ public class DfsLogger {
     return new LoggerOperation(work);
   }
 
-  public synchronized void minorCompactionFinished(int seq, int tid, String 
fqfn) throws IOException {
+  public LoggerOperation logManyTablets(List<TabletMutations> mutations) 
throws IOException {
+    List<Pair<LogFileKey, LogFileValue>> data = new ArrayList<Pair<LogFileKey, 
LogFileValue>>();
+    for (TabletMutations tabletMutations : mutations) {
+      LogFileKey key = new LogFileKey();
+      key.event = MANY_MUTATIONS;
+      key.seq = tabletMutations.getSeq();
+      key.tid = tabletMutations.getTid();
+      LogFileValue value = new LogFileValue();
+      value.mutations = tabletMutations.getMutations();
+      data.add(new Pair<LogFileKey, LogFileValue>(key, value));
+    }
+    return logFileData(data);
+  }
+
+  public LoggerOperation minorCompactionFinished(int seq, int tid, String 
fqfn) throws IOException {
     LogFileKey key = new LogFileKey();
     key.event = COMPACTION_FINISH;
     key.seq = seq;
     key.tid = tid;
-    try {
-      write(key, EMPTY);
-    } catch (IOException ex) {
-      log.error(ex);
-      throw ex;
-    }
+    return logFileData(Collections.singletonList(new Pair<LogFileKey, 
LogFileValue>(key, EMPTY)));
   }
 
-  public synchronized void minorCompactionStarted(int seq, int tid, String 
fqfn) throws IOException {
+  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) 
throws IOException {
     LogFileKey key = new LogFileKey();
     key.event = COMPACTION_START;
     key.seq = seq;
     key.tid = tid;
     key.filename = fqfn;
-    try {
-      write(key, EMPTY);
-    } catch (IOException ex) {
-      log.error(ex);
-      throw ex;
-    }
+    return logFileData(Collections.singletonList(new Pair<LogFileKey, 
LogFileValue>(key, EMPTY)));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3b41d37e/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
index dd9d3fa..6ca1ad0 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
@@ -390,7 +390,7 @@ public class TabletServerLogger {
     int seq = write(commitSession, true, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
-        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), 
fullyQualifiedFileName);
+        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), 
fullyQualifiedFileName).await();
         return null;
       }
     });
@@ -406,7 +406,7 @@ public class TabletServerLogger {
     write(commitSession, false, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
-        logger.minorCompactionStarted(seq, commitSession.getLogId(), 
fullyQualifiedFileName);
+        logger.minorCompactionStarted(seq, commitSession.getLogId(), 
fullyQualifiedFileName).await();
         return null;
       }
     });

Reply via email to