This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit f09116d04c870f5f298b251098e548ecf8341ea2
Merge: 121758a40e e4d83cca90
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Feb 26 14:13:05 2025 +0000

    Merge branch '3.1'

 .../core/metadata/schema/TabletMetadata.java       |   2 +-
 .../core/metadata/schema/TabletsMetadata.java      |   3 +-
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  93 ++++--
 .../manager/tableOps/bulkVer2/LoadFilesTest.java   | 312 +++++++++++++++++++++
 .../tableOps/bulkVer2/PrepBulkImportTest.java      |   4 +-
 5 files changed, 388 insertions(+), 26 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index ccf1f3a2b5,ba879d6ebd..36667c8c88
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@@ -22,11 -22,9 +22,12 @@@ import static org.apache.accumulo.core.
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType.CURRENT;
  
+ import java.time.Duration;
  import java.util.ArrayList;
 +import java.util.Collections;
  import java.util.Comparator;
  import java.util.HashMap;
  import java.util.Iterator;
@@@ -60,9 -58,13 +61,10 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 -import org.apache.accumulo.core.tabletingest.thrift.DataFileInfo;
 -import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService.Client;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
  import org.apache.accumulo.core.trace.TraceUtil;
 -import org.apache.accumulo.core.util.MapCounter;
  import org.apache.accumulo.core.util.PeekingIterator;
 -import org.apache.accumulo.core.util.TextUtil;
+ import org.apache.accumulo.core.util.Timer;
  import org.apache.accumulo.manager.Manager;
  import org.apache.accumulo.manager.tableOps.ManagerRepo;
  import org.apache.accumulo.server.fs.VolumeManager;
@@@ -93,10 -101,11 +102,11 @@@ class LoadFiles extends ManagerRepo 
    }
  
    @Override
 -  public long isReady(long tid, Manager manager) throws Exception {
 -    log.info("Starting bulk import for {} (tid = {})", bulkInfo.sourceDir, FateTxId.formatTid(tid));
 +  public long isReady(FateId fateId, Manager manager) throws Exception {
++    log.info("Starting bulk import for {} (tid = {})", bulkInfo.sourceDir, fateId);
      if (manager.onlineTabletServers().isEmpty()) {
 -      log.warn("There are no tablet server to process bulkDir import, waiting (tid = "
 -          + FateTxId.formatTid(tid) + ")");
 +      log.warn("There are no tablet servers to process bulkDir import, waiting (fateId = " + fateId
 +          + ")");
        return 100;
      }
      VolumeManager fs = manager.getVolumeManager();
@@@ -104,154 -113,118 +114,167 @@@
      manager.updateBulkImportStatus(bulkInfo.sourceDir, BulkImportState.LOADING);
      try (LoadMappingIterator lmi =
          BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) {
-       return loadFiles(bulkInfo.tableId, bulkDir, lmi, manager, fateId);
+ 
 -      Loader loader;
 -      if (bulkInfo.tableState == TableState.ONLINE) {
 -        loader = new OnlineLoader();
 -      } else {
 -        loader = new OfflineLoader();
++      Loader loader = new Loader();
++
++      List<ColumnType> fetchCols = new ArrayList<>(List.of(PREV_ROW, LOCATION, LOADED, TIME));
++      if (loader.pauseLimit > 0) {
++        fetchCols.add(FILES);
+       }
+ 
+       TabletsMetadataFactory tmf = (startRow) -> TabletsMetadata.builder(manager.getContext())
+           .forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency()
 -          .fetch(PREV_ROW, LOCATION, LOADED).build();
++          .fetch(fetchCols.toArray(new ColumnType[0])).build();
+ 
 -      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, tid);
++      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, fateId);
      }
    }
  
    @Override
 -  public Repo<Manager> call(final long tid, final Manager manager) {
 -    if (bulkInfo.tableState == TableState.ONLINE) {
 -      return new CompleteBulkImport(bulkInfo);
 -    } else {
 -      return new CleanUpBulkImport(bulkInfo);
 -    }
 +  public Repo<Manager> call(final FateId fateId, final Manager manager) {
 +    return new RefreshTablets(bulkInfo);
    }
  
-   private static class Loader {
+   // visible for testing
 -  public abstract static class Loader {
++  public static class Loader {
      protected Path bulkDir;
      protected Manager manager;
 -    protected long tid;
 +    protected FateId fateId;
      protected boolean setTime;
 +    Ample.ConditionalTabletsMutator conditionalMutator;
 +    private Map<KeyExtent,List<TabletFile>> loadingFiles;
  
 -    void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
 +    private long skipped = 0;
 +    private long pauseLimit;
 +
 +    void start(Path bulkDir, Manager manager, TableId tableId, FateId fateId, boolean setTime)
 +        throws Exception {
        this.bulkDir = bulkDir;
        this.manager = manager;
 -      this.tid = tid;
 +      this.fateId = fateId;
        this.setTime = setTime;
 +      conditionalMutator = manager.getContext().getAmple().conditionallyMutateTablets();
 +      this.skipped = 0;
 +      this.loadingFiles = new HashMap<>();
 +      this.pauseLimit =
 +          manager.getContext().getTableConfiguration(tableId).getCount(Property.TABLE_FILE_PAUSE);
      }
  
 -    abstract void load(List<TabletMetadata> tablets, Files files) throws Exception;
 +    void load(List<TabletMetadata> tablets, Files files) {
  
 -    abstract long finish() throws Exception;
 -  }
 +      Map<ReferencedTabletFile,Bulk.FileInfo> toLoad = new HashMap<>();
 +      for (var fileInfo : files) {
 +        toLoad.put(new ReferencedTabletFile(new Path(bulkDir, fileInfo.getFileName())), fileInfo);
 +      }
  
 -  private static class OnlineLoader extends Loader {
 +      // remove any tablets that already have loaded flags
 +      tablets = tablets.stream().filter(tabletMeta -> {
 +        Set<ReferencedTabletFile> loaded = tabletMeta.getLoaded().keySet().stream()
 +            .map(StoredTabletFile::getTabletFile).collect(Collectors.toSet());
 +        boolean containsAll = loaded.containsAll(toLoad.keySet());
 +        // The tablet should either contain all of the loaded files or none of them; it should
 +        // never contain a subset. Loaded files are written in a single mutation to Accumulo, and
 +        // either all changes in a mutation go through or none do.
 +        Preconditions.checkState(containsAll || Collections.disjoint(loaded, toLoad.keySet()),
 +            "Tablet %s has a subset of loaded files %s %s", tabletMeta.getExtent(), loaded,
 +            toLoad.keySet());
 +        if (containsAll) {
 +          log.trace("{} tablet {} has already loaded all files, nothing to do", fateId,
 +              tabletMeta.getExtent());
 +        }
 +        return !containsAll;
 +      }).collect(Collectors.toList());
 +
 +      // timestamps from tablets that are hosted on a tablet server
 +      Map<KeyExtent,Long> hostedTimestamps;
 +      if (setTime) {
 +        hostedTimestamps = allocateTimestamps(tablets, toLoad.size());
 +        hostedTimestamps.forEach((e, t) -> {
 +          log.trace("{} allocated timestamp {} {}", fateId, e, t);
 +        });
 +      } else {
 +        hostedTimestamps = Map.of();
 +      }
  
 -    long timeInMillis;
 -    String fmtTid;
 -    int locationLess = 0;
 +      List<ColumnType> rsc = new ArrayList<>();
 +      if (setTime) {
 +        rsc.add(TIME);
 +      }
  
 -    // track how many tablets were sent load messages per tablet server
 -    MapCounter<HostAndPort> loadMsgs;
 +      ColumnType[] requireSameCols = rsc.toArray(new ColumnType[0]);
  
 -    // Each RPC to a tablet server needs to check in zookeeper to see if the transaction is still
 -    // active. The purpose of this map is to group load request by tablet servers inorder to do less
 -    // RPCs. Less RPCs will result in less calls to Zookeeper.
 -    Map<HostAndPort,Map<TKeyExtent,Map<String,DataFileInfo>>> loadQueue;
 -    private int queuedDataSize = 0;
 +      for (TabletMetadata tablet : tablets) {
 +        // Skip any tablets at the beginning of the loop before any work is done.
 +        if (setTime && tablet.getLocation() != null
 +            && !hostedTimestamps.containsKey(tablet.getExtent())) {
 +          skipped++;
 +          log.debug("{} tablet {} did not have a timestamp allocated, will retry later", fateId,
 +              tablet.getExtent());
 +          continue;
 +        }
 +        if (pauseLimit > 0 && tablet.getFiles().size() > pauseLimit) {
 +          skipped++;
 +          log.debug(
 +              "{} tablet {} has {} files which exceeds the pause limit of {}, 
not bulk importing and will retry later",
 +              fateId, tablet.getExtent(), tablet.getFiles().size(), 
pauseLimit);
 +          continue;
 +        }
  
 -    @Override
 -    void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
 -      super.start(bulkDir, manager, tid, setTime);
 +        Map<ReferencedTabletFile,DataFileValue> filesToLoad = new HashMap<>();
  
 -      timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
 -      fmtTid = FateTxId.formatTid(tid);
 +        var tabletTime = TabletTime.getInstance(tablet.getTime());
  
 -      loadMsgs = new MapCounter<>();
 +        Long fileTime = null;
 +        if (setTime) {
 +          if (tablet.getLocation() == null) {
 +            fileTime = tabletTime.getAndUpdateTime();
 +          } else {
 +            fileTime = hostedTimestamps.get(tablet.getExtent());
 +            tabletTime.updateTimeIfGreater(fileTime);
 +          }
 +        }
  
 -      loadQueue = new HashMap<>();
 -    }
 +        for (var entry : toLoad.entrySet()) {
 +          ReferencedTabletFile refTabFile = entry.getKey();
 +          Bulk.FileInfo fileInfo = entry.getValue();
  
 -    private void sendQueued(int threshhold) {
 -      if (queuedDataSize > threshhold || threshhold == 0) {
 -        loadQueue.forEach((server, tabletFiles) -> {
 +          DataFileValue dfv;
  
 -          if (log.isTraceEnabled()) {
 -            log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
 -                tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
 +          if (setTime) {
 +            // This should always be set outside the loop when setTime is true and should not be
 +            // null at this point
 +            Preconditions.checkState(fileTime != null);
 +            dfv =
 +                new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), fileTime);
 +          } else {
 +            dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries());
            }
  
 -          Client client = null;
 -          try {
 -            client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server,
 -                manager.getContext(), timeInMillis);
 -            client.loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
 -                bulkDir.toString(), tabletFiles, setTime);
 -          } catch (TException ex) {
 -            log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex);
 -          } finally {
 -            ThriftUtil.returnClient(client, manager.getContext());
 -          }
 -        });
 +          filesToLoad.put(refTabFile, dfv);
 +        }
  
 -        loadQueue.clear();
 -        queuedDataSize = 0;
 -      }
 -    }
 +        var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
 +            .requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet())
 +            .requireSame(tablet, LOCATION, requireSameCols);
 +
 +        if (pauseLimit > 0) {
 +          tabletMutator.requireLessOrEqualsFiles(pauseLimit);
 +        }
  
 -    private void addToQueue(HostAndPort server, KeyExtent extent,
 -        Map<String,DataFileInfo> thriftImports) {
 -      if (!thriftImports.isEmpty()) {
 -        loadMsgs.increment(server, 1);
 +        filesToLoad.forEach((f, v) -> {
 +          tabletMutator.putBulkFile(f, fateId);
 +          tabletMutator.putFile(f, v);
 +        });
  
 -        Map<String,DataFileInfo> prev = loadQueue.computeIfAbsent(server, k -> new HashMap<>())
 -            .putIfAbsent(extent.toThrift(), thriftImports);
 +        if (setTime) {
 +          tabletMutator.putTime(tabletTime.getMetadataTime());
 +        }
  
 -        Preconditions.checkState(prev == null, "Unexpectedly saw extent %s twice", extent);
 +        // Hang on to loaded files for logging purposes in the case where the update succeeds.
 +        Preconditions.checkState(
 +            loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null);
  
 -        // keep a very rough estimate of how much is memory so we can send if over a few megs is
 -        // buffered
 -        queuedDataSize += thriftImports.keySet().stream().mapToInt(String::length).sum()
 -            + server.getHost().length() + 4 + thriftImports.size() * 32;
 +        tabletMutator.submit(tm -> false, () -> "bulk load files " + fateId);
        }
      }
  
@@@ -350,35 -353,25 +383,28 @@@
    * scan the metadata table getting Tablet range and location information. It will return 0 when
     * all files have been loaded.
     */
-   private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMapIter,
-       Manager manager, FateId fateId) throws Exception {
+   // visible for testing
+   static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir,
 -      LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, Manager manager, long tid)
 -      throws Exception {
++      LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, Manager manager,
++      FateId fateId) throws Exception {
     PeekingIterator<Map.Entry<KeyExtent,Bulk.Files>> lmi = new PeekingIterator<>(loadMapIter);
      Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.peek();
  
      Text startRow = loadMapEntry.getKey().prevEndRow();
  
 -    String fmtTid = FateTxId.formatTid(tid);
 +    String fmtTid = fateId.getTxUUIDStr();
      log.trace("{}: Starting bulk load at row: {}", fmtTid, startRow);
  
-     Loader loader = new Loader();
-     long t1;
-     loader.start(bulkDir, manager, tableId, fateId, bulkInfo.setTime);
 -    loader.start(bulkDir, manager, tid, bulkInfo.setTime);
++    loader.start(bulkDir, manager, bulkInfo.tableId, fateId, bulkInfo.setTime);
  
-     List<ColumnType> fetchCols = new ArrayList<>(List.of(PREV_ROW, LOCATION, LOADED, TIME));
-     if (loader.pauseLimit > 0) {
-       fetchCols.add(FILES);
-     }
- 
-     try (TabletsMetadata tabletsMetadata =
-         TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null)
-             .checkConsistency().fetch(fetchCols.toArray(new ColumnType[0])).build()) {
+     ImportTimingStats importTimingStats = new ImportTimingStats();
+     Timer timer = Timer.startNew();
+     try (TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow)) {
  
 +      // The tablet iterator and load mapping iterator are both iterating over data that is sorted
 +      // in the same way. The two iterators are each independently advanced to find common points in
 +      // the sorted data.
-       var tabletIter = tabletsMetadata.iterator();
- 
-       t1 = System.currentTimeMillis();
+       Iterator<TabletMetadata> tabletIter = tabletsMetadata.iterator();
        while (lmi.hasNext()) {
          loadMapEntry = lmi.next();
          List<TabletMetadata> tablets =
@@@ -389,6 -383,16 +416,16 @@@
  
      log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
  
+     if (importTimingStats.callCount > 0) {
+       log.debug(
+           "Bulk import stats for {} (tid = {}): processed {} tablets in {} 
calls which took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} 
nanos) or {}% of the processing time.",
 -          bulkInfo.sourceDir, FateTxId.formatTid(tid), importTimingStats.tabletCount,
 -          importTimingStats.callCount, totalProcessingTime.toMillis(),
 -          totalProcessingTime.toNanos(), importTimingStats.wastedIterations,
 -          importTimingStats.totalWastedTime.toMillis(), importTimingStats.totalWastedTime.toNanos(),
++          bulkInfo.sourceDir, fateId, importTimingStats.tabletCount, importTimingStats.callCount,
++          totalProcessingTime.toMillis(), totalProcessingTime.toNanos(),
++          importTimingStats.wastedIterations, importTimingStats.totalWastedTime.toMillis(),
++          importTimingStats.totalWastedTime.toNanos(),
+           (importTimingStats.totalWastedTime.toNanos() * 100) / totalProcessingTime.toNanos());
+     }
+ 
      long sleepTime = loader.finish();
      if (sleepTime > 0) {
        log.trace("{}: Tablet Max Sleep is {}", fmtTid, sleepTime);
diff --cc server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
index 0000000000,9afa0ceb57..097325a5ea
mode 000000,100644..100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
@@@ -1,0 -1,304 +1,312 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.manager.tableOps.bulkVer2;
+ 
+ import static 
org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImportTest.nke;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ 
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
++import java.util.UUID;
+ 
+ import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo;
+ import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
+ import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
+ import org.apache.accumulo.core.data.TableId;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.FateTxId;
++import org.apache.accumulo.core.fate.FateId;
++import org.apache.accumulo.core.fate.FateInstanceType;
+ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+ import org.apache.accumulo.manager.Manager;
+ import org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.ImportTimingStats;
+ import org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.TabletsMetadataFactory;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.Text;
+ import org.easymock.EasyMock;
+ import org.junit.jupiter.api.BeforeEach;
+ import org.junit.jupiter.api.Test;
+ 
+ public class LoadFilesTest {
+ 
+   public static class TestTabletsMetadata extends TabletsMetadata {
+ 
+     public TestTabletsMetadata(AutoCloseable closeable, Iterable<TabletMetadata> tmi) {
+       super(closeable, tmi);
+     }
+ 
+   }
+ 
+   private static class CaptureLoader extends LoadFiles.Loader {
+ 
+     private static class LoadResult {
+       private final List<TabletMetadata> tablets;
+       private final Files files;
+ 
+       public LoadResult(List<TabletMetadata> tablets, Files files) {
+         super();
+         this.tablets = tablets;
+         this.files = files;
+       }
+ 
+       public List<TabletMetadata> getTablets() {
+         return tablets;
+       }
+ 
+       public Files getFiles() {
+         return files;
+       }
+ 
+     }
+ 
+     private final List<LoadResult> results = new ArrayList<>();
+ 
++    @Override
++    void start(Path bulkDir, Manager manager, TableId tableId, FateId fateId, boolean setTime)
++        throws Exception {
++      // override to do nothing
++    }
++
+     @Override
+     void load(List<TabletMetadata> tablets, Files files) {
+       results.add(new LoadResult(tablets, files));
+     }
+ 
+     public List<LoadResult> getLoadResults() {
+       return results;
+     }
+ 
+     @Override
+     long finish() {
+       return 0;
+     }
+ 
+   }
+ 
+   private TableId tid = TableId.of("1");
+   private List<TabletMetadata> tm = new ArrayList<>();
+ 
+   @BeforeEach
+   public void setup() {
+     tm.clear();
+     tm.add(TabletMetadata.create(tid.canonical(), null, "a"));
+     for (int i = 'a'; i < 'z'; i++) {
+       tm.add(TabletMetadata.create(tid.canonical(), "" + (char) (i), "" + (char) (i + 1)));
+     }
+     tm.add(TabletMetadata.create(tid.canonical(), "z", null));
+   }
+ 
+   @Test
+   public void testFindOverlappingFiles() {
+ 
 -    String fmtTid = FateTxId.formatTid(1234L);
++    String fmtTid = FateId.from(FateInstanceType.USER, UUID.randomUUID()).canonical();
+     var iter = tm.iterator();
+     List<TabletMetadata> tablets = LoadFiles.findOverlappingTablets(fmtTid,
+         new KeyExtent(tid, new Text("c"), null), iter, new 
ImportTimingStats());
+     assertEquals(tm.get(3), iter.next());
+     assertEquals(3, tablets.size());
+     assertEquals(tm.get(0), tablets.get(0));
+     assertEquals(tm.get(1), tablets.get(1));
+     assertEquals(tm.get(2), tablets.get(2));
+ 
+     iter = tm.iterator();
+     tablets = LoadFiles.findOverlappingTablets(fmtTid,
+         new KeyExtent(tid, new Text("o"), new Text("j")), iter, new 
ImportTimingStats());
+     assertEquals(tm.get(tm.size() - 12), iter.next());
+     assertEquals(5, tablets.size());
+     assertEquals(tm.get(tm.size() - 17), tablets.get(tablets.size() - 5));
+     assertEquals(tm.get(tm.size() - 16), tablets.get(tablets.size() - 4));
+     assertEquals(tm.get(tm.size() - 15), tablets.get(tablets.size() - 3));
+     assertEquals(tm.get(tm.size() - 14), tablets.get(tablets.size() - 2));
+     assertEquals(tm.get(tm.size() - 13), tablets.get(tablets.size() - 1));
+ 
+     iter = tm.iterator();
+     tablets = LoadFiles.findOverlappingTablets(fmtTid, new KeyExtent(tid, null, new Text("x")),
+         iter, new ImportTimingStats());
+     assertEquals(3, tablets.size());
+     assertEquals(tm.get(tm.size() - 3), tablets.get(tablets.size() - 3));
+     assertEquals(tm.get(tm.size() - 2), tablets.get(tablets.size() - 2));
+     assertEquals(tm.get(tm.size() - 1), tablets.get(tablets.size() - 1));
+     assertTrue(!iter.hasNext());
+ 
+     tablets = LoadFiles.findOverlappingTablets(fmtTid, new KeyExtent(tid, null, null),
+         tm.iterator(), new ImportTimingStats());
+     assertEquals(tm, tablets);
+ 
+   }
+ 
+   private Map<String,HashSet<KeyExtent>> runLoadFilesLoad(Map<KeyExtent,String> loadRanges)
+       throws Exception {
+ 
+     TabletsMetadata tabletMeta = new TestTabletsMetadata(null, tm);
+     LoadMappingIterator lmi = PrepBulkImportTest.createLoadMappingIter(loadRanges);
+     CaptureLoader cl = new CaptureLoader();
+     BulkInfo info = new BulkInfo();
+     TabletsMetadataFactory tmf = (startRow) -> tabletMeta;
 -    long txid = 1234L;
++    FateId txid = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+ 
+     Manager manager = EasyMock.createMock(Manager.class);
+     Path bulkDir = EasyMock.createMock(Path.class);
+     EasyMock.replay(manager, bulkDir);
+ 
+     LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid);
+     EasyMock.verify(manager, bulkDir);
+     List<CaptureLoader.LoadResult> results = cl.getLoadResults();
+     assertEquals(loadRanges.size(), results.size());
+ 
+     Map<String,HashSet<KeyExtent>> loadFileToExtentMap = new HashMap<>();
+     for (CaptureLoader.LoadResult result : results) {
+       for (FileInfo file : result.getFiles()) {
+         HashSet<KeyExtent> extents =
+             loadFileToExtentMap.computeIfAbsent(file.getFileName(), fileName -> new HashSet<>());
+         result.getTablets().forEach(m -> extents.add(m.getExtent()));
+       }
+     }
+     return loadFileToExtentMap;
+   }
+ 
+   @Test
+   public void testLoadFilesPartialTable() throws Exception {
+     Map<KeyExtent,String> loadRanges = new HashMap<>();
+     loadRanges.put(nke("c", "g"), "f2 f3");
+     loadRanges.put(nke("l", "n"), "f1 f2 f4 f5");
+     loadRanges.put(nke("r", "w"), "f2 f5");
+ 
+     Map<String,HashSet<KeyExtent>> loadFileToExtentMap = runLoadFilesLoad(loadRanges);
+     assertEquals(5, loadFileToExtentMap.size());
+ 
+     HashSet<KeyExtent> extents = loadFileToExtentMap.get("f1");
+     assertNotNull(extents);
+     assertEquals(2, extents.size());
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+ 
+     extents = loadFileToExtentMap.get("f2");
+     assertEquals(11, extents.size());
+     assertTrue(extents.contains(nke("c", "d")));
+     assertTrue(extents.contains(nke("d", "e")));
+     assertTrue(extents.contains(nke("e", "f")));
+     assertTrue(extents.contains(nke("f", "g")));
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+     assertTrue(extents.contains(nke("r", "s")));
+     assertTrue(extents.contains(nke("s", "t")));
+     assertTrue(extents.contains(nke("t", "u")));
+     assertTrue(extents.contains(nke("u", "v")));
+     assertTrue(extents.contains(nke("v", "w")));
+ 
+     extents = loadFileToExtentMap.get("f3");
+     assertEquals(4, extents.size());
+     assertTrue(extents.contains(nke("c", "d")));
+     assertTrue(extents.contains(nke("d", "e")));
+     assertTrue(extents.contains(nke("e", "f")));
+     assertTrue(extents.contains(nke("f", "g")));
+ 
+     extents = loadFileToExtentMap.get("f4");
+     assertEquals(2, extents.size());
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+ 
+     extents = loadFileToExtentMap.get("f5");
+     assertEquals(7, extents.size());
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+     assertTrue(extents.contains(nke("r", "s")));
+     assertTrue(extents.contains(nke("s", "t")));
+     assertTrue(extents.contains(nke("t", "u")));
+     assertTrue(extents.contains(nke("u", "v")));
+     assertTrue(extents.contains(nke("v", "w")));
+ 
+   }
+ 
+   @Test
+   public void testLoadFilesEntireTable() throws Exception {
+ 
+     Map<KeyExtent,String> loadRanges = new HashMap<>();
+     loadRanges.put(nke(null, "c"), "f1 f2");
+     loadRanges.put(nke("c", "g"), "f2 f3");
+     loadRanges.put(nke("g", "l"), "f2 f4");
+     loadRanges.put(nke("l", "n"), "f1 f2 f4 f5");
+     loadRanges.put(nke("n", "r"), "f2 f4");
+     loadRanges.put(nke("r", "w"), "f2 f5");
+     loadRanges.put(nke("w", null), "f2 f6");
+ 
+     Map<String,HashSet<KeyExtent>> loadFileToExtentMap = runLoadFilesLoad(loadRanges);
+     assertEquals(6, loadFileToExtentMap.size());
+ 
+     HashSet<KeyExtent> extents = loadFileToExtentMap.get("f1");
+     assertNotNull(extents);
+     assertEquals(5, extents.size());
+     assertTrue(extents.contains(nke(null, "a")));
+     assertTrue(extents.contains(nke("a", "b")));
+     assertTrue(extents.contains(nke("b", "c")));
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+ 
+     extents = loadFileToExtentMap.get("f2");
+     assertEquals(tm.size(), extents.size());
+     for (TabletMetadata m : tm) {
+       assertTrue(extents.contains(m.getExtent()));
+     }
+ 
+     extents = loadFileToExtentMap.get("f3");
+     assertEquals(4, extents.size());
+     assertTrue(extents.contains(nke("c", "d")));
+     assertTrue(extents.contains(nke("d", "e")));
+     assertTrue(extents.contains(nke("e", "f")));
+     assertTrue(extents.contains(nke("f", "g")));
+ 
+     extents = loadFileToExtentMap.get("f4");
+     assertEquals(11, extents.size());
+     assertTrue(extents.contains(nke("g", "h")));
+     assertTrue(extents.contains(nke("h", "i")));
+     assertTrue(extents.contains(nke("i", "j")));
+     assertTrue(extents.contains(nke("j", "k")));
+     assertTrue(extents.contains(nke("k", "l")));
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+     assertTrue(extents.contains(nke("n", "o")));
+     assertTrue(extents.contains(nke("o", "p")));
+     assertTrue(extents.contains(nke("p", "q")));
+     assertTrue(extents.contains(nke("q", "r")));
+ 
+     extents = loadFileToExtentMap.get("f5");
+     assertEquals(7, extents.size());
+     assertTrue(extents.contains(nke("l", "m")));
+     assertTrue(extents.contains(nke("m", "n")));
+     assertTrue(extents.contains(nke("r", "s")));
+     assertTrue(extents.contains(nke("s", "t")));
+     assertTrue(extents.contains(nke("t", "u")));
+     assertTrue(extents.contains(nke("u", "v")));
+     assertTrue(extents.contains(nke("v", "w")));
+ 
+     extents = loadFileToExtentMap.get("f6");
+     assertEquals(4, extents.size());
+     assertTrue(extents.contains(nke("w", "x")));
+     assertTrue(extents.contains(nke("x", "y")));
+     assertTrue(extents.contains(nke("y", "z")));
+     assertTrue(extents.contains(nke("z", null)));
+ 
+   }
+ }
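
The all-or-none precondition in Loader.load() above rests on the guarantee that the loaded-file
markers for one load mapping entry are written in a single atomic mutation. A self-contained
sketch of just that invariant check (plain Java sets; illustrative only, not Accumulo types):

import java.util.Collections;
import java.util.Set;

public class LoadedCheckSketch {
  // Mirrors the containsAll/disjoint test in Loader.load(): a tablet either
  // already has every file from this load mapping entry (skip it) or none of
  // them (load them all); any partial overlap indicates a broken atomic update.
  static boolean alreadyLoaded(Set<String> loaded, Set<String> toLoad) {
    boolean containsAll = loaded.containsAll(toLoad);
    if (!containsAll && !Collections.disjoint(loaded, toLoad)) {
      throw new IllegalStateException("tablet has a subset of loaded files: " + loaded);
    }
    return containsAll;
  }

  public static void main(String[] args) {
    System.out.println(alreadyLoaded(Set.of("f1", "f2"), Set.of("f1", "f2"))); // true: skip
    System.out.println(alreadyLoaded(Set.of(), Set.of("f1", "f2")));           // false: load
  }
}

Failing fast on a partial overlap is safer than guessing which files still need loading, since a
subset can only appear if a mutation was somehow split.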
