This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit f09116d04c870f5f298b251098e548ecf8341ea2
Merge: 121758a40e e4d83cca90
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Feb 26 14:13:05 2025 +0000

    Merge branch '3.1'

 .../core/metadata/schema/TabletMetadata.java       |   2 +-
 .../core/metadata/schema/TabletsMetadata.java      |   3 +-
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  93 ++++--
 .../manager/tableOps/bulkVer2/LoadFilesTest.java   | 312 +++++++++++++++++++++
 .../tableOps/bulkVer2/PrepBulkImportTest.java      |   4 +-
 5 files changed, 388 insertions(+), 26 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index ccf1f3a2b5,ba879d6ebd..36667c8c88
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@@ -22,11 -22,9 +22,12 @@@ import static org.apache.accumulo.core
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType.CURRENT;
 +
+ import java.time.Duration;
  import java.util.ArrayList;
 +import java.util.Collections;
  import java.util.Comparator;
  import java.util.HashMap;
  import java.util.Iterator;
@@@ -60,9 -58,13 +61,10 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 -import org.apache.accumulo.core.tabletingest.thrift.DataFileInfo;
 -import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService.Client;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
  import org.apache.accumulo.core.trace.TraceUtil;
 -import org.apache.accumulo.core.util.MapCounter;
  import org.apache.accumulo.core.util.PeekingIterator;
 -import org.apache.accumulo.core.util.TextUtil;
+ import org.apache.accumulo.core.util.Timer;
  import org.apache.accumulo.manager.Manager;
  import org.apache.accumulo.manager.tableOps.ManagerRepo;
  import org.apache.accumulo.server.fs.VolumeManager;
