Repository: accumulo Updated Branches: refs/heads/1.6.0-SNAPSHOT 96d866218 -> a8058a69f
ACCUMULO-2049 Made shell use new instance.volumes config Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a8058a69 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a8058a69 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a8058a69 Branch: refs/heads/1.6.0-SNAPSHOT Commit: a8058a69f24ebda32e2f54e52390a98b7f7e1bf0 Parents: 96d8662 Author: Keith Turner <ktur...@apache.org> Authored: Wed Feb 26 17:58:27 2014 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Wed Feb 26 19:48:26 2014 -0500 ---------------------------------------------------------------------- .../core/client/admin/TableOperationsImpl.java | 4 +- .../core/client/impl/OfflineScanner.java | 4 +- .../org/apache/accumulo/core/file/FileUtil.java | 130 ------------------- .../accumulo/core/file/VolumeConfiguration.java | 96 ++++++++++++++ .../accumulo/core/file/rfile/PrintInfo.java | 4 +- .../core/file/rfile/bcfile/PrintInfo.java | 4 +- .../apache/accumulo/core/util/shell/Shell.java | 3 +- .../apache/accumulo/core/zookeeper/ZooUtil.java | 4 +- .../core/util/shell/ShellSetInstanceTest.java | 3 + .../apache/accumulo/server/ServerConstants.java | 80 ++---------- .../accumulo/server/client/BulkImporter.java | 5 +- .../accumulo/server/fs/VolumeManagerImpl.java | 6 +- .../apache/accumulo/server/init/Initialize.java | 19 +-- .../monitor/servlets/DefaultServlet.java | 4 +- .../tserver/BulkFailedCopyProcessor.java | 4 +- .../accumulo/test/functional/BulkFileIT.java | 4 +- 16 files changed, 141 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java index 0245ef1..0b2f10e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java @@ -82,7 +82,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -1139,7 +1139,7 @@ public class TableOperationsImpl extends TableOperationsHelper { @SuppressWarnings("deprecation") private Path checkPath(String dir, String kind, String type) throws IOException, AccumuloException { Path ret; - FileSystem fs = FileUtil.getFileSystem(dir, CachedConfiguration.getInstance(), ServerConfigurationUtil.getConfiguration(instance)); + FileSystem fs = VolumeConfiguration.getFileSystem(dir, CachedConfiguration.getInstance(), ServerConfigurationUtil.getConfiguration(instance)); if (dir.contains(":")) { ret = new Path(dir); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java index c60e153..c90d380 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java @@ -43,7 +43,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; @@ -306,7 +306,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> { // TODO need to close files - ACCUMULO-1303 for (String file : absFiles) { - FileSystem fs = FileUtil.getFileSystem(file, conf, config); + FileSystem fs = VolumeConfiguration.getFileSystem(file, conf, config); FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null); readers.add(reader); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java b/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java deleted file mode 100644 index 0ee16cf..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -public class FileUtil { - - public static class FileInfo { - Key firstKey = new Key(); - Key lastKey = new Key(); - - public FileInfo(Key firstKey, Key lastKey) { - this.firstKey = firstKey; - this.lastKey = lastKey; - } - - public Text getFirstRow() { - return firstKey.getRow(); - } - - public Text getLastRow() { - return lastKey.getRow(); - } - } - - private static final Logger log = Logger.getLogger(FileUtil.class); - - private static class MLong { - public MLong(long i) { - l = i; - } - - long l; - } - - public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, Path mapFile, long fileSize, List<KeyExtent> extents, Configuration conf, - FileSystem fs) throws IOException { - - long totalIndexEntries = 0; - Map<KeyExtent,MLong> counts = new TreeMap<KeyExtent,MLong>(); - for (KeyExtent keyExtent : extents) - counts.put(keyExtent, new MLong(0)); - - Text row = new Text(); - - FileSKVIterator index = FileOperations.getInstance().openIndex(mapFile.toString(), fs, conf, acuConf); - - try { - while (index.hasTop()) { - Key key = index.getTopKey(); - totalIndexEntries++; - key.getRow(row); - - for (Entry<KeyExtent,MLong> entry : counts.entrySet()) - if (entry.getKey().contains(row)) - entry.getValue().l++; - - index.next(); - } - } finally { - try { - if (index != null) - index.close(); - } catch (IOException e) { - // continue with next file - log.error(e, e); - } - } - - Map<KeyExtent,Long> results = new TreeMap<KeyExtent,Long>(); - for (KeyExtent keyExtent : extents) { - double numEntries = counts.get(keyExtent).l; - if (numEntries == 0) - numEntries = 1; - long estSize = (long) ((numEntries / totalIndexEntries) * fileSize); - results.put(keyExtent, estSize); - } - return results; - } - - public static FileSystem getFileSystem(String path, Configuration conf, AccumuloConfiguration acuconf) throws IOException { - if (path.contains(":")) - return new Path(path).getFileSystem(conf); - else - return getFileSystem(conf, acuconf); - } - - public static FileSystem getFileSystem(Configuration conf, AccumuloConfiguration acuconf) throws IOException { - String uri = acuconf.get(Property.INSTANCE_DFS_URI); - if ("".equals(uri)) - return FileSystem.get(conf); - else - try { - return FileSystem.get(new URI(uri), conf); - } catch (URISyntaxException e) { - throw new IOException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/main/java/org/apache/accumulo/core/file/VolumeConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/VolumeConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/VolumeConfiguration.java new file mode 100644 index 0000000..af9ca1d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/VolumeConfiguration.java @@ -0,0 +1,96 @@ +package org.apache.accumulo.core.file; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class VolumeConfiguration { + + public static FileSystem getFileSystem(String path, Configuration conf, AccumuloConfiguration acuconf) throws IOException { + if (path.contains(":")) + return new Path(path).getFileSystem(conf); + else + return getDefaultFilesystem(conf, acuconf); + } + + public static FileSystem getDefaultFilesystem(Configuration conf, AccumuloConfiguration acuconf) throws IOException { + String uri = acuconf.get(Property.INSTANCE_DFS_URI); + if ("".equals(uri)) + return FileSystem.get(conf); + else + try { + return FileSystem.get(new URI(uri), conf); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + public static String getConfiguredBaseDir(AccumuloConfiguration conf) { + String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); + String dfsUri = conf.get(Property.INSTANCE_DFS_URI); + String baseDir; + + if (dfsUri == null || dfsUri.isEmpty()) { + Configuration hadoopConfig = CachedConfiguration.getInstance(); + try { + baseDir = FileSystem.get(hadoopConfig).getUri().toString() + singleNamespace; + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + if (!dfsUri.contains(":")) + throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_DFS_URI.getKey() + " got " + dfsUri); + baseDir = dfsUri + singleNamespace; + } + return baseDir; + } + + public static String[] getConfiguredBaseDirs(AccumuloConfiguration conf) { + String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); + String ns = conf.get(Property.INSTANCE_VOLUMES); + + String configuredBaseDirs[]; + + if (ns == null || ns.isEmpty()) { + configuredBaseDirs = new String[] {getConfiguredBaseDir(conf)}; + } else { + String namespaces[] = ns.split(","); + String unescapedNamespaces[] = new String[namespaces.length]; + int i = 0; + for (String namespace : namespaces) { + if (!namespace.contains(":")) { + throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace); + } + + try { + // pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char) + unescapedNamespaces[i++] = new Path(new URI(namespace)).toString(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(Property.INSTANCE_VOLUMES.getKey() + " contains " + namespace + " which has a syntax error", e); + } + } + + configuredBaseDirs = prefix(unescapedNamespaces, singleNamespace); + } + + return configuredBaseDirs; + } + + public static String[] prefix(String bases[], String suffix) { + if (suffix.startsWith("/")) + suffix = suffix.substring(1); + String result[] = new String[bases.length]; + for (int i = 0; i < bases.length; i++) { + result[i] = bases[i] + "/" + suffix; + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java index 7cee0f9..4cfefad 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java @@ -25,7 +25,7 @@ import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.hadoop.conf.Configuration; @@ -50,7 +50,7 @@ public class PrintInfo { @SuppressWarnings("deprecation") AccumuloConfiguration aconf = AccumuloConfiguration.getSiteConfiguration(); - FileSystem hadoopFs = FileUtil.getFileSystem(conf, aconf); + FileSystem hadoopFs = VolumeConfiguration.getDefaultFilesystem(conf, aconf); FileSystem localFs = FileSystem.getLocal(conf); Opts opts = new Opts(); opts.parseArgs(PrintInfo.class.getName(), args); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java index 4809d80..f21190e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java @@ -22,7 +22,7 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.file.rfile.bcfile.BCFile.MetaIndexEntry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -57,7 +57,7 @@ public class PrintInfo { Configuration conf = new Configuration(); @SuppressWarnings("deprecation") AccumuloConfiguration siteConf = AccumuloConfiguration.getSiteConfiguration(); - FileSystem hadoopFs = FileUtil.getFileSystem(conf, siteConf); + FileSystem hadoopFs = VolumeConfiguration.getDefaultFilesystem(conf, siteConf); FileSystem localFs = FileSystem.getLocal(conf); Path path = new Path(args[0]); FileSystem fs; http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java b/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java index 8ea82d4..850816c 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java +++ b/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java @@ -63,6 +63,7 @@ import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.BadArgumentException; @@ -433,7 +434,7 @@ public class Shell extends ShellOptions { if (instanceName == null || keepers == null) { AccumuloConfiguration conf = SiteConfiguration.getInstance(ServerConfigurationUtil.convertClientConfig(DefaultConfiguration.getInstance(), clientConfig)); if (instanceName == null) { - Path instanceDir = new Path(conf.get(Property.INSTANCE_DFS_DIR), "instance_id"); + Path instanceDir = new Path(VolumeConfiguration.getConfiguredBaseDirs(conf)[0], "instance_id"); instanceId = UUID.fromString(ZooUtil.getInstanceIDFromHdfs(instanceDir, conf)); } if (keepers == null) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java index a062602..de1b432 100644 --- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java @@ -23,7 +23,7 @@ import java.net.UnknownHostException; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -49,7 +49,7 @@ public class ZooUtil extends org.apache.accumulo.fate.zookeeper.ZooUtil { public static String getInstanceIDFromHdfs(Path instanceDirectory, AccumuloConfiguration conf) { try { - FileSystem fs = FileUtil.getFileSystem(instanceDirectory.toString(), CachedConfiguration.getInstance(), conf); + FileSystem fs = VolumeConfiguration.getFileSystem(instanceDirectory.toString(), CachedConfiguration.getInstance(), conf); FileStatus[] files = null; try { files = fs.listStatus(instanceDirectory); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/core/src/test/java/org/apache/accumulo/core/util/shell/ShellSetInstanceTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/util/shell/ShellSetInstanceTest.java b/core/src/test/java/org/apache/accumulo/core/util/shell/ShellSetInstanceTest.java index af810ad..5a6cc8a 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/shell/ShellSetInstanceTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/shell/ShellSetInstanceTest.java @@ -171,7 +171,10 @@ public class ShellSetInstanceTest { expect(clientConf.withZkHosts("host1,host2")).andReturn(clientConf); } if (!onlyInstance) { + expect(clientConf.containsKey(Property.INSTANCE_VOLUMES.getKey())).andReturn(false).atLeastOnce(); expect(clientConf.containsKey(Property.INSTANCE_DFS_DIR.getKey())).andReturn(true).atLeastOnce(); + expect(clientConf.containsKey(Property.INSTANCE_DFS_URI.getKey())).andReturn(true).atLeastOnce(); + expect(clientConf.getString(Property.INSTANCE_DFS_URI.getKey())).andReturn("hdfs://nn1").atLeastOnce(); expect(clientConf.getString(Property.INSTANCE_DFS_DIR.getKey())).andReturn("/dfs").atLeastOnce(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java index cca869a..8983d08 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.server; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -24,16 +23,14 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class ServerConstants { @@ -61,65 +58,16 @@ public class ServerConstants { public static synchronized String getDefaultBaseDir() { if (defaultBaseDir == null) { - String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR); - String dfsUri = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI); - String baseDir; - - if (dfsUri == null || dfsUri.isEmpty()) { - Configuration hadoopConfig = CachedConfiguration.getInstance(); - try { - baseDir = FileSystem.get(hadoopConfig).getUri().toString() + singleNamespace; - } catch (IOException e) { - throw new RuntimeException(e); - } - } else { - if (!dfsUri.contains(":")) - throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_DFS_URI.getKey() + " got " + dfsUri); - baseDir = dfsUri + singleNamespace; - } - - defaultBaseDir = new Path(baseDir).toString(); - + defaultBaseDir = new Path(VolumeConfiguration.getConfiguredBaseDir(ServerConfiguration.getSiteConfiguration())).toString(); } return defaultBaseDir; } - public static String[] getConfiguredBaseDirs(AccumuloConfiguration conf) { - String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR); - String ns = conf.get(Property.INSTANCE_VOLUMES); - - String configuredBaseDirs[]; - - if (ns == null || ns.isEmpty()) { - configuredBaseDirs = new String[] {getDefaultBaseDir()}; - } else { - String namespaces[] = ns.split(","); - String unescapedNamespaces[] = new String[namespaces.length]; - int i = 0; - for (String namespace : namespaces) { - if (!namespace.contains(":")) { - throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace); - } - - try { - // pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char) - unescapedNamespaces[i++] = new Path(new URI(namespace)).toString(); - } catch (URISyntaxException e) { - throw new IllegalArgumentException(Property.INSTANCE_VOLUMES.getKey() + " contains " + namespace + " which has a syntax error", e); - } - } - - configuredBaseDirs = prefix(unescapedNamespaces, singleNamespace); - } - - return configuredBaseDirs; - } - // these are functions to delay loading the Accumulo configuration unless we must public static synchronized String[] getBaseDirs() { if (baseDirs == null) { - baseDirs = checkBaseDirs(getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()), false); + baseDirs = checkBaseDirs(VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()), false); } return baseDirs; @@ -168,34 +116,24 @@ public class ServerConstants { return baseDirsList.toArray(new String[baseDirsList.size()]); } - public static String[] prefix(String bases[], String suffix) { - if (suffix.startsWith("/")) - suffix = suffix.substring(1); - String result[] = new String[bases.length]; - for (int i = 0; i < bases.length; i++) { - result[i] = bases[i] + "/" + suffix; - } - return result; - } - public static final String TABLE_DIR = "tables"; public static final String RECOVERY_DIR = "recovery"; public static final String WAL_DIR = "wal"; public static String[] getTablesDirs() { - return prefix(getBaseDirs(), TABLE_DIR); + return VolumeConfiguration.prefix(getBaseDirs(), TABLE_DIR); } public static String[] getRecoveryDirs() { - return prefix(getBaseDirs(), RECOVERY_DIR); + return VolumeConfiguration.prefix(getBaseDirs(), RECOVERY_DIR); } public static String[] getWalDirs() { - return prefix(getBaseDirs(), WAL_DIR); + return VolumeConfiguration.prefix(getBaseDirs(), WAL_DIR); } public static String[] getWalogArchives() { - return prefix(getBaseDirs(), "walogArchive"); + return VolumeConfiguration.prefix(getBaseDirs(), "walogArchive"); } public static Path getInstanceIdLocation() { @@ -209,11 +147,11 @@ public class ServerConstants { } public static String[] getMetadataTableDirs() { - return prefix(getTablesDirs(), MetadataTable.ID); + return VolumeConfiguration.prefix(getTablesDirs(), MetadataTable.ID); } public static String[] getTemporaryDirs() { - return prefix(getBaseDirs(), "tmp"); + return VolumeConfiguration.prefix(getBaseDirs(), "tmp"); } public static synchronized List<Pair<Path,Path>> getVolumeReplacements() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index 7f59eae..dc9acf8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -51,7 +51,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.thrift.TKeyExtent; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.FileUtil; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; @@ -64,6 +63,7 @@ import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.util.FileUtil; import org.apache.accumulo.trace.instrument.TraceRunnable; import org.apache.accumulo.trace.instrument.Tracer; import org.apache.hadoop.conf.Configuration; @@ -368,8 +368,7 @@ public class BulkImporter { Map<KeyExtent,Long> estimatedSizes = null; try { - FileSystem fs = vm.getFileSystemByPath(entry.getKey()); - estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, fs); + estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, vm); } catch (IOException e) { log.warn("Failed to estimate map file sizes " + e.getMessage()); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 165c3b8..80301ef 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -34,7 +34,7 @@ import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -349,8 +349,8 @@ public class VolumeManagerImpl implements VolumeManager { public static VolumeManager get(AccumuloConfiguration conf) throws IOException { Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>(); Configuration hadoopConf = CachedConfiguration.getInstance(); - fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf)); - for (String space : ServerConstants.getConfiguredBaseDirs(conf)) { + fileSystems.put(DEFAULT, VolumeConfiguration.getDefaultFilesystem(hadoopConf, conf)); + for (String space : VolumeConfiguration.getConfiguredBaseDirs(conf)) { if (space.equals(DEFAULT)) throw new IllegalArgumentException(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 8533484..925f602 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.master.thrift.MasterGoalState; @@ -150,7 +151,7 @@ public class Initialize { else fsUri = FileSystem.getDefaultUri(conf).toString(); log.info("Hadoop Filesystem is " + fsUri); - log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()))); + log.info("Accumulo data dirs are " + Arrays.asList(VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()))); log.info("Zookeeper server is " + sconf.get(Property.INSTANCE_ZK_HOST)); log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running"); if (!zookeeperAvailable()) { @@ -172,7 +173,7 @@ public class Initialize { try { if (isInitialized(fs)) { String instanceDfsDir = sconf.get(Property.INSTANCE_DFS_DIR); - log.fatal("It appears the directories " + Arrays.asList(ServerConstants.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration())) + log.fatal("It appears the directories " + Arrays.asList(VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration())) + " were previously initialized."); String instanceVolumes = sconf.get(Property.INSTANCE_VOLUMES); String instanceDfsUri = sconf.get(Property.INSTANCE_DFS_URI); @@ -219,7 +220,7 @@ public class Initialize { UUID uuid = UUID.randomUUID(); // the actual disk locations of the root table and tablets - String[] configuredTableDirs = ServerConstants.prefix(ServerConstants.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()), + String[] configuredTableDirs = VolumeConfiguration.prefix(VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()), ServerConstants.TABLE_DIR); final Path rootTablet = new Path(fs.choose(configuredTableDirs) + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION); try { @@ -295,13 +296,13 @@ public class Initialize { private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, Path rootTablet) throws IOException { FileStatus fstat; - initDirs(fs, uuid, ServerConstants.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()), false); + initDirs(fs, uuid, VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()), false); // the actual disk locations of the metadata table and tablets final Path[] metadataTableDirs = paths(ServerConstants.getMetadataTableDirs()); - String tableMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR)); - String defaultMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION)); + String tableMetadataTabletDir = fs.choose(VolumeConfiguration.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR)); + String defaultMetadataTabletDir = fs.choose(VolumeConfiguration.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION)); // initialize initial metadata config in zookeeper initMetadataConfig(); @@ -552,7 +553,7 @@ public class Initialize { } public static boolean isInitialized(VolumeManager fs) throws IOException { - for (String baseDir : ServerConstants.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration())) { + for (String baseDir : VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration())) { if (fs.exists(new Path(baseDir, ServerConstants.INSTANCE_ID_DIR)) || fs.exists(new Path(baseDir, ServerConstants.VERSION_DIR))) return true; } @@ -563,10 +564,10 @@ public class Initialize { private static void addVolumes(VolumeManager fs) throws IOException { HashSet<String> initializedDirs = new HashSet<String>(); initializedDirs - .addAll(Arrays.asList(ServerConstants.checkBaseDirs(ServerConstants.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()), true))); + .addAll(Arrays.asList(ServerConstants.checkBaseDirs(VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()), true))); HashSet<String> uinitializedDirs = new HashSet<String>(); - uinitializedDirs.addAll(Arrays.asList(ServerConstants.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()))); + uinitializedDirs.addAll(Arrays.asList(VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration()))); uinitializedDirs.removeAll(initializedDirs); Path aBasePath = new Path(initializedDirs.iterator().next()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java index 4aa2869..942f866 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java @@ -33,6 +33,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; import org.apache.accumulo.core.util.Duration; import org.apache.accumulo.core.util.NumUtil; @@ -41,7 +42,6 @@ import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.ZooKeeperStatus; import org.apache.accumulo.monitor.ZooKeeperStatus.ZooKeeperState; import org.apache.accumulo.monitor.util.celltypes.NumberType; -import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; @@ -268,7 +268,7 @@ public class DefaultServlet extends BasicServlet { long totalHdfsBytesUsed = 0l; try { - for (String baseDir : ServerConstants.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration())) { + for (String baseDir : VolumeConfiguration.getConfiguredBaseDirs(ServerConfiguration.getSiteConfiguration())) { final Path basePath = new Path(baseDir); final FileSystem fs = vm.getFileSystemByPath(basePath); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java index 891bea4..e9f1083 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java @@ -50,7 +50,7 @@ public class BulkFailedCopyProcessor implements Processor { Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp"); try { - FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), + FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.VolumeConfiguration.getDefaultFilesystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration())); FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance()); @@ -58,7 +58,7 @@ public class BulkFailedCopyProcessor implements Processor { log.debug("copied " + orig + " to " + dest); } catch (IOException ex) { try { - FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), + FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.VolumeConfiguration.getDefaultFilesystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration())); fs.create(dest).close(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/a8058a69/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java index 5d68155..c8023c0 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java @@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.file.VolumeConfiguration; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.server.conf.ServerConfiguration; @@ -53,7 +53,7 @@ public class BulkFileIT extends SimpleMacIT { c.tableOperations().addSplits(tableName, splits); Configuration conf = new Configuration(); AccumuloConfiguration aconf = ServerConfiguration.getDefaultConfiguration(); - FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, aconf)); + FileSystem fs = TraceFileSystem.wrap(VolumeConfiguration.getDefaultFilesystem(conf, aconf)); String dir = rootPath() + "/bulk_test_diff_files_89723987592_" + getTableNames(1)[0];