Repository: accumulo Updated Branches: refs/heads/master e123c26c7 -> a50f80225
ACCUMULO-3976 Remove some unnecessary reflection Earlier HDFS didn't have the getPipeline method always exposed. Since Hadoop2, this isn't an issue so we can do away with the reflection. Also made a change to try to avoid excessive DatanodeInfo arrays being created. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/77922c59 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/77922c59 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/77922c59 Branch: refs/heads/master Commit: 77922c596b6edc27d0c71a06388e68d7beaf8e2e Parents: e123c26 Author: Josh Elser <[email protected]> Authored: Sat Aug 29 16:03:30 2015 -0400 Committer: Josh Elser <[email protected]> Committed: Sat Aug 29 16:03:30 2015 -0400 ---------------------------------------------------------------------- .../apache/accumulo/tserver/log/DfsLogger.java | 48 +++++--------------- 1 file changed, 11 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/77922c59/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index bdc7364..33a640d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -62,6 +62,7 @@ import org.apache.accumulo.tserver.logger.LogFileValue; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +79,7 @@ public class DfsLogger implements Comparable<DfsLogger> { public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---"; private static final Logger log = LoggerFactory.getLogger(DfsLogger.class); - private static final Object[] NO_ARGS = new Object[] {}; + private static final DatanodeInfo[] EMPTY_PIPELINE = new DatanodeInfo[0]; public static class LogClosedException extends IOException { private static final long serialVersionUID = 1L; @@ -196,7 +197,7 @@ public class DfsLogger implements Comparable<DfsLogger> { } long duration = System.currentTimeMillis() - start; if (duration > slowFlushMillis) { - String msg = new StringBuilder().append("Slow sync cost: ").append(duration).append(" ms, current pipeline: ").append(Arrays.toString(getPipeLine())) + String msg = new StringBuilder(128).append("Slow sync cost: ").append(duration).append(" ms, current pipeline: ").append(Arrays.toString(getPipeLine())) .toString(); log.info(msg); } @@ -275,7 +276,6 @@ public class DfsLogger implements Comparable<DfsLogger> { private AtomicLong syncCounter; private AtomicLong flushCounter; private final long slowFlushMillis; - private Method getPipeLine; public DfsLogger(ServerResources conf, AtomicLong syncCounter, AtomicLong flushCounter) throws IOException { this.conf = conf; @@ -417,7 +417,6 @@ public class DfsLogger implements Comparable<DfsLogger> { sync = logFile.getClass().getMethod("hsync"); flush = logFile.getClass().getMethod("hflush"); - getPipeLine = this.getGetPipeline(logFile); // Initialize the crypto operations. org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf @@ -644,49 +643,24 @@ public class DfsLogger implements Comparable<DfsLogger> { } /* - * The following two methods were shamelessly lifted from HBASE-11240. Thanks HBase! + * The following method was shamelessly lifted from HBASE-11240 (sans reflection). Thanks HBase! */ /** - * Find the 'getPipeline' on the passed <code>os</code> stream. - * - * @return Method or null. - */ - private Method getGetPipeline(final FSDataOutputStream os) { - Method m = null; - if (os != null) { - Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass(); - try { - m = wrappedStreamClass.getDeclaredMethod("getPipeline", new Class<?>[] {}); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - log.info("FileSystem's output stream doesn't support getPipeline; not available; fsOut=" + wrappedStreamClass.getName()); - } catch (SecurityException e) { - log.info("Doesn't have access to getPipeline on FileSystems's output stream ; fsOut=" + wrappedStreamClass.getName(), e); - m = null; // could happen on setAccessible() - } - } - return m; - } - - /** * This method gets the pipeline for the current walog. * * @return non-null array of DatanodeInfo */ DatanodeInfo[] getPipeLine() { - if (this.getPipeLine != null) { - Object repl; - try { - repl = this.getPipeLine.invoke(this.logFile, NO_ARGS); - if (repl instanceof DatanodeInfo[]) { - return ((DatanodeInfo[]) repl); - } - } catch (Exception e) { - log.info("Get pipeline failed", e); + if (null != logFile) { + OutputStream os = logFile.getWrappedStream(); + if (os instanceof DFSOutputStream) { + return ((DFSOutputStream) os).getPipeline(); } } - return new DatanodeInfo[0]; + + // Don't have a pipeline or can't figure it out. + return EMPTY_PIPELINE; } }