@@@ -93,10 -101,11 +102,11 @@@ class LoadFiles extends ManagerRepo
    }
  
    @Override
 -  public long isReady(long tid, Manager manager) throws Exception {
 -    log.info("Starting bulk import for {} (tid = {})", bulkInfo.sourceDir, FateTxId.formatTid(tid));
 +  public long isReady(FateId fateId, Manager manager) throws Exception {
++    log.info("Starting bulk import for {} (tid = {})", bulkInfo.sourceDir, fateId);
      if (manager.onlineTabletServers().isEmpty()) {
 -      log.warn("There are no tablet server to process bulkDir import, waiting (tid = "
 -          + FateTxId.formatTid(tid) + ")");
 +      log.warn("There are no tablet server to process bulkDir import, waiting (fateId = " + fateId
 +          + ")");
        return 100;
      }
      VolumeManager fs = manager.getVolumeManager();
@@@ -104,154 -113,118 +114,167 @@@
      manager.updateBulkImportStatus(bulkInfo.sourceDir, BulkImportState.LOADING);
      try (LoadMappingIterator lmi = BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(),
          bulkInfo.tableId, fs::open)) {
-       return loadFiles(bulkInfo.tableId, bulkDir, lmi, manager, fateId);
 +
 -      Loader loader;
 -      if (bulkInfo.tableState == TableState.ONLINE) {
 -        loader = new OnlineLoader();
 -      } else {
 -        loader = new OfflineLoader();
++      Loader loader = new Loader();
++
++      List<ColumnType> fetchCols = new ArrayList<>(List.of(PREV_ROW, LOCATION, LOADED, TIME));
++      if (loader.pauseLimit > 0) {
++        fetchCols.add(FILES);
+       }
+ 
+       TabletsMetadataFactory tmf = (startRow) -> TabletsMetadata.builder(manager.getContext())
+           .forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency()
 -          .fetch(PREV_ROW, LOCATION, LOADED).build();
++          .fetch(fetchCols.toArray(new ColumnType[0])).build();
+ 
 -      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, tid);
++      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, fateId);
      }
    }
  
    @Override
 -  public Repo<Manager> call(final long tid, final Manager manager) {
 -    if (bulkInfo.tableState == TableState.ONLINE) {
 -      return new CompleteBulkImport(bulkInfo);
 -    } else {
 -      return new CleanUpBulkImport(bulkInfo);
 -    }
 +  public Repo<Manager> call(final FateId fateId, final Manager manager) {
 +    return new RefreshTablets(bulkInfo);
    }
  
 -  private static class Loader {
 +  // visible for testing
-   public abstract static class Loader {
++  public static class Loader {
      protected Path bulkDir;
      protected Manager manager;
 -    protected long tid;
 +    protected FateId fateId;
      protected boolean setTime;
 +    Ample.ConditionalTabletsMutator conditionalMutator;
 +    private Map<KeyExtent,List<TabletFile>> loadingFiles;
  
 -    void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
 +    private long skipped = 0;
 +    private long pauseLimit;
 +
 +    void start(Path bulkDir, Manager manager, TableId tableId, FateId fateId, boolean setTime)
 +        throws Exception {
        this.bulkDir = bulkDir;
        this.manager = manager;
 -      this.tid = tid;
 +      this.fateId = fateId;
        this.setTime = setTime;
 +      conditionalMutator = manager.getContext().getAmple().conditionallyMutateTablets();
 +      this.skipped = 0;
 +      this.loadingFiles = new HashMap<>();
 +      this.pauseLimit =
 +          manager.getContext().getTableConfiguration(tableId).getCount(Property.TABLE_FILE_PAUSE);
      }
  
 -    abstract void load(List<TabletMetadata> tablets, Files files) throws Exception;
 +    void load(List<TabletMetadata> tablets, Files files) {
  
 -    abstract long finish() throws Exception;
 -  }
 +      Map<ReferencedTabletFile,Bulk.FileInfo> toLoad = new HashMap<>();
 +      for (var fileInfo : files) {
 +        toLoad.put(new ReferencedTabletFile(new Path(bulkDir, fileInfo.getFileName())), fileInfo);
 +      }
  
 -  private static class OnlineLoader extends Loader {
 +      // remove any tablets that already have loaded flags
 +      tablets = tablets.stream().filter(tabletMeta -> {
 +        Set<ReferencedTabletFile> loaded = tabletMeta.getLoaded().keySet().stream()
 +            .map(StoredTabletFile::getTabletFile).collect(Collectors.toSet());
 +        boolean containsAll = loaded.containsAll(toLoad.keySet());
 +        // The tablet should either contain all loaded files or none. It should never contain a
 +        // subset. Loaded files are written in single mutation to accumulo, either all changes in a
 +        // mutation should go through or none.
 +        Preconditions.checkState(containsAll || Collections.disjoint(loaded, toLoad.keySet()),
 +            "Tablet %s has a subset of loaded files %s %s", tabletMeta.getExtent(), loaded,
 +            toLoad.keySet());
 +        if (containsAll) {
 +          log.trace("{} tablet {} has already loaded all files, nothing to do", fateId,
 +              tabletMeta.getExtent());
 +        }
 +        return !containsAll;
 +      }).collect(Collectors.toList());
 +
 +      // timestamps from tablets that are hosted on a tablet server
 +      Map<KeyExtent,Long> hostedTimestamps;
 +      if (setTime) {
 +        hostedTimestamps = allocateTimestamps(tablets, toLoad.size());
 +        hostedTimestamps.forEach((e, t) -> {
 +          log.trace("{} allocated timestamp {} {}", fateId, e, t);
 +        });
 +      } else {
 +        hostedTimestamps = Map.of();
 +      }
  
 -    long timeInMillis;
 -    String fmtTid;
 -    int locationLess = 0;
 +      List<ColumnType> rsc = new ArrayList<>();
 +      if (setTime) {
 +        rsc.add(TIME);
 +      }
  
 -    // track how many tablets were sent load messages per tablet server
 -    MapCounter<HostAndPort> loadMsgs;
 +      ColumnType[] requireSameCols = rsc.toArray(new ColumnType[0]);
  
 -    // Each RPC to a tablet server needs to check in zookeeper to see if the transaction is still
 -    // active. The purpose of this map is to group load request by tablet servers inorder to do less
 -    // RPCs. Less RPCs will result in less calls to Zookeeper.
 -    Map<HostAndPort,Map<TKeyExtent,Map<String,DataFileInfo>>> loadQueue;
 -    private int queuedDataSize = 0;
 +      for (TabletMetadata tablet : tablets) {
 +        // Skip any tablets at the beginning of the loop before any work is done.
 +        if (setTime && tablet.getLocation() != null
 +            && !hostedTimestamps.containsKey(tablet.getExtent())) {
 +          skipped++;
 +          log.debug("{} tablet {} did not have a timestamp allocated, will retry later", fateId,
 +              tablet.getExtent());
 +          continue;
 +        }
 +        if (pauseLimit > 0 && tablet.getFiles().size() > pauseLimit) {
 +          skipped++;
 +          log.debug(
 +              "{} tablet {} has {} files which exceeds the pause limit of {}, not bulk importing and will retry later",
 +              fateId, tablet.getExtent(), tablet.getFiles().size(), pauseLimit);
 +          continue;
 +        }
  
 -    @Override
 -    void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
 -      super.start(bulkDir, manager, tid, setTime);
 +        Map<ReferencedTabletFile,DataFileValue> filesToLoad = new HashMap<>();
  
 -      timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
 -      fmtTid = FateTxId.formatTid(tid);
 +        var tabletTime = TabletTime.getInstance(tablet.getTime());
  
 -      loadMsgs = new MapCounter<>();
 +        Long fileTime = null;
 +        if (setTime) {
 +          if (tablet.getLocation() == null) {
 +            fileTime = tabletTime.getAndUpdateTime();
 +          } else {
 +            fileTime = hostedTimestamps.get(tablet.getExtent());
 +            tabletTime.updateTimeIfGreater(fileTime);
 +          }
 +        }
  
 -      loadQueue = new HashMap<>();
        }
  
 +        for (var entry : toLoad.entrySet()) {
 +          ReferencedTabletFile refTabFile = entry.getKey();
 +          Bulk.FileInfo fileInfo = entry.getValue();
  
 -    private void sendQueued(int threshhold) {
 -      if (queuedDataSize > threshhold || threshhold == 0) {
 -        loadQueue.forEach((server, tabletFiles) -> {
 +          DataFileValue dfv;
  
 -          if (log.isTraceEnabled()) {
 -            log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
 -                tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
 +          if (setTime) {
 +            // This should always be set outside the loop when setTime is true and should not be
 +            // null at this point
 +            Preconditions.checkState(fileTime != null);
 +            dfv =
 +                new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), fileTime);
 +          } else {
 +            dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries());
 +          }
  
 -          Client client = null;
 -          try {
 -            client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server,
 -                manager.getContext(), timeInMillis);
 -            client.loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
 -                bulkDir.toString(), tabletFiles, setTime);
 -          } catch (TException ex) {
 -            log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex);
 -          } finally {
 -            ThriftUtil.returnClient(client, manager.getContext());
 -          }
 -        });
 +          filesToLoad.put(refTabFile, dfv);
 +        }
  
 -        loadQueue.clear();
 -        queuedDataSize = 0;
 -      }
 -    }
 +        var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
 +            .requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet())
 +            .requireSame(tablet, LOCATION, requireSameCols);
 +
 +        if (pauseLimit > 0) {
 +          tabletMutator.requireLessOrEqualsFiles(pauseLimit);
 +        }
  
 -    private void addToQueue(HostAndPort server, KeyExtent extent,
 -        Map<String,DataFileInfo> thriftImports) {
 -      if (!thriftImports.isEmpty()) {
 -        loadMsgs.increment(server, 1);
 +        filesToLoad.forEach((f, v) -> {
 +          tabletMutator.putBulkFile(f, fateId);
 +          tabletMutator.putFile(f, v);
 +        });
  
 -        Map<String,DataFileInfo> prev = loadQueue.computeIfAbsent(server, k -> new HashMap<>())
 -            .putIfAbsent(extent.toThrift(), thriftImports);
 +        if (setTime) {
 +          tabletMutator.putTime(tabletTime.getMetadataTime());
 +        }
  
 -        Preconditions.checkState(prev == null, "Unexpectedly saw extent %s twice", extent);
 +        // Hang on to loaded files for logging purposes in the case where the update is success.
 +        Preconditions.checkState(
 +            loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null);
  
 -        // keep a very rough estimate of how much is memory so we can send if over a few megs is
 -        // buffered
 -        queuedDataSize += thriftImports.keySet().stream().mapToInt(String::length).sum()
 -            + server.getHost().length() + 4 + thriftImports.size() * 32;
 +        tabletMutator.submit(tm -> false, () -> "bulk load files " + fateId);
        }
      }
