Updated Branches: refs/heads/master d82888686 -> 069fb1afd
ACCUMULO-1765 made bulk import work w/ multiple namenodes Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/069fb1af Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/069fb1af Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/069fb1af Branch: refs/heads/master Commit: 069fb1afd18a86631a82ce89dca9c5ca06a9646e Parents: d828886 Author: Keith Turner <ktur...@apache.org> Authored: Wed Oct 23 23:05:20 2013 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Wed Oct 23 23:05:34 2013 -0400 ---------------------------------------------------------------------- .../core/client/admin/TableOperationsImpl.java | 53 ++++++++++++++------ .../shell/commands/ImportDirectoryCommand.java | 17 +------ .../accumulo/server/client/BulkImporter.java | 32 ++++++------ .../accumulo/server/fs/VolumeManager.java | 2 - .../accumulo/server/fs/VolumeManagerImpl.java | 21 +++----- .../server/master/tableOps/BulkImport.java | 15 ++---- .../server/client/BulkImporterTest.java | 7 ++- .../tabletserver/TabletServerSyncCheckTest.java | 5 -- .../apache/accumulo/test/AuditMessageTest.java | 6 ++- 9 files changed, 77 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java index 8f14fba..d9319ff 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java @@ -1102,24 +1102,43 @@ public class TableOperationsImpl extends TableOperationsHelper { return ranges; } + private Path checkPath(String dir, String kind, String type) throws IOException, AccumuloException { + Path ret; + FileSystem fs; + + if (dir.contains(":")) { + ret = new Path(dir); + fs = ret.getFileSystem(CachedConfiguration.getInstance()); + } else { + fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration()); + ret = fs.makeQualified(new Path(dir)); + } + + if (!fs.exists(ret)) + throw new AccumuloException(kind + " import " + type + " directory " + dir + " does not exist!"); + + if (!fs.getFileStatus(ret).isDir()) { + throw new AccumuloException(kind + " import " + type + " directory " + dir + " is not a directory!"); + } + + if (type.equals("failure")) { + FileStatus[] listStatus = fs.listStatus(ret); + if (listStatus != null && listStatus.length != 0) { + throw new AccumuloException("Bulk import failure directory " + ret + " is not empty"); + } + } + + return ret; + } + @Override public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloSecurityException, TableNotFoundException, AccumuloException { ArgumentChecker.notNull(tableName, dir, failureDir); - FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration()); - Path dirPath = fs.makeQualified(new Path(dir)); - Path failPath = fs.makeQualified(new Path(failureDir)); - if (!fs.exists(dirPath)) - throw new AccumuloException("Bulk import directory " + dir + " does not exist!"); - if (!fs.exists(failPath)) - throw new AccumuloException("Bulk import failure directory " + failureDir + " does not exist!"); - FileStatus[] listStatus = fs.listStatus(failPath); - if (listStatus != null && listStatus.length != 0) { - if (listStatus.length == 1 && listStatus[0].isDir()) - throw new AccumuloException("Bulk import directory " + failPath + " is a file"); - throw new AccumuloException("Bulk import failure directory " + failPath + " is not empty"); - } + Path dirPath = checkPath(dir, "Bulk", ""); + Path failPath = checkPath(failureDir, "Bulk", "failure"); + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(dirPath.toString().getBytes()), ByteBuffer.wrap(failPath.toString().getBytes()), ByteBuffer.wrap((setTime + "").getBytes())); Map<String,String> opts = new HashMap<String,String>(); @@ -1418,7 +1437,13 @@ public class TableOperationsImpl extends TableOperationsHelper { ArgumentChecker.notNull(tableName, importDir); try { - FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration()); + importDir = checkPath(importDir, "Table", "").toString(); + } catch (IOException e) { + throw new AccumuloException(e); + } + + try { + FileSystem fs = new Path(importDir).getFileSystem(CachedConfiguration.getInstance()); Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE)); for (String propKey : props.keySet()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ImportDirectoryCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ImportDirectoryCommand.java b/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ImportDirectoryCommand.java index 84d2095..13db3a4 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ImportDirectoryCommand.java +++ b/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ImportDirectoryCommand.java @@ -16,19 +16,14 @@ */ package org.apache.accumulo.core.util.shell.commands; -import java.io.FileNotFoundException; import java.io.IOException; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.shell.Shell; import org.apache.accumulo.core.util.shell.Shell.Command; import org.apache.commons.cli.CommandLine; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; public class ImportDirectoryCommand extends Command { @@ -45,17 +40,7 @@ public class ImportDirectoryCommand extends Command { String dir = cl.getArgs()[0]; String failureDir = cl.getArgs()[1]; final boolean setTime = Boolean.parseBoolean(cl.getArgs()[2]); - - final FileSystem fs = FileSystem.get(CachedConfiguration.getInstance()); - FileStatus failStatus = null; - try { - failStatus = fs.getFileStatus(new Path(failureDir)); - } catch (FileNotFoundException ex) { - // ignored - } - if (failStatus == null || !failStatus.isDir() || fs.listStatus(new Path(failureDir)).length != 0) { - throw new AccumuloException(failureDir + " is not an empty directory"); - } + shellState.getConnector().tableOperations().importDirectory(shellState.getTableName(), dir, failureDir, setTime); return 0; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index a04765f..606941d 100644 --- a/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java +++ b/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -60,6 +60,8 @@ import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.StopWatch; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.trace.instrument.TraceRunnable; import org.apache.accumulo.trace.instrument.Tracer; import org.apache.hadoop.conf.Configuration; @@ -115,8 +117,9 @@ public class BulkImporter { timer.start(Timers.TOTAL); Configuration conf = CachedConfiguration.getInstance(); - final FileSystem fs = FileSystem.get(conf); - + VolumeManagerImpl.get(acuConf); + final VolumeManager fs = VolumeManagerImpl.get(acuConf); + Set<Path> paths = new HashSet<Path>(); for (String file : files) { paths.add(new Path(file)); @@ -125,11 +128,6 @@ public class BulkImporter { final Map<Path,List<KeyExtent>> completeFailures = Collections.synchronizedSortedMap(new TreeMap<Path,List<KeyExtent>>()); - if (!fs.exists(failureDir)) { - log.error(failureDir + " does not exist"); - throw new RuntimeException("Directory does not exist " + failureDir); - } - ClientService.Client client = null; final TabletLocator locator = TabletLocator.getLocator(instance, new Text(tableId)); @@ -256,7 +254,7 @@ public class BulkImporter { } } assignmentStats.assignmentsAbandoned(completeFailures); - Set<Path> failedFailures = processFailures(conf, fs, failureDir, completeFailures); + Set<Path> failedFailures = processFailures(completeFailures); assignmentStats.unrecoveredMapFiles(failedFailures); timer.stop(Timers.TOTAL); @@ -292,7 +290,7 @@ public class BulkImporter { log.debug(String.format("Total : %,10.2f secs", timer.getSecs(Timers.TOTAL))); } - private Set<Path> processFailures(Configuration conf, FileSystem fs, Path failureDir, Map<Path,List<KeyExtent>> completeFailures) { + private Set<Path> processFailures(Map<Path,List<KeyExtent>> completeFailures) { // we should check if map file was not assigned to any tablets, then we // should just move it; not currently being done? @@ -330,7 +328,7 @@ public class BulkImporter { return result; } - private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final FileSystem fs, + private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final VolumeManager vm, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) { long t1 = System.currentTimeMillis(); @@ -338,6 +336,7 @@ public class BulkImporter { try { for (Path path : paths) { + FileSystem fs = vm.getFileSystemByPath(path); mapFileSizes.put(path, fs.getContentSummary(path).getLength()); } } catch (IOException e) { @@ -366,6 +365,7 @@ public class BulkImporter { Map<KeyExtent,Long> estimatedSizes = null; try { + FileSystem fs = vm.getFileSystemByPath(entry.getKey()); estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, fs); } catch (IOException e) { log.warn("Failed to estimate map file sizes " + e.getMessage()); @@ -420,7 +420,7 @@ public class BulkImporter { } private Map<Path,List<KeyExtent>> assignMapFiles(AccumuloConfiguration acuConf, Instance instance, Configuration conf, Credentials credentials, - FileSystem fs, String tableId, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) { + VolumeManager fs, String tableId, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) { timer.start(Timers.EXAMINE_MAP_FILES); Map<Path,List<AssignmentInfo>> assignInfo = estimateSizes(acuConf, conf, fs, assignments, paths, numMapThreads); timer.stop(Timers.EXAMINE_MAP_FILES); @@ -594,7 +594,7 @@ public class BulkImporter { for (PathSize pathSize : entry.getValue()) { org.apache.accumulo.core.data.thrift.MapFileInfo mfi = new org.apache.accumulo.core.data.thrift.MapFileInfo(pathSize.estSize); - tabletFiles.put(pathSize.path.toUri().getPath().toString(), mfi); + tabletFiles.put(pathSize.path.toString(), mfi); } } @@ -614,12 +614,13 @@ public class BulkImporter { } } - public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, + public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file, Credentials credentials) throws Exception { return findOverlappingTablets(acuConf, fs, locator, file, null, null, credentials); } - public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, KeyExtent failed, + public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file, + KeyExtent failed, Credentials credentials) throws Exception { locator.invalidateCache(failed); Text start = failed.getPrevEndRow(); @@ -630,12 +631,13 @@ public class BulkImporter { final static byte[] byte0 = {0}; - public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, Text startRow, + public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager vm, TabletLocator locator, Path file, Text startRow, Text endRow, Credentials credentials) throws Exception { List<TabletLocation> result = new ArrayList<TabletLocation>(); Collection<ByteSequence> columnFamilies = Collections.emptyList(); String filename = file.toString(); // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow); + FileSystem fs = vm.getFileSystemByPath(file); FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf); try { Text row = startRow; http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index 133513f..b7787c9 100644 --- a/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -92,8 +92,6 @@ public interface VolumeManager { // return the item in options that is in the same volume as source Path matchingFileSystem(Path source, String[] options); - // create a new path in the same volume as the sourceDir - String newPathOnSameVolume(String sourceDir, String suffix); // forward to the appropriate FileSystem object FileStatus[] listStatus(Path path) throws IOException; http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index b71822b..6c1d956 100644 --- a/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -19,6 +19,7 @@ package org.apache.accumulo.server.fs; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Method; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -396,9 +397,13 @@ public class VolumeManagerImpl implements VolumeManager { @Override public Path matchingFileSystem(Path source, String[] options) { - for (String fs : getFileSystems().keySet()) { - for (String option : options) { - if (option.startsWith(fs)) + URI uri1 = source.toUri(); + for (String option : options) { + URI uri3 = URI.create(option); + if (uri1.getScheme().equals(uri3.getScheme())) { + String a1 = uri1.getAuthority(); + String a2 = uri3.getAuthority(); + if (a1 == a2 || (a1 != null && a1.equals(a2))) return new Path(option); } } @@ -406,16 +411,6 @@ public class VolumeManagerImpl implements VolumeManager { } @Override - public String newPathOnSameVolume(String sourceDir, String suffix) { - for (String fs : getFileSystems().keySet()) { - if (sourceDir.startsWith(fs)) { - return fs + "/" + suffix; - } - } - return null; - } - - @Override public Path getFullPath(String tableId, String path) { if (path.contains(":")) return new Path(path); http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java b/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java index 8839304..e4d707d 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java +++ b/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java @@ -182,18 +182,9 @@ public class BulkImport extends MasterRepo { } private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException { - String tableDir = null; - loop: for (String dir : fs.getFileSystems().keySet()) { - if (this.sourceDir.startsWith(dir)) { - for (String path : ServerConstants.getTablesDirs()) { - if (path.startsWith(dir)) { - tableDir = path; - break loop; - } - } - break; - } - } + + String tableDir = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs()).toString(); + if (tableDir == null) throw new IllegalStateException(sourceDir + " is not in a known namespace"); Path directory = new Path(tableDir + "/" + tableId); http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java index 80b3eda..3680341 100644 --- a/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java +++ b/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java @@ -38,6 +38,8 @@ import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -131,7 +133,8 @@ public class BulkImporterTest { writer.append(new Key("iterator", "cf", "cq5"), empty); writer.append(new Key("xyzzy", "cf", "cq"), empty); writer.close(); - List<TabletLocation> overlaps = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), credentials); + VolumeManager vm = VolumeManagerImpl.get(acuConf); + List<TabletLocation> overlaps = BulkImporter.findOverlappingTablets(acuConf, vm, locator, new Path(file), credentials); Assert.assertEquals(5, overlaps.size()); Collections.sort(overlaps); Assert.assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent); @@ -140,7 +143,7 @@ public class BulkImporterTest { Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps.get(3).tablet_extent); Assert.assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent); - List<TabletLocation> overlaps2 = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text( + List<TabletLocation> overlaps2 = BulkImporter.findOverlappingTablets(acuConf, vm, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text( "b")), credentials); Assert.assertEquals(3, overlaps2.size()); Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent); http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/server/src/test/java/org/apache/accumulo/server/tabletserver/TabletServerSyncCheckTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/TabletServerSyncCheckTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/TabletServerSyncCheckTest.java index d7cf20e..9450364 100644 --- a/server/src/test/java/org/apache/accumulo/server/tabletserver/TabletServerSyncCheckTest.java +++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/TabletServerSyncCheckTest.java @@ -163,11 +163,6 @@ public class TabletServerSyncCheckTest { } @Override - public String newPathOnSameVolume(String sourceDir, String suffix) { - return null; - } - - @Override public FileStatus[] listStatus(Path path) throws IOException { return null; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/069fb1af/test/src/test/java/org/apache/accumulo/test/AuditMessageTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/AuditMessageTest.java b/test/src/test/java/org/apache/accumulo/test/AuditMessageTest.java index b9e72dc..5cdf480 100644 --- a/test/src/test/java/org/apache/accumulo/test/AuditMessageTest.java +++ b/test/src/test/java/org/apache/accumulo/test/AuditMessageTest.java @@ -327,8 +327,10 @@ public class AuditMessageTest { .size()); assertEquals(1, findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_EXPORT_AUDIT_TEMPLATE, OLD_TEST_TABLE_NAME, exportDir.toString())).size()); - assertEquals(1, - findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_IMPORT_AUDIT_TEMPLATE, NEW_TEST_TABLE_NAME, exportDir.toString())).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + String.format(AuditedSecurityOperation.CAN_IMPORT_AUDIT_TEMPLATE, NEW_TEST_TABLE_NAME, filePrefix + exportDir.toString())).size()); assertEquals(1, findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_CREATE_TABLE_AUDIT_TEMPLATE, THIRD_TEST_TABLE_NAME)).size()); assertEquals( 1,