Updated Branches: refs/heads/1.4.5-SNAPSHOT 5c50e42be -> 5fdecd72e refs/heads/1.5.1-SNAPSHOT ea41264a6 -> d183132c4 refs/heads/1.6.0-SNAPSHOT 82437b1db -> 592d7d62a refs/heads/master e1ed7a44a -> 9338b522f
ACCUMULO-2195 fixed bulk import w/ viewfs Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/82437b1d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/82437b1d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/82437b1d Branch: refs/heads/master Commit: 82437b1dbf070cddb11d410a33c0154f1d700c05 Parents: 1496c5f Author: Keith Turner <ktur...@apache.org> Authored: Wed Jan 15 15:40:15 2014 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Wed Jan 15 15:48:07 2014 -0500 ---------------------------------------------------------------------- .../apache/accumulo/server/fs/ViewFSUtils.java | 59 ++++++++++- .../accumulo/server/fs/VolumeManagerImpl.java | 8 ++ .../accumulo/server/util/MetadataTableUtil.java | 10 +- .../accumulo/server/fs/ViewFSUtilsTest.java | 100 +++++++++++++++++++ .../accumulo/master/tableOps/BulkImport.java | 2 +- 5 files changed, 168 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java b/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java index ae7a8ae..34912f3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -27,8 +28,24 @@ import org.apache.hadoop.fs.Path; * */ public class ViewFSUtils { + + public static final String 
VIEWFS_CLASSNAME = "org.apache.hadoop.fs.viewfs.ViewFileSystem"; + + public static boolean isViewFSSupported() { + try { + Class.forName(VIEWFS_CLASSNAME); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + + public static boolean isViewFS(Path source, Configuration conf) throws IOException { + return isViewFS(source.getFileSystem(conf)); + } + public static boolean isViewFS(FileSystem fs) { - return fs.getClass().getName().equals("org.apache.hadoop.fs.viewfs.ViewFileSystem"); + return fs.getClass().getName().equals(VIEWFS_CLASSNAME); } public static Path resolvePath(FileSystem fs, Path path) throws IOException { @@ -48,4 +65,44 @@ public class ViewFSUtils { throw new IOException(e); } } + + public static Path matchingFileSystem(Path source, String[] options, Configuration conf) throws IOException { + + if (!isViewFS(source, conf)) + throw new IllegalArgumentException("source " + source + " is not view fs"); + + String sourceUriPath = source.toUri().getPath(); + + Path match = null; + int matchPrefixLen = 0; + + // find the option with the longest common path prefix + for (String option : options) { + Path optionPath = new Path(option); + if (isViewFS(optionPath, conf)) { + String optionUriPath = optionPath.toUri().getPath(); + + int commonPrefixLen = 0; + for (int i = 0; i < Math.min(sourceUriPath.length(), optionUriPath.length()); i++) { + if (sourceUriPath.charAt(i) == optionUriPath.charAt(i)) { + if (sourceUriPath.charAt(i) == '/') + commonPrefixLen++; + } else { + break; + } + } + + if (commonPrefixLen > matchPrefixLen) { + matchPrefixLen = commonPrefixLen; + match = optionPath; + } else if (match != null && commonPrefixLen == matchPrefixLen && optionPath.depth() < match.depth()) { + // take path with less depth when match prefix length is the same + match = optionPath; + } + } + } + + return match; + } + }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 3e9cc26..eb7a330 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -439,6 +439,14 @@ public class VolumeManagerImpl implements VolumeManager { @Override public Path matchingFileSystem(Path source, String[] options) { + try { + if (ViewFSUtils.isViewFS(source, CachedConfiguration.getInstance())) { + return ViewFSUtils.matchingFileSystem(source, options, CachedConfiguration.getInstance()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + URI uri1 = source.toUri(); for (String option : options) { URI uri3 = URI.create(option); http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index 26444e0..8a2fe3b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -77,7 +77,6 @@ import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.fs.VolumeManager.FileType; import 
org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.zookeeper.ZooLock; @@ -271,14 +270,7 @@ public class MetadataTableUtil { } public static Mutation createDeleteMutation(String tableId, String pathToRemove) throws IOException { - if (!pathToRemove.contains(":")) { - if (pathToRemove.startsWith("../")) - pathToRemove = pathToRemove.substring(2); - else - pathToRemove = "/" + tableId + pathToRemove; - } - - Path path = VolumeManagerImpl.get().getFullPath(FileType.TABLE, pathToRemove); + Path path = VolumeManagerImpl.get().getFullPath(tableId, pathToRemove); Mutation delFlag = new Mutation(new Text(MetadataSchema.DeletesSection.getRowPrefix() + path.toString())); delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {})); return delFlag; http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/base/src/test/java/org/apache/accumulo/server/fs/ViewFSUtilsTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/ViewFSUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/ViewFSUtilsTest.java new file mode 100644 index 0000000..52c70c0 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/fs/ViewFSUtilsTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.fs; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class ViewFSUtilsTest { + + private String[] shuffle(String ... inputs){ + // code below will modify array + Collections.shuffle(Arrays.asList(inputs)); + return inputs; + } + + @Test + public void testDisjointMountPoints() throws IllegalArgumentException, IOException { + if (ViewFSUtils.isViewFSSupported()) { + Configuration conf = new Configuration(false); + conf.set("fs.viewfs.mounttable.default.link./ns", "file:///tmp/ns"); + conf.set("fs.viewfs.mounttable.default.link./ns1", "file:///tmp/ns1"); + conf.set("fs.viewfs.mounttable.default.link./ns2", "file:///tmp/ns2"); + conf.set("fs.viewfs.mounttable.default.link./ns22", "file:///tmp/ns22"); + + String[] tablesDirs1 = shuffle("viewfs:///ns1/accumulo/tables", "viewfs:///ns2/accumulo/tables", "viewfs:///ns22/accumulo/tables", + "viewfs:///ns/accumulo/tables"); + String[] tablesDirs2 = shuffle("viewfs:/ns1/accumulo/tables", "viewfs:/ns2/accumulo/tables", "viewfs:/ns22/accumulo/tables", + "viewfs:/ns/accumulo/tables"); + + for (String ns : Arrays.asList("ns1", "ns2", "ns22", "ns")) { + Path match = ViewFSUtils.matchingFileSystem(new Path("viewfs:/" + ns + "/bulk_import_01"), tablesDirs2, conf); + Assert.assertEquals(new Path("viewfs:/" + ns + "/accumulo/tables"), match); + + match = ViewFSUtils.matchingFileSystem(new 
Path("viewfs:///" + ns + "/bulk_import_01"), tablesDirs1, conf); + Assert.assertEquals(new Path("viewfs:/" + ns + "/accumulo/tables"), match); + + match = ViewFSUtils.matchingFileSystem(new Path("viewfs:/" + ns + "/bulk_import_01"), tablesDirs2, conf); + Assert.assertEquals(new Path("viewfs:/" + ns + "/accumulo/tables"), match); + + match = ViewFSUtils.matchingFileSystem(new Path("viewfs:///" + ns + "/bulk_import_01"), tablesDirs1, conf); + Assert.assertEquals(new Path("viewfs:/" + ns + "/accumulo/tables"), match); + } + } + } + + @Test + public void testOverlappingMountPoints() throws IllegalArgumentException, IOException { + if (ViewFSUtils.isViewFSSupported()) { + Configuration conf = new Configuration(false); + conf.set("fs.viewfs.mounttable.default.link./", "file:///tmp/0"); + conf.set("fs.viewfs.mounttable.default.link./ns1", "file:///tmp/1"); + conf.set("fs.viewfs.mounttable.default.link./ns1/A", "file:///tmp/2"); + conf.set("fs.viewfs.mounttable.default.link./ns1/AA", "file:///tmp/3"); + conf.set("fs.viewfs.mounttable.default.link./ns1/C", "file:///tmp/3"); + conf.set("fs.viewfs.mounttable.default.link./ns2", "file:///tmp/3"); + + String[] tablesDirs1 = shuffle("viewfs:///ns1/accumulo/tables", "viewfs:///ns1/A/accumulo/tables", "viewfs:///ns1/AA/accumulo/tables", + "viewfs:///ns1/C/accumulo/tables", "viewfs:///ns2/accumulo/tables", "viewfs:///accumulo/tables"); + String[] tablesDirs2 = shuffle("viewfs:/ns1/accumulo/tables", "viewfs:/ns1/A/accumulo/tables", "viewfs:/ns1/AA/accumulo/tables", + "viewfs:/ns1/C/accumulo/tables", "viewfs:/ns2/accumulo/tables", "viewfs:/accumulo/tables"); + + for (String ns : Arrays.asList("", "/ns1", "/ns1/A", "/ns1/AA", "/ns1/C", "/ns2")) { + Path match = ViewFSUtils.matchingFileSystem(new Path("viewfs:" + ns + "/bulk_import_01"), tablesDirs2, conf); + Assert.assertEquals(new Path("viewfs:" + ns + "/accumulo/tables"), match); + + match = ViewFSUtils.matchingFileSystem(new Path("viewfs://" + ns + "/bulk_import_01"), tablesDirs1, 
conf); + Assert.assertEquals(new Path("viewfs:" + ns + "/accumulo/tables"), match); + + match = ViewFSUtils.matchingFileSystem(new Path("viewfs:" + ns + "/bulk_import_01"), tablesDirs2, conf); + Assert.assertEquals(new Path("viewfs:" + ns + "/accumulo/tables"), match); + + match = ViewFSUtils.matchingFileSystem(new Path("viewfs://" + ns + "/bulk_import_01"), tablesDirs1, conf); + Assert.assertEquals(new Path("viewfs:" + ns + "/accumulo/tables"), match); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java index 5b85a14..1388c70 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java @@ -305,7 +305,7 @@ class CleanUpBulkImport extends MasterRepo { log.debug("removing the bulk processing flag file in " + bulk); Path bulkDir = new Path(bulk); MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName()); - MetadataTableUtil.addDeleteEntry(tableId, "/" + bulkDir.getName()); + MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString()); log.debug("removing the metadata table markers for loaded files"); Connector conn = master.getConnector(); MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);