ACCUMULO-2854 Added additional check to prevent NPE

Signed-off-by: Bill Havanki <bhava...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/079ef51c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/079ef51c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/079ef51c

Branch: refs/heads/master
Commit: 079ef51c7c254f1f7bd7bd4d83ea405ae635b433
Parents: 5c409b0
Author: Jeffrey S. Schwartz <j...@schwartech.com>
Authored: Thu Jun 12 20:07:54 2014 -0400
Committer: Bill Havanki <bhava...@cloudera.com>
Committed: Wed Jun 18 17:07:38 2014 -0400

----------------------------------------------------------------------
 .../accumulo/master/tableOps/BulkImport.java | 150 ++++++++++---------
 1 file changed, 76 insertions(+), 74 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/079ef51c/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index bdc89dd..e42fee6 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -106,28 +106,28 @@ import org.apache.thrift.TException;
 
 public class BulkImport extends MasterRepo {
   public static final String FAILURES_TXT = "failures.txt";
-
+
   private static final long serialVersionUID = 1L;
-
+
   private static final Logger log = Logger.getLogger(BulkImport.class);
-
+
   private String tableId;
   private String sourceDir;
   private String errorDir;
   private boolean setTime;
-
+
   public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
     this.tableId = tableId;
     this.sourceDir = sourceDir;
     this.errorDir = errorDir;
     this.setTime = setTime;
   }
-
+
   @Override
   public long isReady(long tid, Master master) throws Exception {
     if (!Utils.getReadLock(tableId, tid).tryLock())
       return 100;
-
+
     Instance instance = HdfsZooInstance.getInstance();
     Tables.clearCache(instance);
     if (Tables.getTableState(instance, tableId) == TableState.ONLINE) {
@@ -140,18 +140,18 @@ public class BulkImport extends MasterRepo {
       throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
     }
   }
-
+
   @Override
   //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
   @SuppressWarnings("deprecation")
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.debug(" tid " + tid + " sourceDir " + sourceDir);
-
+
     Utils.getReadLock(tableId, tid).lock();
-
+
     // check that the error directory exists and is empty
     VolumeManager fs = master.getFileSystem();
-
+
     Path errorPath = new Path(errorDir);
     FileStatus errorStatus = null;
     try {
@@ -168,9 +168,9 @@ public class BulkImport extends MasterRepo {
     if (fs.listStatus(errorPath).length != 0)
       throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir + " is not empty");
-
+
     ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
-
+
     // move the files into the directory
     try {
       String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
@@ -182,24 +182,26 @@ public class BulkImport extends MasterRepo {
           + ex);
     }
   }
-
+
   private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
-
-    String tableDir = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs()).toString();
-
+    Path tempPath = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
+    if (tempPath == null)
+      throw new IllegalStateException(sourceDir + " is not in a known namespace");
+
+    String tableDir = tempPath.toString();
     if (tableDir == null)
       throw new IllegalStateException(sourceDir + " is not in a known namespace");
     Path directory = new Path(tableDir + "/" + tableId);
     fs.mkdirs(directory);
-
+
     // only one should be able to create the lock file
     // the purpose of the lock file is to avoid a race
     // condition between the call to fs.exists() and
     // fs.mkdirs()... if only hadoop had a mkdir() function
     // that failed when the dir existed
-
+
     UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-
+
     while (true) {
       Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
       if (fs.exists(newBulkDir)) // sanity check
@@ -207,7 +209,7 @@ public class BulkImport extends MasterRepo {
       if (fs.mkdirs(newBulkDir))
         return newBulkDir;
       log.warn("Failed to create " + newBulkDir + " for unknown reason");
-
+
       UtilWaitThread.sleep(3000);
     }
   }
@@ -216,20 +218,20 @@ public class BulkImport extends MasterRepo {
   @SuppressWarnings("deprecation")
   private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
     Path bulkDir = createNewBulkDir(fs, tableId);
-
+
     MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-
+
     Path dirPath = new Path(dir);
     FileStatus[] mapFiles = fs.listStatus(dirPath);
-
+
     UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-
+
     for (FileStatus fileStatus : mapFiles) {
       String sa[] = fileStatus.getPath().getName().split("\\.");
       String extension = "";
       if (sa.length > 1) {
         extension = sa[sa.length - 1];
-
+
         if (!FileOperations.getValidExtensions().contains(extension)) {
           log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
           continue;
@@ -238,13 +240,13 @@ public class BulkImport extends MasterRepo {
         // assume it is a map file
         extension = Constants.MAPFILE_EXTENSION;
       }
-
+
       if (extension.equals(Constants.MAPFILE_EXTENSION)) {
         if (!fileStatus.isDir()) {
           log.warn(fileStatus.getPath() + " is not a map file, ignoring");
           continue;
         }
-
+
         if (fileStatus.getPath().getName().equals("_logs")) {
           log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping");
           continue;
@@ -260,7 +262,7 @@ public class BulkImport extends MasterRepo {
           continue;
         }
       }
-
+
       String newName = "I" + namer.getNextName() + "." + extension;
       Path newPath = new Path(bulkDir, newName);
       try {
@@ -272,7 +274,7 @@ public class BulkImport extends MasterRepo {
     }
     return bulkDir.toString();
   }
-
+
   @Override
   public void undo(long tid, Master environment) throws Exception {
     // unreserve source/error directories
@@ -283,23 +285,23 @@ public class BulkImport extends MasterRepo {
 }
 
 class CleanUpBulkImport extends MasterRepo {
-
+
   private static final long serialVersionUID = 1L;
-
+
   private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
-
+
   private String tableId;
   private String source;
   private String bulk;
   private String error;
-
+
   public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
     this.tableId = tableId;
     this.source = source;
     this.bulk = bulk;
     this.error = error;
   }
-
+
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.debug("removing the bulk processing flag file in " + bulk);
@@ -320,21 +322,21 @@ class CleanUpBulkImport extends MasterRepo {
 }
 
 class CompleteBulkImport extends MasterRepo {
-
+
   private static final long serialVersionUID = 1L;
-
+
   private String tableId;
   private String source;
   private String bulk;
   private String error;
-
+
   public CompleteBulkImport(String tableId, String source, String bulk, String error) {
     this.tableId = tableId;
     this.source = source;
     this.bulk = bulk;
     this.error = error;
   }
-
+
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
@@ -343,21 +345,21 @@ class CompleteBulkImport extends MasterRepo {
 }
 
 class CopyFailed extends MasterRepo {
-
+
   private static final long serialVersionUID = 1L;
-
+
   private String tableId;
   private String source;
   private String bulk;
   private String error;
-
+
   public CopyFailed(String tableId, String source, String bulk, String error) {
     this.tableId = tableId;
     this.source = source;
     this.bulk = bulk;
     this.error = error;
   }
-
+
   @Override
   public long isReady(long tid, Master master) throws Exception {
     Set<TServerInstance> finished = new HashSet<TServerInstance>();
@@ -375,19 +377,19 @@ class CopyFailed extends MasterRepo {
       return 0;
     return 500;
   }
-
+
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     // This needs to execute after the arbiter is stopped
-
+
     VolumeManager fs = master.getFileSystem();
-
+
     if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
       return new CleanUpBulkImport(tableId, source, bulk, error);
-
+
     HashMap<String,String> failures = new HashMap<String,String>();
     HashMap<String,String> loadedFailures = new HashMap<String,String>();
-
+
     FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
     BufferedReader in = new BufferedReader(new InputStreamReader(failFile, Constants.UTF8));
     try {
@@ -400,18 +402,18 @@ class CopyFailed extends MasterRepo {
     } finally {
       failFile.close();
     }
-
+
     /*
     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
     * have no loaded markers.
     */
-
+
     // determine which failed files were loaded
     Connector conn = master.getConnector();
     Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
     mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
     mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
+
     for (Entry<Key,Value> entry : mscanner) {
       if (Long.parseLong(entry.getValue().toString()) == tid) {
         String loadedFile = entry.getKey().getColumnQualifier().toString();
@@ -421,7 +423,7 @@ class CopyFailed extends MasterRepo {
         }
       }
     }
-
+
     // move failed files that were not loaded
     for (String failure : failures.values()) {
       Path orig = new Path(failure);
@@ -429,47 +431,47 @@ class CopyFailed extends MasterRepo {
       fs.rename(orig, dest);
       log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
     }
-
+
     if (loadedFailures.size() > 0) {
       DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ);
-
+
       HashSet<String> workIds = new HashSet<String>();
-
+
       for (String failure : loadedFailures.values()) {
         Path orig = new Path(failure);
         Path dest = new Path(error, orig.getName());
-
+
         if (fs.exists(dest))
           continue;
-
+
         bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(Constants.UTF8));
         workIds.add(orig.getName());
         log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
       }
-
+
       bifCopyQueue.waitUntilDone(workIds);
     }
-
+
     fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
     return new CleanUpBulkImport(tableId, source, bulk, error);
   }
-
+
 }
 
 class LoadFiles extends MasterRepo {
-
+
   private static final long serialVersionUID = 1L;
-
+
   private static ExecutorService threadPool = null;
   private static final Logger log = Logger.getLogger(BulkImport.class);
-
+
   private String tableId;
   private String source;
   private String bulk;
   private String errorDir;
   private boolean setTime;
-
+
   public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
     this.tableId = tableId;
     this.source = source;
@@ -477,7 +479,7 @@ class LoadFiles extends MasterRepo {
     this.errorDir = errorDir;
     this.setTime = setTime;
   }
-
+
   @Override
   public long isReady(long tid, Master master) throws Exception {
     if (master.onlineTabletServers().size() == 0)
@@ -505,7 +507,7 @@ class LoadFiles extends MasterRepo {
       files.add(entry);
     }
     log.debug("tid " + tid + " importing " + files.size() + " files");
-
+
     Path writable = new Path(this.errorDir, ".iswritable");
     if (!fs.createNewFile(writable)) {
       // Maybe this is a re-try... clear the flag and try again
@@ -515,22 +517,22 @@ class LoadFiles extends MasterRepo {
           "Unable to write to " + this.errorDir);
     }
     fs.delete(writable);
-
+
     final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
     for (FileStatus f : files)
       filesToLoad.add(f.getPath().toString());
-
+
     final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
     for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
+
      if (master.onlineTabletServers().size() == 0)
        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
+
      while (master.onlineTabletServers().size() == 0) {
        UtilWaitThread.sleep(500);
      }
-
+
      // Use the threadpool to assign files one-at-a-time to the server
      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
      for (final String file : filesToLoad) {
@@ -575,7 +577,7 @@ class LoadFiles extends MasterRepo {
        UtilWaitThread.sleep(100);
      }
    }
-
+
    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, Constants.UTF8));
    try {
@@ -586,11 +588,11 @@ class LoadFiles extends MasterRepo {
    } finally {
      out.close();
    }
-
+
    // return the next step, which will perform cleanup
    return new CompleteBulkImport(tableId, source, bulk, errorDir);
  }
-
+
  static String sampleList(Collection<?> potentiallyLongList, int max) {
    StringBuffer result = new StringBuffer();
    result.append("[");
@@ -610,5 +612,5 @@ class LoadFiles extends MasterRepo {
    result.append("]");
    return result.toString();
  }
-
+
 }
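
For anyone skimming the diff: nearly every hunk above is whitespace cleanup, and the substantive change is the null guard added in createNewBulkDir(). A condensed sketch of the patched logic follows, taken from the @@ -182,24 +182,26 @@ hunk; it still relies on the Accumulo server classes (VolumeManager, ServerConstants, UniqueNameAllocator) and elides the trailing directory-allocation loop, so it is illustrative rather than standalone:

  // Capture the possibly-null result of matchingFileSystem() before dereferencing it.
  // Previously the code called .toString() on the result immediately, so the later null
  // check on tableDir could never run and a NullPointerException was thrown instead
  // (ACCUMULO-2854).
  private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
    Path tempPath = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
    if (tempPath == null)
      throw new IllegalStateException(sourceDir + " is not in a known namespace");

    String tableDir = tempPath.toString();
    Path directory = new Path(tableDir + "/" + tableId);
    fs.mkdirs(directory);

    // remainder unchanged from the patch: allocate a unique Constants.BULK_PREFIX name
    // under 'directory' via UniqueNameAllocator and return the new bulk directory
    ...
  }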