Repository: accumulo Updated Branches: refs/heads/master 9e770ca3d -> 8be4a3d74
ACCUMULO-2766 fix wal group commit Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1e16159c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1e16159c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1e16159c Branch: refs/heads/master Commit: 1e16159cdb25121123a3ea64df8a47c0eda54709 Parents: 05a64c1 Author: Keith Turner <ktur...@apache.org> Authored: Thu May 15 08:52:04 2014 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Tue Jun 3 12:52:19 2014 -0400 ---------------------------------------------------------------------- .../server/tabletserver/log/DfsLogger.java | 60 +++++++++----------- 1 file changed, 28 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e16159c/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java index 01c2448..fb21ba5 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.LoggingRunnable; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.StringUtil; import org.apache.accumulo.server.logger.LogFileKey; @@ -100,7 +101,8 @@ public class DfsLogger { @Override public void run() { ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>(); - while (true) { + boolean sawClosedMarker = false; + while (!sawClosedMarker) { work.clear(); try { @@ -110,36 +112,20 @@ public class DfsLogger { } workQueue.drainTo(work); - synchronized (closeLock) { - if (!closed) { - try { - sync.invoke(logFile); - } catch (Exception ex) { - log.warn("Exception syncing " + ex); - for (DfsLogger.LogWork logWork : work) { - logWork.exception = ex; - } - } - } else { - for (DfsLogger.LogWork logWork : work) { - logWork.exception = new LogClosedException(); - } + try { + sync.invoke(logFile); + } catch (Exception ex) { + log.warn("Exception syncing " + ex); + for (DfsLogger.LogWork logWork : work) { + logWork.exception = ex; } } - boolean sawClosedMarker = false; for (DfsLogger.LogWork logWork : work) if (logWork == CLOSED_MARKER) sawClosedMarker = true; else logWork.latch.countDown(); - - if (sawClosedMarker) { - synchronized (closeLock) { - closeLock.notifyAll(); - } - break; - } } } } @@ -200,6 +186,7 @@ public class DfsLogger { private Method sync; private Path logPath; private String logger; + private Daemon syncThread; public DfsLogger(ServerResources conf) throws IOException { this.conf = conf; @@ -319,9 +306,9 @@ public class DfsLogger { throw ex; } - Thread t = new Daemon(new LogSyncingTask()); - t.setName("Accumulo WALog thread " + toString()); - t.start(); + syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask())); + syncThread.setName("Accumulo WALog thread " + toString()); + syncThread.start(); } private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException { @@ -386,14 +373,23 @@ public class DfsLogger { // thread to do work closed = true; workQueue.add(CLOSED_MARKER); - while (!workQueue.isEmpty()) - try { - closeLock.wait(); - } catch (InterruptedException e) { - log.info("Interrupted"); - } } + // wait for background thread to finish before closing log file + if(syncThread != null){ + try { + syncThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + //expect workq should be empty at this point + if(workQueue.size() != 0){ + log.error("WAL work queue not empty after sync thread exited"); + throw new IllegalStateException("WAL work queue not empty after sync thread exited"); + } + if (logFile != null) try { logFile.close();