Repository: accumulo Updated Branches: refs/heads/master fec719f5d -> 9d25381b7
ACCUMULO-3980 watch the replication and reset the walog if it drops below the initial value Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d25381b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d25381b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d25381b Branch: refs/heads/master Commit: 9d25381b7a0fa1f728eda13158db0888cb4b6b7d Parents: fec719f Author: Eric C. Newton <[email protected]> Authored: Mon Aug 31 11:44:37 2015 -0400 Committer: Eric C. Newton <[email protected]> Committed: Mon Aug 31 11:44:37 2015 -0400 ---------------------------------------------------------------------- .../apache/accumulo/tserver/log/DfsLogger.java | 27 +++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d25381b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 47fd57d..131c61d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -190,16 +190,24 @@ public class DfsLogger implements Comparable<DfsLogger> { } } } catch (Exception ex) { - log.warn("Exception syncing " + ex); - for (DfsLogger.LogWork logWork : work) { - logWork.exception = ex; - } + fail(work, ex, "synching"); } long duration = System.currentTimeMillis() - start; if (duration > slowFlushMillis) { String msg = new StringBuilder(128).append("Slow sync cost: ").append(duration).append(" ms, current pipeline: ") .append(Arrays.toString(getPipeLine())).toString(); log.info(msg); + if (expectedReplication > 0) { + int current = expectedReplication; + try { + current = ((DFSOutputStream) logFile.getWrappedStream()).getCurrentBlockReplication(); + } catch (IOException e) { + fail(work, e, "getting replication level"); + } + if (current < expectedReplication) { + fail(work, new IOException("replication of " + current + " is less than " + expectedReplication), "replication check"); + } + } } for (DfsLogger.LogWork logWork : work) @@ -209,6 +217,13 @@ public class DfsLogger implements Comparable<DfsLogger> { logWork.latch.countDown(); } } + + private void fail(ArrayList<DfsLogger.LogWork> work, Exception ex, String why) { + log.warn("Exception " + why + " " + ex); + for (DfsLogger.LogWork logWork : work) { + logWork.exception = ex; + } + } } private static class LogWork { @@ -276,6 +291,7 @@ public class DfsLogger implements Comparable<DfsLogger> { private AtomicLong syncCounter; private AtomicLong flushCounter; private final long slowFlushMillis; + private int expectedReplication = 0; private DfsLogger(ServerResources conf) { this.conf = conf; @@ -417,6 +433,9 @@ public class DfsLogger implements Comparable<DfsLogger> { logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize); else logFile = fs.create(new Path(logPath), true, 0, replication, blockSize); + if (logFile.getWrappedStream() instanceof DFSOutputStream) { + expectedReplication = ((DFSOutputStream) logFile.getWrappedStream()).getCurrentBlockReplication(); + } sync = logFile.getClass().getMethod("hsync"); flush = logFile.getClass().getMethod("hflush");