@@@ -350,35 -353,25 +383,28 @@@
     * scan the metadata table getting Tablet range and location information. It will return 0 when
     * all files have been loaded.
     */
-   private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMapIter,
-       Manager manager, FateId fateId) throws Exception {
+   // visible for testing
+   static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir,
 -      LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, Manager manager, long tid)
 -      throws Exception {
++      LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, Manager manager,
++      FateId fateId) throws Exception {
      PeekingIterator<Map.Entry<KeyExtent,Bulk.Files>> lmi = new PeekingIterator<>(loadMapIter);
      Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.peek();
      Text startRow = loadMapEntry.getKey().prevEndRow();
  
 -    String fmtTid = FateTxId.formatTid(tid);
 +    String fmtTid = fateId.getTxUUIDStr();
      log.trace("{}: Starting bulk load at row: {}", fmtTid, startRow);
  
-     Loader loader = new Loader();
 -    long t1;
-     loader.start(bulkDir, manager, tableId, fateId, bulkInfo.setTime);
 -    loader.start(bulkDir, manager, tid, bulkInfo.setTime);
++    loader.start(bulkDir, manager, bulkInfo.tableId, fateId, bulkInfo.setTime);
  
-     List<ColumnType> fetchCols = new ArrayList<>(List.of(PREV_ROW, LOCATION, LOADED, TIME));
-     if (loader.pauseLimit > 0) {
-       fetchCols.add(FILES);
-     }
- 
-     try (TabletsMetadata tabletsMetadata =
-         TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null)
-             .checkConsistency().fetch(fetchCols.toArray(new ColumnType[0])).build()) {
+     ImportTimingStats importTimingStats = new ImportTimingStats();
+     Timer timer = Timer.startNew();
+     try (TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow)) {
+       // The tablet iterator and load mapping iterator are both iterating over data that is sorted
+       // in the same way. The two iterators are each independently advanced to find common points in
+       // the sorted data.
-       var tabletIter = tabletsMetadata.iterator();
- 
 -      t1 = System.currentTimeMillis();
+       Iterator<TabletMetadata> tabletIter = tabletsMetadata.iterator();
        while (lmi.hasNext()) {
          loadMapEntry = lmi.next();
          List<TabletMetadata> tablets =
@@@ -389,6 -383,16 +416,16 @@@
  
        log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
  
+       if (importTimingStats.callCount > 0) {
+         log.debug(
+             "Bulk import stats for {} (tid = {}): processed {} tablets in {} calls which took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} nanos) or {}% of the processing time.",
 -            bulkInfo.sourceDir, FateTxId.formatTid(tid), importTimingStats.tabletCount,
 -            importTimingStats.callCount, totalProcessingTime.toMillis(),
 -            totalProcessingTime.toNanos(), importTimingStats.wastedIterations,
 -            importTimingStats.totalWastedTime.toMillis(), importTimingStats.totalWastedTime.toNanos(),
++            bulkInfo.sourceDir, fateId, importTimingStats.tabletCount, importTimingStats.callCount,
++            totalProcessingTime.toMillis(), totalProcessingTime.toNanos(),
++            importTimingStats.wastedIterations, importTimingStats.totalWastedTime.toMillis(),
++            importTimingStats.totalWastedTime.toNanos(),
+             (importTimingStats.totalWastedTime.toNanos() * 100) / totalProcessingTime.toNanos());
+       }
+ 
        long sleepTime = loader.finish();
        if (sleepTime > 0) {
          log.trace("{}: Tablet Max Sleep is {}", fmtTid, sleepTime);

diff --cc server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
index 0000000000,9afa0ceb57..097325a5ea
mode 000000,100644..100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
@@@ -1,0 -1,304 +1,312 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.manager.tableOps.bulkVer2;
+ 
+ import static org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImportTest.nke;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ 
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
++import java.util.UUID;
+ 
+ import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo;
+ import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
+ import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
+ import org.apache.accumulo.core.data.TableId;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.FateTxId;
++import org.apache.accumulo.core.fate.FateId;
++import org.apache.accumulo.core.fate.FateInstanceType;
+ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+ import org.apache.accumulo.manager.Manager;
+ import org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.ImportTimingStats;
+ import org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.TabletsMetadataFactory;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.Text;
+ import org.easymock.EasyMock;
+ import org.junit.jupiter.api.BeforeEach;
+ import org.junit.jupiter.api.Test;
+ 
+ public class LoadFilesTest {
+ 
+   public static class TestTabletsMetadata extends TabletsMetadata {
+ 
+     public TestTabletsMetadata(AutoCloseable closeable, Iterable<TabletMetadata> tmi) {
+       super(closeable, tmi);
+     }
+ 
+   }
+ 
+   private static class CaptureLoader extends LoadFiles.Loader {
+ 
+     private static class LoadResult {
+       private final List<TabletMetadata> tablets;
+       private final Files files;
+ 
+       public LoadResult(List<TabletMetadata> tablets, Files files) {
+         super();
+         this.tablets = tablets;
+         this.files = files;
+       }
+ 
+       public List<TabletMetadata> getTablets() {
+         return tablets;
+       }
+ 
+       public Files getFiles() {
+         return files;
+       }
+ 
+     }
+ 
+     private final List<LoadResult> results = new ArrayList<>();
+ 
++    @Override
++    void start(Path bulkDir, Manager manager, TableId tableId, FateId fateId, boolean setTime)
++        throws Exception {
++      // override to do nothing
++    }
++
+     @Override
+     void load(List<TabletMetadata> tablets, Files files) {
+       results.add(new LoadResult(tablets, files));
+     }
+ 
+     public List<LoadResult> getLoadResults() {
+       return results;
+     }
+ 
+     @Override
+     long finish() {
+       return 0;
+     }
+ 
+   }
+ 
+   private TableId tid = TableId.of("1");
+   private List<TabletMetadata> tm = new ArrayList<>();
+ 
+   @BeforeEach
+   public void setup() {
+     tm.clear();
+     tm.add(TabletMetadata.create(tid.canonical(), null, "a"));
+     for (int i = 'a'; i < 'z'; i++) {
+       tm.add(TabletMetadata.create(tid.canonical(), "" + (char) (i), "" + (char) (i + 1)));
+     }
+     tm.add(TabletMetadata.create(tid.canonical(), "z", null));
+   }
+ 
+   @Test
+   public void testFindOverlappingFiles() {
+ 
 -    String fmtTid = FateTxId.formatTid(1234L);
++    String fmtTid = FateId.from(FateInstanceType.USER, UUID.randomUUID()).canonical();
+     var iter = tm.iterator();
+     List<TabletMetadata> tablets = LoadFiles.findOverlappingTablets(fmtTid,
+         new KeyExtent(tid, new Text("c"), null), iter, new ImportTimingStats());
+     assertEquals(tm.get(3), iter.next());
+     assertEquals(3, tablets.size());
+     assertEquals(tm.get(0), tablets.get(0));
+     assertEquals(tm.get(1), tablets.get(1));
+     assertEquals(tm.get(2), tablets.get(2));
+ 
+     iter = tm.iterator();
+     tablets = LoadFiles.findOverlappingTablets(fmtTid,
+         new KeyExtent(tid, new Text("o"), new Text("j")), iter, new ImportTimingStats());
+     assertEquals(tm.get(tm.size() - 12), iter.next());
+     assertEquals(5, tablets.size());
+     assertEquals(tm.get(tm.size() - 17), tablets.get(tablets.size() - 5));
+     assertEquals(tm.get(tm.size() - 16), tablets.get(tablets.size() - 4));
+     assertEquals(tm.get(tm.size() - 15), tablets.get(tablets.size() - 3));
+     assertEquals(tm.get(tm.size() - 14), tablets.get(tablets.size() - 2));
+     assertEquals(tm.get(tm.size() - 13), tablets.get(tablets.size() - 1));
+ 
+     iter = tm.iterator();
+     tablets = LoadFiles.findOverlappingTablets(fmtTid, new KeyExtent(tid, null, new Text("x")),
+         iter, new ImportTimingStats());
+     assertEquals(3, tablets.size());
+     assertEquals(tm.get(tm.size() - 3), tablets.get(tablets.size() - 3));
+     assertEquals(tm.get(tm.size() - 2), tablets.get(tablets.size() - 2));
+     assertEquals(tm.get(tm.size() - 1), tablets.get(tablets.size() - 1));
+     assertTrue(!iter.hasNext());
+ 
+     tablets = LoadFiles.findOverlappingTablets(fmtTid, new KeyExtent(tid, null, null),
+         tm.iterator(), new ImportTimingStats());
+     assertEquals(tm, tablets);
+ 
+   }
+ 
+   private Map<String,HashSet<KeyExtent>> runLoadFilesLoad(Map<KeyExtent,String> loadRanges)
+       throws Exception {
+ 
+     TabletsMetadata tabletMeta = new TestTabletsMetadata(null, tm);
+     LoadMappingIterator lmi = PrepBulkImportTest.createLoadMappingIter(loadRanges);
+     CaptureLoader cl = new CaptureLoader();
+     BulkInfo info = new BulkInfo();
+     TabletsMetadataFactory tmf = (startRow) -> tabletMeta;
 -    long txid = 1234L;
++    FateId txid = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+ 
+     Manager manager = EasyMock.createMock(Manager.class);
+     Path bulkDir = EasyMock.createMock(Path.class);
+     EasyMock.replay(manager, bulkDir);
+ 
+     LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid);
+     EasyMock.verify(manager, bulkDir);
+     List<CaptureLoader.LoadResult> results = cl.getLoadResults();
+     assertEquals(loadRanges.size(), results.size());
+ 
+     Map<String,HashSet<KeyExtent>> loadFileToExtentMap = new HashMap<>();
+     for (CaptureLoader.LoadResult result : results) {
+       for (FileInfo file : result.getFiles()) {
+         HashSet<KeyExtent> extents =
+             loadFileToExtentMap.computeIfAbsent(file.getFileName(), fileName -> new HashSet<>());
+         result.getTablets().forEach(m -> extents.add(m.getExtent()));
+       }
+     }
+     return loadFileToExtentMap;
+   }
+ 
+   @Test
+   public void testLoadFilesPartialTable() throws Exception {
+     Map<KeyExtent,String> loadRanges = new HashMap<>();
+     loadRanges.put(nke("c", "g"), "f2 f3");
+     loadRanges.put(nke("l", "n"), "f1 f2 f4 f5");
+     loadRanges.put(nke("r", "w"), "f2 f5");
+ 
+     Map<String,HashSet<KeyExtent>> loadFileToExtentMap = runLoadFilesLoad(loadRanges);
+     assertEquals(5, loadFileToExtentMap.size());
+ 
+     HashSet<KeyExtent> extents = loadFileToExtentMap.get("f1");
+     assertNotNull(extents);
+     assertEquals(2, extents.size());
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+ 
+     extents = loadFileToExtentMap.get("f2");
+     assertEquals(11, extents.size());
+     assertTrue(extents.contains(nke("c", "d")));
+     assertTrue(extents.contains(nke("d", "e")));
+     assertTrue(extents.contains(nke("e", "f")));
+     assertTrue(extents.contains(nke("f", "g")));
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+     assertTrue(extents.contains(nke("r", "s")));
assertTrue(extents.contains(nke("s", "t"))); + assertTrue(extents.contains(nke("t", "u"))); + assertTrue(extents.contains(nke("u", "v"))); + assertTrue(extents.contains(nke("v", "w"))); + + extents = loadFileToExtentMap.get("f3"); + assertEquals(4, extents.size()); + assertTrue(extents.contains(nke("c", "d"))); + assertTrue(extents.contains(nke("d", "e"))); + assertTrue(extents.contains(nke("e", "f"))); + assertTrue(extents.contains(nke("f", "g"))); + + extents = loadFileToExtentMap.get("f4"); + assertEquals(2, extents.size()); + assertTrue(extents.contains(nke("l", "m"))); + assertTrue(extents.contains(nke("m", "n"))); + + extents = loadFileToExtentMap.get("f5"); + assertEquals(7, extents.size()); + assertTrue(extents.contains(nke("l", "m"))); + assertTrue(extents.contains(nke("m", "n"))); + assertTrue(extents.contains(nke("r", "s"))); + assertTrue(extents.contains(nke("s", "t"))); + assertTrue(extents.contains(nke("t", "u"))); + assertTrue(extents.contains(nke("u", "v"))); + assertTrue(extents.contains(nke("v", "w"))); + + } + + @Test + public void testLoadFilesEntireTable() throws Exception { + + Map<KeyExtent,String> loadRanges = new HashMap<>(); + loadRanges.put(nke(null, "c"), "f1 f2"); + loadRanges.put(nke("c", "g"), "f2 f3"); + loadRanges.put(nke("g", "l"), "f2 f4"); + loadRanges.put(nke("l", "n"), "f1 f2 f4 f5"); + loadRanges.put(nke("n", "r"), "f2 f4"); + loadRanges.put(nke("r", "w"), "f2 f5"); + loadRanges.put(nke("w", null), "f2 f6"); + + Map<String,HashSet<KeyExtent>> loadFileToExtentMap = runLoadFilesLoad(loadRanges); + assertEquals(6, loadFileToExtentMap.size()); + + HashSet<KeyExtent> extents = loadFileToExtentMap.get("f1"); + assertNotNull(extents); + assertEquals(5, extents.size()); + assertTrue(extents.contains(nke(null, "a"))); + assertTrue(extents.contains(nke("a", "b"))); + assertTrue(extents.contains(nke("b", "c"))); + assertTrue(extents.contains(nke("l", "m"))); + assertTrue(extents.contains(nke("m", "n"))); + + extents = loadFileToExtentMap.get("f2"); + assertEquals(tm.size(), extents.size()); + for (TabletMetadata m : tm) { + assertTrue(extents.contains(m.getExtent())); + } + + extents = loadFileToExtentMap.get("f3"); + assertEquals(4, extents.size()); + assertTrue(extents.contains(nke("c", "d"))); + assertTrue(extents.contains(nke("d", "e"))); + assertTrue(extents.contains(nke("e", "f"))); + assertTrue(extents.contains(nke("f", "g"))); + + extents = loadFileToExtentMap.get("f4"); + assertEquals(11, extents.size()); + assertTrue(extents.contains(nke("g", "h"))); + assertTrue(extents.contains(nke("h", "i"))); + assertTrue(extents.contains(nke("i", "j"))); + assertTrue(extents.contains(nke("j", "k"))); + assertTrue(extents.contains(nke("k", "l"))); + assertTrue(extents.contains(nke("l", "m"))); + assertTrue(extents.contains(nke("m", "n"))); + assertTrue(extents.contains(nke("n", "o"))); + assertTrue(extents.contains(nke("o", "p"))); + assertTrue(extents.contains(nke("p", "q"))); + assertTrue(extents.contains(nke("q", "r"))); + + extents = loadFileToExtentMap.get("f5"); + assertEquals(7, extents.size()); + assertTrue(extents.contains(nke("l", "m"))); + assertTrue(extents.contains(nke("m", "n"))); + assertTrue(extents.contains(nke("r", "s"))); + assertTrue(extents.contains(nke("s", "t"))); + assertTrue(extents.contains(nke("t", "u"))); + assertTrue(extents.contains(nke("u", "v"))); + assertTrue(extents.contains(nke("v", "w"))); + + extents = loadFileToExtentMap.get("f6"); + assertEquals(4, extents.size()); + assertTrue(extents.contains(nke("w", "x"))); + 
assertTrue(extents.contains(nke("x", "y"))); + assertTrue(extents.contains(nke("y", "z"))); + assertTrue(extents.contains(nke("z", null))); + + } + }