This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
     new 46ee51d  Minor cleanup in BulkImport (#1769)
46ee51d is described below

commit 46ee51d0c6afe9910873f55653017b91c45a345a
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Wed Nov 4 16:35:25 2020 -0500

    Minor cleanup in BulkImport (#1769)

    * Remove exceptions from BulkImport and ConcurrentKeyExtentCache internal classes that were declared but never thrown
    * Remove other unused variables
    * Shorten a few lines that use lambdas
---
 .../accumulo/core/clientImpl/bulk/BulkImport.java  | 35 ++++++----------------
 .../clientImpl/bulk/ConcurrentKeyExtentCache.java  |  9 ++----
 .../bulk/ConcurrentKeyExtentCacheTest.java         | 13 ++------
 3 files changed, 14 insertions(+), 43 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index 1a60318..160db69 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.stream.Collectors.groupingBy;
+import static org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.pathToCacheId;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -75,7 +76,6 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
 import org.apache.accumulo.fate.util.Retry;
@@ -302,35 +302,25 @@ public class 
BulkImport implements ImportDestinationArguments, ImportMappingOpti } public interface KeyExtentCache { - KeyExtent lookup(Text row) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException; + KeyExtent lookup(Text row); } public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache, - FileSKVIterator reader) - throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { - - Text startRow = null; - Text endRow = null; + FileSKVIterator reader) throws IOException { List<KeyExtent> result = new ArrayList<>(); Collection<ByteSequence> columnFamilies = Collections.emptyList(); - Text row = startRow; - if (row == null) - row = new Text(); + Text row = new Text(); while (true) { - // log.debug(filename + " Seeking to row " + row); reader.seek(new Range(row, null), columnFamilies, false); if (!reader.hasTop()) { - // log.debug(filename + " not found"); break; } row = reader.getTopKey().getRow(); KeyExtent extent = extentCache.lookup(row); - // log.debug(filename + " found row " + row + " at location " + tabletLocation); result.add(extent); row = extent.endRow(); - if (row != null && (endRow == null || row.compareTo(endRow) < 0)) { + if (row != null) { row = nextRow(row); } else break; @@ -347,8 +337,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti public static List<KeyExtent> findOverlappingTablets(ClientContext context, KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache, - CryptoService cs) - throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + CryptoService cs) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() .forFile(file.toString(), fs, fs.getConf(), cs) .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) @@ -361,7 +350,6 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti 
HashMap<String,Long> fileLens = new HashMap<>(); for (FileStatus status : statuses) { fileLens.put(status.getPath().getName(), status.getLen()); - status.getLen(); } return fileLens; @@ -371,9 +359,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti Map<String,Long> fileLens = getFileLenMap(statuses); Map<String,Long> absFileLens = new HashMap<>(); - fileLens.forEach((k, v) -> { - absFileLens.put(CachableBlockFile.pathToCacheId(new Path(dir, k)), v); - }); + fileLens.forEach((k, v) -> absFileLens.put(pathToCacheId(new Path(dir, k)), v)); Cache<String,Long> fileLenCache = CacheBuilder.newBuilder().build(); @@ -440,8 +426,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } private Set<KeyExtent> mapDestinationsToExtents(TableId tableId, KeyExtentCache kec, - List<Destination> destinations) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + List<Destination> destinations) { Set<KeyExtent> extents = new HashSet<>(); for (Destination dest : destinations) { @@ -570,9 +555,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti for (CompletableFuture<Map<KeyExtent,Bulk.FileInfo>> future : futures) { try { Map<KeyExtent,Bulk.FileInfo> pathMapping = future.get(); - pathMapping.forEach((extent, path) -> { - mappings.computeIfAbsent(extent, k -> new Bulk.Files()).add(path); - }); + pathMapping.forEach((ext, fi) -> mappings.computeIfAbsent(ext, k -> new Files()).add(fi)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java index c31695c..901c672 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java @@ -31,9 +31,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import java.util.stream.Stream; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.bulk.BulkImport.KeyExtentCache; import org.apache.accumulo.core.data.TableId; @@ -85,15 +82,13 @@ class ConcurrentKeyExtentCache implements KeyExtentCache { } @VisibleForTesting - protected Stream<KeyExtent> lookupExtents(Text row) - throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + protected Stream<KeyExtent> lookupExtents(Text row) { return TabletsMetadata.builder().forTable(tableId).overlapping(row, null).checkConsistency() .fetch(PREV_ROW).build(ctx).stream().limit(100).map(TabletMetadata::getExtent); } @Override - public KeyExtent lookup(Text row) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + public KeyExtent lookup(Text row) { while (true) { KeyExtent ke = getFromCache(row); if (ke != null) diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCacheTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCacheTest.java index c051499..c17298c 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCacheTest.java @@ -31,9 +31,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import 
org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.hadoop.io.Text; @@ -92,13 +89,9 @@ public class ConcurrentKeyExtentCacheTest { } private void testLookup(TestCache tc, Text lookupRow) { - try { - KeyExtent extent = tc.lookup(lookupRow); - assertTrue(extent.contains(lookupRow)); - assertTrue(extentsSet.contains(extent)); - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { - throw new RuntimeException(e); - } + KeyExtent extent = tc.lookup(lookupRow); + assertTrue(extent.contains(lookupRow)); + assertTrue(extentsSet.contains(extent)); } @Test