Updated Branches: refs/heads/1.5.1-SNAPSHOT 4822b1318 -> 3b41d37ed
ACCUMULO-2172 wait for minor compaction flags to be flushed to the WAL Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3b41d37e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3b41d37e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3b41d37e Branch: refs/heads/1.5.1-SNAPSHOT Commit: 3b41d37ed27b5dc8724b427ed6970b083464bd94 Parents: 4822b13 Author: Eric Newton <eric.new...@gmail.com> Authored: Fri Jan 10 16:27:01 2014 -0500 Committer: Eric Newton <eric.new...@gmail.com> Committed: Fri Jan 10 16:27:01 2014 -0500 ---------------------------------------------------------------------- .../server/tabletserver/log/DfsLogger.java | 58 +++++++++----------- .../tabletserver/log/TabletServerLogger.java | 4 +- 2 files changed, 29 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3b41d37e/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java index 213c885..ded2962 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java @@ -43,6 +43,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.StringUtil; import org.apache.accumulo.server.logger.LogFileKey; import org.apache.accumulo.server.logger.LogFileValue; @@ -89,7 +90,7 @@ public class DfsLogger { private final Object closeLock = new Object(); - private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null); + private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null); private static final LogFileValue EMPTY = new LogFileValue(); @@ -145,12 +146,10 @@ public class DfsLogger { } static class LogWork { - List<TabletMutations> mutations; CountDownLatch latch; volatile Exception exception; - public LogWork(List<TabletMutations> mutations, CountDownLatch latch) { - this.mutations = mutations; + public LogWork(CountDownLatch latch) { this.latch = latch; } } @@ -439,27 +438,20 @@ public class DfsLogger { public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException { return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation)))); } - - public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException { - DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1)); - + + private LoggerOperation logFileData(List<Pair<LogFileKey, LogFileValue>> keys) throws IOException { + DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1)); synchronized (DfsLogger.this) { try { - for (TabletMutations tabletMutations : mutations) { - LogFileKey key = new LogFileKey(); - key.event = MANY_MUTATIONS; - key.seq = tabletMutations.getSeq(); - key.tid = tabletMutations.getTid(); - LogFileValue value = new LogFileValue(); - value.mutations = tabletMutations.getMutations(); - write(key, value); + for (Pair<LogFileKey,LogFileValue> pair : keys) { + write(pair.getFirst(), pair.getSecond()); } } catch (Exception e) { log.error(e, e); work.exception = e; } } - + synchronized (closeLock) { // use a different lock for close check so that adding to work queue does not need // to wait on walog I/O operations @@ -472,31 +464,35 @@ public class DfsLogger { return new LoggerOperation(work); } - public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException { + public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException { + List<Pair<LogFileKey, LogFileValue>> data = new ArrayList<Pair<LogFileKey, LogFileValue>>(); + for (TabletMutations tabletMutations : mutations) { + LogFileKey key = new LogFileKey(); + key.event = MANY_MUTATIONS; + key.seq = tabletMutations.getSeq(); + key.tid = tabletMutations.getTid(); + LogFileValue value = new LogFileValue(); + value.mutations = tabletMutations.getMutations(); + data.add(new Pair<LogFileKey, LogFileValue>(key, value)); + } + return logFileData(data); + } + + public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException { LogFileKey key = new LogFileKey(); key.event = COMPACTION_FINISH; key.seq = seq; key.tid = tid; - try { - write(key, EMPTY); - } catch (IOException ex) { - log.error(ex); - throw ex; - } + return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY))); } - public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException { + public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException { LogFileKey key = new LogFileKey(); key.event = COMPACTION_START; key.seq = seq; key.tid = tid; key.filename = fqfn; - try { - write(key, EMPTY); - } catch (IOException ex) { - log.error(ex); - throw ex; - } + return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY))); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3b41d37e/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java index dd9d3fa..6ca1ad0 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java @@ -390,7 +390,7 @@ public class TabletServerLogger { int seq = write(commitSession, true, new Writer() { @Override public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { - logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName); + logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName).await(); return null; } }); @@ -406,7 +406,7 @@ public class TabletServerLogger { write(commitSession, false, new Writer() { @Override public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { - logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName); + logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName).await(); return null; } });