This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new e5106ef fixes #518 ignore non rfiles in new bulk import (#773) e5106ef is described below commit e5106efb9613468b46b4f94e549d81012631a34b Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Nov 20 08:14:57 2018 -0500 fixes #518 ignore non rfiles in new bulk import (#773) --- .../accumulo/core/clientImpl/BulkImport.java | 46 +++++++++++++++++++--- .../accumulo/test/functional/BulkLoadIT.java | 6 +++ 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java index 426cf0b..3d8da28 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java @@ -312,7 +312,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } } - private static Map<String,Long> getFileLenMap(FileStatus[] statuses) { + private static Map<String,Long> getFileLenMap(List<FileStatus> statuses) { HashMap<String,Long> fileLens = new HashMap<>(); for (FileStatus status : statuses) { fileLens.put(status.getPath().getName(), status.getLen()); @@ -322,7 +322,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti return fileLens; } - private static Cache<String,Long> getPopulatedFileLenCache(Path dir, FileStatus[] statuses) { + private static Cache<String,Long> getPopulatedFileLenCache(Path dir, List<FileStatus> statuses) { Map<String,Long> fileLens = getFileLenMap(statuses); Map<String,Long> absFileLens = new HashMap<>(); @@ -343,8 +343,8 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti Map<String,List<Destination>> fileDestinations = plan.getDestinations().stream() .collect(groupingBy(Destination::getFileName)); - FileStatus[] statuses = fs.listStatus(srcPath, - p -> 
!p.getName().equals(Constants.BULK_LOAD_MAPPING)); + List<FileStatus> statuses = filterInvalid( + fs.listStatus(srcPath, p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING))); Map<String,Long> fileLens = getFileLenMap(statuses); @@ -447,13 +447,47 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } + private static List<FileStatus> filterInvalid(FileStatus[] files) { + ArrayList<FileStatus> fileList = new ArrayList<>(files.length); + + for (FileStatus fileStatus : files) { + + String fname = fileStatus.getPath().getName(); + + if (fname.equals("_SUCCESS") || fname.equals("_logs")) { + log.debug("Ignoring file likely created by map reduce : {}", fileStatus.getPath()); + continue; + } + + if (fileStatus.isDirectory()) { + log.warn("{} is a directory, ignoring.", fileStatus.getPath()); + continue; + } + + String sa[] = fname.split("\\."); + String extension = ""; + if (sa.length > 1) { + extension = sa[sa.length - 1]; + } + + if (!FileOperations.getValidExtensions().contains(extension)) { + log.warn("{} does not have a valid extension, ignoring", fileStatus.getPath()); + continue; + } + + fileList.add(fileStatus); + } + + return fileList; + } + public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs, Table.ID tableId, Path dirPath, Executor executor, ClientContext context) throws IOException { KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context); - FileStatus[] files = fs.listStatus(dirPath, - p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING)); + List<FileStatus> files = filterInvalid( + fs.listStatus(dirPath, p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING))); // we know all of the file lens, so construct a cache and populate it in order to avoid later // trips to the namenode diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java index 34f2707..24fd475 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java @@ -59,6 +59,7 @@ import org.apache.accumulo.minicluster.MemoryUnit; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; @@ -231,6 +232,11 @@ public class BulkLoadIT extends AccumuloClusterHarness { hashes.put(endRow, new HashSet<>()); } + // Add a junk file, should be ignored + FSDataOutputStream out = fs.create(new Path(dir, "junk")); + out.writeChars("ABCDEFG\n"); + out.close(); + // 1 Tablet 0333-null String h1 = writeData(dir + "/f1.", aconf, 0, 333); hashes.get("0333").add(h1);