ACCUMULO-804 initial attempt to make Accumulo binary compatible with Hadoop 2.0 git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/1.5@1483399 13f79535-47bb-0310-9956-ffa450edef68 (cherry picked from commit d6c612d087e7f922c9935888cf443d4a9f1999c2)
Reason: Hadoop2 Compat Author: Eric C. Newton <e...@apache.org> Ref: ACCUMULO-1792 Signed-off-by: Eric Newton <eric.new...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2c83ca33 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2c83ca33 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2c83ca33 Branch: refs/heads/1.5.1-SNAPSHOT Commit: 2c83ca337ac54e3c71aff1596bcf6ea04aea6491 Parents: 1b1334f Author: Jonathan M Hsieh <j...@cloudera.com> Authored: Wed May 29 14:23:01 2013 -0700 Committer: Eric Newton <eric.new...@gmail.com> Committed: Mon Nov 25 16:06:42 2013 -0500 ---------------------------------------------------------------------- .../accumulo/core/util/TTimeoutTransport.java | 12 ++++- .../org/apache/accumulo/server/Accumulo.java | 51 ++++++++++++++++++-- 2 files changed, 58 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c83ca33/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java index 3c1fa6a..0aebc39 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java +++ b/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java @@ -21,6 +21,7 @@ import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Method; import java.net.Socket; import java.net.SocketAddress; import java.nio.channels.spi.SelectorProvider; @@ -31,12 +32,21 @@ import org.apache.thrift.transport.TTransport; public class TTimeoutTransport { + private static InputStream getInputStream(Socket 
socket, long timeout) { + try { + Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE); + return (InputStream)m.invoke(null, socket, timeout); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException { Socket socket = SelectorProvider.provider().openSocketChannel().socket(); socket.setSoLinger(false, 0); socket.setTcpNoDelay(true); socket.connect(addr); - InputStream input = new BufferedInputStream(NetUtils.getInputStream(socket, timeoutMillis), 1024 * 10); + InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10); OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10); return new TIOStreamTransport(input, output); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c83ca33/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java index b2feb5c..32462b7 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java +++ b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Map.Entry; @@ -212,10 +213,7 @@ public class Accumulo { while (true) { try { FileSystem fs = FileSystem.get(CachedConfiguration.getInstance()); - if (!(fs instanceof DistributedFileSystem)) - break; - DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(CachedConfiguration.getInstance()); - if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)) + if (!isInSafeMode(fs)) break; 
log.warn("Waiting for the NameNode to leave safemode"); } catch (IOException ex) { @@ -227,4 +225,49 @@ public class Accumulo { } log.info("Connected to HDFS"); } + + private static boolean isInSafeMode(FileSystem fs) throws IOException { + if (!(fs instanceof DistributedFileSystem)) + return false; + DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(CachedConfiguration.getInstance()); + // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)) + // Becomes this: + Class<?> constantClass; + try { + // hadoop 2.0 + constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants"); + } catch (ClassNotFoundException ex) { + // hadoop 1.0 + try { + constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot figure out the right class for Constants"); + } + } + Class<?> safeModeAction = null; + for (Class<?> klass : constantClass.getDeclaredClasses()) { + if (klass.getSimpleName().equals("SafeModeAction")) { + safeModeAction = klass; + break; + } + } + if (safeModeAction == null) { + throw new RuntimeException("Cannot find SafeModeAction in constants class"); + } + + Object get = null; + for (Object obj : safeModeAction.getEnumConstants()) { + if (obj.toString().equals("SAFEMODE_GET")) + get = obj; + } + if (get == null) { + throw new RuntimeException("cannot find SAFEMODE_GET"); + } + try { + Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction); + return (Boolean)setSafeMode.invoke(dfs, get); + } catch (Exception ex) { + throw new RuntimeException("cannot find method setSafeMode"); + } + } }