Repository: accumulo
Updated Branches:
  refs/heads/master db2131598 -> 0f2f80d97


ACCUMULO-4112 use metadata table durability config when writing minc events to 
walog


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/37857c53
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/37857c53
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/37857c53

Branch: refs/heads/master
Commit: 37857c5325ff189413a86a073a7a70f8a23d3104
Parents: e52d547
Author: Keith Turner <ktur...@apache.org>
Authored: Mon Mar 14 15:30:28 2016 -0400
Committer: Keith Turner <ktur...@apache.org>
Committed: Mon Mar 14 15:30:28 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/TabletServer.java   | 17 ++++++++++++--
 .../apache/accumulo/tserver/log/DfsLogger.java  | 24 ++++++++++++++++----
 .../tserver/log/TabletServerLogger.java         | 22 +++++++++---------
 3 files changed, 45 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/37857c53/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 038d3e8..71ad6bc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2966,13 +2966,26 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
     }
   }
 
+  private Durability getMincEventDurability(KeyExtent extent) {
+    TableConfiguration conf;
+    if (extent.isMeta()) {
+      conf = confFactory.getTableConfiguration(RootTable.ID);
+    } else {
+      conf = confFactory.getTableConfiguration(MetadataTable.ID);
+    }
+    Durability durability = 
DurabilityImpl.fromString(conf.get(Property.TABLE_DURABILITY));
+    return durability;
+  }
+
   public void minorCompactionFinished(CommitSession tablet, String 
newDatafile, int walogSeq) throws IOException {
+    Durability durability = getMincEventDurability(tablet.getExtent());
     totalMinorCompactions.incrementAndGet();
-    logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
+    logger.minorCompactionFinished(tablet, newDatafile, walogSeq, durability);
   }
 
   public void minorCompactionStarted(CommitSession tablet, int 
lastUpdateSequence, String newMapfileLocation) throws IOException {
-    logger.minorCompactionStarted(tablet, lastUpdateSequence, 
newMapfileLocation);
+    Durability durability = getMincEventDurability(tablet.getExtent());
+    logger.minorCompactionStarted(tablet, lastUpdateSequence, 
newMapfileLocation, durability);
   }
 
   public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration 
tconf, List<LogEntry> logEntries, Set<String> tabletFiles,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/37857c53/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 8512690..a36463d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -237,6 +237,20 @@ public class DfsLogger {
     }
   }
 
+  private static class NoWaitLoggerOperation extends LoggerOperation {
+
+    public NoWaitLoggerOperation() {
+      super(null);
+    }
+
+    @Override
+    public void await() throws IOException {
+      return;
+    }
+  }
+
+  static final LoggerOperation NO_WAIT_LOGGER_OP = new NoWaitLoggerOperation();
+
   @Override
   public boolean equals(Object obj) {
     // filename is unique
@@ -545,7 +559,7 @@ public class DfsLogger {
     }
 
     if (durability == Durability.LOG)
-      return null;
+      return NO_WAIT_LOGGER_OP;
 
     synchronized (closeLock) {
       // use a different lock for close check so that adding to work queue 
does not need
@@ -587,21 +601,21 @@ public class DfsLogger {
     return result;
   }
 
-  public LoggerOperation minorCompactionFinished(int seq, int tid, String 
fqfn) throws IOException {
+  public LoggerOperation minorCompactionFinished(int seq, int tid, String 
fqfn, Durability durability) throws IOException {
     LogFileKey key = new LogFileKey();
     key.event = COMPACTION_FINISH;
     key.seq = seq;
     key.tid = tid;
-    return logFileData(Collections.singletonList(new 
Pair<LogFileKey,LogFileValue>(key, EMPTY)), Durability.SYNC);
+    return logFileData(Collections.singletonList(new 
Pair<LogFileKey,LogFileValue>(key, EMPTY)), durability);
   }
 
-  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) 
throws IOException {
+  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn, 
Durability durability) throws IOException {
     LogFileKey key = new LogFileKey();
     key.event = COMPACTION_START;
     key.seq = seq;
     key.tid = tid;
     key.filename = fqfn;
-    return logFileData(Collections.singletonList(new 
Pair<LogFileKey,LogFileValue>(key, EMPTY)), Durability.SYNC);
+    return logFileData(Collections.singletonList(new 
Pair<LogFileKey,LogFileValue>(key, EMPTY)), durability);
   }
 
   public String getLogger() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/37857c53/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 13c742a..11585f2 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -318,9 +318,7 @@ public class TabletServerLogger {
             throw new RuntimeException("Logger sequence generator wrapped!  
Onos!!!11!eleven");
           ArrayList<LoggerOperation> queuedOperations = new 
ArrayList<LoggerOperation>(copy.size());
           for (DfsLogger wal : copy) {
-            LoggerOperation lop = writer.write(wal, seq);
-            if (lop != null)
-              queuedOperations.add(lop);
+            queuedOperations.add(writer.write(wal, seq));
           }
 
           for (LoggerOperation lop : queuedOperations) {
@@ -387,7 +385,7 @@ public class TabletServerLogger {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
         logger.defineTablet(commitSession.getWALogSeq(), 
commitSession.getLogId(), commitSession.getExtent());
-        return null;
+        return DfsLogger.NO_WAIT_LOGGER_OP;
       }
     });
   }
@@ -443,29 +441,31 @@ public class TabletServerLogger {
     return seq;
   }
 
-  public void minorCompactionFinished(final CommitSession commitSession, final 
String fullyQualifiedFileName, final int walogSeq) throws IOException {
+  public void minorCompactionFinished(final CommitSession commitSession, final 
String fullyQualifiedFileName, final int walogSeq, final Durability durability)
+      throws IOException {
 
     long t1 = System.currentTimeMillis();
 
     int seq = write(commitSession, true, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
-        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), 
fullyQualifiedFileName).await();
-        return null;
+        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), 
fullyQualifiedFileName, durability).await();
+        return DfsLogger.NO_WAIT_LOGGER_OP;
       }
     });
 
     long t2 = System.currentTimeMillis();
 
-    log.debug(" wrote MinC finish  " + seq + ": writeTime:" + (t2 - t1) + "ms 
");
+    log.debug(" wrote MinC finish  {}: writeTime:{}ms  durability:{}", seq, 
(t2 - t1), durability);
   }
 
-  public int minorCompactionStarted(final CommitSession commitSession, final 
int seq, final String fullyQualifiedFileName) throws IOException {
+  public int minorCompactionStarted(final CommitSession commitSession, final 
int seq, final String fullyQualifiedFileName, final Durability durability)
+      throws IOException {
     write(commitSession, false, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
-        logger.minorCompactionStarted(seq, commitSession.getLogId(), 
fullyQualifiedFileName).await();
-        return null;
+        logger.minorCompactionStarted(seq, commitSession.getLogId(), 
fullyQualifiedFileName, durability).await();
+        return DfsLogger.NO_WAIT_LOGGER_OP;
       }
     });
     return seq;

Reply via email to