ACCUMULO-2160 back-port real bugs found by findbugs
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cb50a743 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cb50a743 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cb50a743 Branch: refs/heads/1.5.1-SNAPSHOT Commit: cb50a743dee006bcbe1bf196ff224af5557722f6 Parents: 1413ebc 5dd6f84 Author: Eric Newton <eric.new...@gmail.com> Authored: Thu Jan 9 15:48:23 2014 -0500 Committer: Eric Newton <eric.new...@gmail.com> Committed: Thu Jan 9 15:48:23 2014 -0500 ---------------------------------------------------------------------- .../apache/accumulo/core/client/mapreduce/RangeInputSplit.java | 3 ++- server/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +- .../org/apache/accumulo/server/tabletserver/TabletServer.java | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/cb50a743/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java index 0718505,0000000..561e7ac mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java @@@ -1,432 -1,0 +1,433 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.ArrayList; ++import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.CredentialHelper; +import org.apache.accumulo.core.util.Pair; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.log4j.Level; + +/** + * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. + */ +public class RangeInputSplit extends InputSplit implements Writable { + private Range range; + private String[] locations; + private String table, instanceName, zooKeepers, principal; + private AuthenticationToken token; + private Boolean offline, mockInstance, isolatedScan, localIterators; + private Authorizations auths; + private Set<Pair<Text,Text>> fetchedColumns; + private List<IteratorSetting> iterators; + private Level level; + + public RangeInputSplit() { + range = new Range(); + locations = new String[0]; + } + + public RangeInputSplit(Range range, String[] locations) { + this.range = range; + this.locations = locations; + } + + public Range getRange() { + return range; + } + + private static byte[] extractBytes(ByteSequence seq, int numBytes) { + byte[] bytes = new byte[numBytes + 1]; + bytes[0] = 0; + for (int i = 0; i < numBytes; i++) { + if (i >= seq.length()) + bytes[i + 1] = 0; + else + bytes[i + 1] = seq.byteAt(i); + } + return bytes; + } + + public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { + int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); + BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); + BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); + BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); + return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); + } + + public float getProgress(Key currentKey) { + if (currentKey == null) + return 0f; + if (range.getStartKey() != null && range.getEndKey() != null) { + if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { + // just look at the row progress + return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { + // just look at the column family progress + return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { + // just look at the column qualifier progress + return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); + } + } + // if we can't figure it out, then claim no progress + return 0f; + } + + /** + * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. + */ + @Override + public long getLength() throws IOException { + Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); + Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); + int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); + long diff = 0; + + byte[] start = startRow.getBytes(); + byte[] stop = stopRow.getBytes(); + for (int i = 0; i < maxCommon; ++i) { + diff |= 0xff & (start[i] ^ stop[i]); + diff <<= Byte.SIZE; + } + + if (startRow.getLength() != stopRow.getLength()) + diff |= 0xff; + + return diff + 1; + } + + @Override + public String[] getLocations() throws IOException { + return locations; + } + + @Override + public void readFields(DataInput in) throws IOException { + range.readFields(in); + int numLocs = in.readInt(); + locations = new String[numLocs]; + for (int i = 0; i < numLocs; ++i) + locations[i] = in.readUTF(); + + if (in.readBoolean()) { + isolatedScan = in.readBoolean(); + } + + if (in.readBoolean()) { + offline = in.readBoolean(); + } + + if (in.readBoolean()) { + localIterators = in.readBoolean(); + } + + if (in.readBoolean()) { + mockInstance = in.readBoolean(); + } + + if (in.readBoolean()) { + int numColumns = in.readInt(); + List<String> columns = new ArrayList<String>(numColumns); + for (int i = 0; i < numColumns; i++) { + columns.add(in.readUTF()); + } + + fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns); + } + + if (in.readBoolean()) { + String strAuths = in.readUTF(); + auths = new Authorizations(strAuths.getBytes(Charset.forName("UTF-8"))); + } + + if (in.readBoolean()) { + principal = in.readUTF(); + } + + if (in.readBoolean()) { + String tokenClass = in.readUTF(); + byte[] base64TokenBytes = in.readUTF().getBytes(Charset.forName("UTF-8")); + byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes); + + try { + token = CredentialHelper.extractToken(tokenClass, tokenBytes); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + } + + if (in.readBoolean()) { + instanceName = in.readUTF(); + } + + if (in.readBoolean()) { + zooKeepers = in.readUTF(); + } + + if (in.readBoolean()) { + level = Level.toLevel(in.readInt()); + } + } + + @Override + public void write(DataOutput out) throws IOException { + range.write(out); + out.writeInt(locations.length); + for (int i = 0; i < locations.length; ++i) + out.writeUTF(locations[i]); + + out.writeBoolean(null != isolatedScan); + if (null != isolatedScan) { + out.writeBoolean(isolatedScan); + } + + out.writeBoolean(null != offline); + if (null != offline) { + out.writeBoolean(offline); + } + + out.writeBoolean(null != localIterators); + if (null != localIterators) { + out.writeBoolean(localIterators); + } + + out.writeBoolean(null != mockInstance); + if (null != mockInstance) { + out.writeBoolean(mockInstance); + } + + out.writeBoolean(null != fetchedColumns); + if (null != fetchedColumns) { + String[] cols = InputConfigurator.serializeColumns(fetchedColumns); + out.writeInt(cols.length); + for (String col : cols) { + out.writeUTF(col); + } + } + + out.writeBoolean(null != auths); + if (null != auths) { + out.writeUTF(auths.serialize()); + } + + out.writeBoolean(null != principal); + if (null != principal) { + out.writeUTF(principal); + } + + out.writeBoolean(null != token); + if (null != token) { + out.writeUTF(token.getClass().getCanonicalName()); + try { + out.writeUTF(CredentialHelper.tokenAsBase64(token)); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + } + + out.writeBoolean(null != instanceName); + if (null != instanceName) { + out.writeUTF(instanceName); + } + + out.writeBoolean(null != zooKeepers); + if (null != zooKeepers) { + out.writeUTF(zooKeepers); + } + + out.writeBoolean(null != level); + if (null != level) { + out.writeInt(level.toInt()); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(256); + sb.append("Range: ").append(range); - sb.append(" Locations: ").append(locations); ++ sb.append(" Locations: ").append(Arrays.asList(locations)); + sb.append(" Table: ").append(table); + sb.append(" InstanceName: ").append(instanceName); + sb.append(" zooKeepers: ").append(zooKeepers); + sb.append(" principal: ").append(principal); + sb.append(" authenticationToken: ").append(token); + sb.append(" Authorizations: ").append(auths); + sb.append(" offlineScan: ").append(offline); + sb.append(" mockInstance: ").append(mockInstance); + sb.append(" isolatedScan: ").append(isolatedScan); + sb.append(" localIterators: ").append(localIterators); + sb.append(" fetchColumns: ").append(fetchedColumns); + sb.append(" iterators: ").append(iterators); + sb.append(" logLevel: ").append(level); + return sb.toString(); + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public Instance getInstance() { + if (null == instanceName) { + return null; + } + + if (isMockInstance()) { + return new MockInstance(getInstanceName()); + } + + if (null == zooKeepers) { + return null; + } + + return new ZooKeeperInstance(getInstanceName(), getZooKeepers()); + } + + public String getInstanceName() { + return instanceName; + } + + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + public String getZooKeepers() { + return zooKeepers; + } + + public void setZooKeepers(String zooKeepers) { + this.zooKeepers = zooKeepers; + } + + public String getPrincipal() { + return principal; + } + + public void setPrincipal(String principal) { + this.principal = principal; + } + + public AuthenticationToken getToken() { + return token; + } + + public void setToken(AuthenticationToken token) { + this.token = token; + ; + } + + public Boolean isOffline() { + return offline; + } + + public void setOffline(Boolean offline) { + this.offline = offline; + } + + public void setLocations(String[] locations) { + this.locations = locations; + } + + public Boolean isMockInstance() { + return mockInstance; + } + + public void setMockInstance(Boolean mockInstance) { + this.mockInstance = mockInstance; + } + + public Boolean isIsolatedScan() { + return isolatedScan; + } + + public void setIsolatedScan(Boolean isolatedScan) { + this.isolatedScan = isolatedScan; + } + + public Authorizations getAuths() { + return auths; + } + + public void setAuths(Authorizations auths) { + this.auths = auths; + } + + public void setRange(Range range) { + this.range = range; + } + + public Boolean usesLocalIterators() { + return localIterators; + } + + public void setUsesLocalIterators(Boolean localIterators) { + this.localIterators = localIterators; + } + + public Set<Pair<Text,Text>> getFetchedColumns() { + return fetchedColumns; + } + + public void setFetchedColumns(Set<Pair<Text,Text>> fetchedColumns) { + this.fetchedColumns = fetchedColumns; + } + + public List<IteratorSetting> getIterators() { + return iterators; + } + + public void setIterators(List<IteratorSetting> iterators) { + this.iterators = iterators; + } + + public Level getLogLevel() { + return level; + } + + public void setLogLevel(Level level) { + this.level = level; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/cb50a743/server/src/main/java/org/apache/accumulo/server/Accumulo.java ---------------------------------------------------------------------- diff --cc server/src/main/java/org/apache/accumulo/server/Accumulo.java index 33bb871,0000000..f56dfd8 mode 100644,000000..100644 --- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java +++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java @@@ -1,309 -1,0 +1,309 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.trace.DistributedTrace; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.Version; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.helpers.FileWatchdog; +import org.apache.log4j.helpers.LogLog; +import org.apache.log4j.xml.DOMConfigurator; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +public class Accumulo { + + private static final Logger log = Logger.getLogger(Accumulo.class); + + public static synchronized void updateAccumuloVersion(FileSystem fs) { + try { + if (getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) { + fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.DATA_VERSION)); + fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION), false); + } + } catch (IOException e) { + throw new RuntimeException("Unable to set accumulo version: an error occurred.", e); + } + } + + public static synchronized int getAccumuloPersistentVersion(FileSystem fs) { + int dataVersion; + try { + FileStatus[] files = fs.listStatus(ServerConstants.getDataVersionLocation()); + if (files == null || files.length == 0) { + dataVersion = -1; // assume it is 0.5 or earlier + } else { + dataVersion = Integer.parseInt(files[0].getPath().getName()); + } + return dataVersion; + } catch (IOException e) { + throw new RuntimeException("Unable to read accumulo version: an error occurred.", e); + } + } + + public static void enableTracing(String address, String application) { + try { + DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address); + } catch (Exception ex) { + log.error("creating remote sink for trace spans", ex); + } + } + + private static class LogMonitor extends FileWatchdog implements Watcher { + String path; + + protected LogMonitor(String instance, String filename, int delay) { + super(filename); + setDelay(delay); + this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT; + } + + private void setMonitorPort() { + try { + String port = new String(ZooReaderWriter.getInstance().getData(path, null)); + System.setProperty("org.apache.accumulo.core.host.log.port", port); + log.info("Changing monitor log4j port to "+port); + doOnChange(); + } catch (Exception e) { + log.error("Error reading zookeeper data for monitor log4j port", e); + } + } + + @Override + public void run() { + try { + if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null) + setMonitorPort(); + log.info("Set watch for monitor log4j port"); + } catch (Exception e) { + log.error("Unable to set watch for monitor log4j port " + path); + } + super.run(); + } + + @Override + protected void doOnChange() { + LogManager.resetConfiguration(); + new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository()); + } + + @Override + public void process(WatchedEvent event) { + setMonitorPort(); + if (event.getPath() != null) { + try { + ZooReaderWriter.getInstance().exists(event.getPath(), this); + } catch (Exception ex) { + log.error("Unable to reset watch for monitor log4j port", ex); + } + } + } + } + + public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException { + + System.setProperty("org.apache.accumulo.core.application", application); + + if (System.getenv("ACCUMULO_LOG_DIR") != null) + System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR")); + else + System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/"); + + String localhost = InetAddress.getLocalHost().getHostName(); + System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost); + + if (System.getenv("ACCUMULO_LOG_HOST") != null) + System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST")); + else + System.setProperty("org.apache.accumulo.core.host.log", localhost); + + int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT); + System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort)); + + // Use a specific log config, if it exists - String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application); ++ String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR")); + if (!new File(logConfig).exists()) { + // otherwise, use the generic config + logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR")); + } + // Turn off messages about not being able to reach the remote logger... we protect against that. + LogLog.setQuietMode(true); + + // Configure logging + if (logPort==0) + new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start(); + else + DOMConfigurator.configureAndWatch(logConfig, 5000); + + log.info(application + " starting"); + log.info("Instance " + config.getInstance().getInstanceID()); + int dataVersion = Accumulo.getAccumuloPersistentVersion(fs); + log.info("Data Version " + dataVersion); + Accumulo.waitForZookeeperAndHdfs(fs); + + Version codeVersion = new Version(Constants.VERSION); + if (dataVersion != Constants.DATA_VERSION && dataVersion != Constants.PREV_DATA_VERSION) { + throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion); + } + + TreeMap<String,String> sortedProps = new TreeMap<String,String>(); + for (Entry<String,String> entry : config.getConfiguration()) + sortedProps.put(entry.getKey(), entry.getValue()); + + for (Entry<String,String> entry : sortedProps.entrySet()) { + if (entry.getKey().toLowerCase().contains("password") || entry.getKey().toLowerCase().contains("secret") + || entry.getKey().startsWith(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey())) + log.info(entry.getKey() + " = <hidden>"); + else + log.info(entry.getKey() + " = " + entry.getValue()); + } + + monitorSwappiness(); + } + + /** + * + */ + public static void monitorSwappiness() { + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + try { + String procFile = "/proc/sys/vm/swappiness"; + File swappiness = new File(procFile); + if (swappiness.exists() && swappiness.canRead()) { + InputStream is = new FileInputStream(procFile); + try { + byte[] buffer = new byte[10]; + int bytes = is.read(buffer); + String setting = new String(buffer, 0, bytes); + setting = setting.trim(); + if (bytes > 0 && Integer.parseInt(setting) > 10) { + log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. " + + " Accumulo is time sensitive because it needs to maintain distributed lock agreement."); + } + } finally { + is.close(); + } + } + } catch (Throwable t) { + log.error(t, t); + } + } + }, 1000, 10 * 60 * 1000); + } + + public static String getLocalAddress(String[] args) throws UnknownHostException { + InetAddress result = InetAddress.getLocalHost(); + for (int i = 0; i < args.length - 1; i++) { + if (args[i].equals("-a") || args[i].equals("--address")) { + result = InetAddress.getByName(args[i + 1]); + log.debug("Local address is: " + args[i + 1] + " (" + result.toString() + ")"); + break; + } + } + return result.getHostName(); + } + + public static void waitForZookeeperAndHdfs(FileSystem fs) { + log.info("Attempting to talk to zookeeper"); + while (true) { + try { + ZooReaderWriter.getInstance().getChildren(Constants.ZROOT); + break; + } catch (InterruptedException e) { + // ignored + } catch (KeeperException ex) { + log.info("Waiting for accumulo to be initialized"); + UtilWaitThread.sleep(1000); + } + } + log.info("Zookeeper connected and initialized, attemping to talk to HDFS"); + long sleep = 1000; + while (true) { + try { + if (!isInSafeMode(fs)) + break; + log.warn("Waiting for the NameNode to leave safemode"); + } catch (IOException ex) { + log.warn("Unable to connect to HDFS"); + } + log.info("Sleeping " + sleep / 1000. + " seconds"); + UtilWaitThread.sleep(sleep); + sleep = Math.min(60 * 1000, sleep * 2); + } + log.info("Connected to HDFS"); + } + + private static boolean isInSafeMode(FileSystem fs) throws IOException { + if (!(fs instanceof DistributedFileSystem)) + return false; + DistributedFileSystem dfs = (DistributedFileSystem)fs; + // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)) + // Becomes this: + Class<?> safeModeAction; + try { + // hadoop 2.0 + safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"); + } catch (ClassNotFoundException ex) { + // hadoop 1.0 + try { + safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot figure out the right class for Constants"); + } + } + Object get = null; + for (Object obj : safeModeAction.getEnumConstants()) { + if (obj.toString().equals("SAFEMODE_GET")) + get = obj; + } + if (get == null) { + throw new RuntimeException("cannot find SAFEMODE_GET"); + } + try { + Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction); + return (Boolean) setSafeMode.invoke(dfs, get); + } catch (Exception ex) { + throw new RuntimeException("cannot find method setSafeMode"); + } + } +}