This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 7a27d40463 Offers new ways of computing bulk load plans (#4898)
7a27d40463 is described below

commit 7a27d4046323203926122f447009e36c1ae88c75
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Feb 1 15:51:59 2025 -0500

    Offers new ways of computing bulk load plans (#4898)
    
    This change offers two new ways of computing bulk import load plans.
    First, the RFile API was modified to support computing a LoadPlan as
    the RFile is written. Second, a new LoadPlan.compute() method was added
    that creates a LoadPlan from an existing RFile. In addition, methods
    were added to LoadPlan that support serializing and deserializing load
    plans to and from JSON.
    
    All of these changes together support computing load plans in a
    distributed manner. For example, given a bulk import directory with N
    files, the following workflow is now supported (a sketch follows the
    list).
    
     1. For each file, a task is spun up on a remote server that calls the
        new LoadPlan.compute() API to determine which tablets the file
        overlaps. The new LoadPlan.toJson() method is then called to
        serialize the load plan and send it to a central place.
     2. At the central place, all the load plans from the remote servers
        are deserialized by calling the new LoadPlan.fromJson() method and
        merged into a single load plan that is used to do the bulk import.
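    
    A minimal sketch of this workflow, based on the APIs added in this
    change; the class, method, and variable names below are hypothetical
    and only for illustration.
    
        import java.io.IOException;
        import java.net.URI;
        import java.util.Collection;
        import java.util.SortedSet;
        
        import org.apache.accumulo.core.client.AccumuloClient;
        import org.apache.accumulo.core.data.LoadPlan;
        import org.apache.hadoop.io.Text;
        
        public class DistributedLoadPlanSketch {
        
          // Runs in a remote task, once per rfile: compute the file's load
          // plan against the table's split points and return it as json.
          static String computePlanJson(URI rfile, SortedSet<Text> tableSplits)
              throws IOException {
            LoadPlan plan =
                LoadPlan.compute(rfile, LoadPlan.SplitResolver.from(tableSplits));
            return plan.toJson();
          }
        
          // Runs at the central place: merge the per-file plans and import.
          static void importWithMergedPlan(AccumuloClient client, String table,
              String bulkDir, Collection<String> planJsons) throws Exception {
            var builder = LoadPlan.builder();
            for (String json : planJsons) {
              builder.addPlan(LoadPlan.fromJson(json));
            }
            client.tableOperations().importDirectory(bulkDir).to(table)
                .plan(builder.build()).load();
          }
        }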
    
    Another use case these new APIs could support is running this new code
    in the map reduce job that generates the bulk import data (a sketch
    follows the list).
    
      1. Each reducer, as it writes to an rfile, can also build a LoadPlan.
         A load plan can be obtained from the RFileWriter after closing it,
         serialized using LoadPlan.toJson(), and the result saved to a
         file. So after the map reduce job completes, each rfile would have
         a corresponding file containing the load plan for that file.
      2. Another process that runs after the map reduce job can load all
         the load plans from the files and merge them using the new
         LoadPlan.fromJson() method. The merged LoadPlan can then be used
         to do the bulk import.
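    
    A minimal reducer-side sketch of step 1, assuming the reducer's sorted
    output is already available as a SortedMap; the method and variable
    names are hypothetical.
    
        import java.io.IOException;
        import java.nio.charset.StandardCharsets;
        import java.util.SortedMap;
        import java.util.SortedSet;
        
        import org.apache.accumulo.core.client.rfile.RFile;
        import org.apache.accumulo.core.data.Key;
        import org.apache.accumulo.core.data.LoadPlan;
        import org.apache.accumulo.core.data.Value;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.Text;
        
        public class ReducerLoadPlanSketch {
        
          // Write sorted data to an rfile while tracking a LoadPlan, then
          // save the plan as json in a sibling ".lp" file.
          static void writeFileAndPlan(FileSystem fs, Path rfilePath,
              SortedSet<Text> tableSplits, SortedMap<Key,Value> sortedData)
              throws IOException {
            var writer = RFile.newWriter().to(rfilePath.toString())
                .withFileSystem(fs)
                .withSplitResolver(LoadPlan.SplitResolver.from(tableSplits))
                .build();
            try (writer) {
              writer.startDefaultLocalityGroup();
              for (var entry : sortedData.entrySet()) {
                writer.append(entry.getKey(), entry.getValue());
              }
            }
            // the load plan is only available after the writer is closed
            String json = writer.getLoadPlan(rfilePath.getName()).toJson();
            Path planPath = new Path(rfilePath.getParent(),
                rfilePath.getName().replace(".rf", ".lp"));
            try (var out = fs.create(planPath, false)) {
              out.write(json.getBytes(StandardCharsets.UTF_8));
            }
          }
        }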
    
    
    Both of these use cases avoid analyzing the files on the single
    machine doing the bulk import. Bulk import v1 had this functionality
    and would ask random tservers to do the file analysis, which could
    cause unexpected load on those tservers. Bulk v1 would also interleave
    analyzing files and adding them to tablets, which could lead to odd
    situations where analysis fails after a file had already been added to
    some tablets, leaving the file partially imported. Bulk v2 does all
    analysis b [...]
    
    
    Co-authored-by: Daniel Roberts <ddani...@gmail.com>
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../java/org/apache/accumulo/core/Constants.java   |   1 +
 .../core/client/rfile/LoadPlanCollector.java       | 131 +++++++++++
 .../apache/accumulo/core/client/rfile/RFile.java   |  10 +
 .../accumulo/core/client/rfile/RFileWriter.java    |  36 ++-
 .../core/client/rfile/RFileWriterBuilder.java      |  35 ++-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  |  39 +++-
 .../org/apache/accumulo/core/data/LoadPlan.java    | 242 ++++++++++++++++++++-
 .../apache/accumulo/core/file/FileOperations.java  |   5 +
 .../core/client/rfile/RFileClientTest.java         | 210 ++++++++++++++++++
 .../apache/accumulo/core/data/LoadPlanTest.java    |  48 ++++
 .../apache/accumulo/core/file/rfile/RFileTest.java |   1 -
 .../apache/accumulo/test/functional/BulkNewIT.java |  61 +++++-
 12 files changed, 792 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java 
b/core/src/main/java/org/apache/accumulo/core/Constants.java
index c31e205585..1caf157e90 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -104,6 +104,7 @@ public class Constants {
   public static final String BULK_PREFIX = "b-";
   public static final String BULK_RENAME_FILE = "renames.json";
   public static final String BULK_LOAD_MAPPING = "loadmap.json";
+  public static final String BULK_WORKING_PREFIX = "accumulo-bulk-";
 
   public static final String CLONE_PREFIX = "c-";
   public static final byte[] CLONE_PREFIX_BYTES = CLONE_PREFIX.getBytes(UTF_8);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java
 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java
new file mode 100644
index 0000000000..511e3fed51
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client.rfile;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+class LoadPlanCollector {
+
+  private final LoadPlan.SplitResolver splitResolver;
+  private boolean finished = false;
+  private Text lgFirstRow;
+  private Text lgLastRow;
+  private Text firstRow;
+  private Text lastRow;
+  private Set<KeyExtent> overlappingExtents;
+  private KeyExtent currentExtent;
+  private long appended = 0;
+
+  LoadPlanCollector(LoadPlan.SplitResolver splitResolver) {
+    this.splitResolver = splitResolver;
+    this.overlappingExtents = new HashSet<>();
+  }
+
+  LoadPlanCollector() {
+    splitResolver = null;
+    this.overlappingExtents = null;
+
+  }
+
+  private void appendNoSplits(Key key) {
+    if (lgFirstRow == null) {
+      lgFirstRow = key.getRow();
+      lgLastRow = lgFirstRow;
+    } else {
+      var row = key.getRow();
+      lgLastRow = row;
+    }
+  }
+
+  private static final TableId FAKE_ID = TableId.of("123");
+
+  private void appendSplits(Key key) {
+    var row = key.getRow();
+    if (currentExtent == null || !currentExtent.contains(row)) {
+      var tableSplits = splitResolver.apply(row);
+      var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), 
tableSplits.getPrevRow());
+      Preconditions.checkState(extent.contains(row), "%s does not contain %s", 
tableSplits, row);
+      if (currentExtent != null) {
+        overlappingExtents.add(currentExtent);
+      }
+      currentExtent = extent;
+    }
+  }
+
+  public void append(Key key) {
+    if (splitResolver == null) {
+      appendNoSplits(key);
+    } else {
+      appendSplits(key);
+    }
+    appended++;
+  }
+
+  public void startLocalityGroup() {
+    if (lgFirstRow != null) {
+      if (firstRow == null) {
+        firstRow = lgFirstRow;
+        lastRow = lgLastRow;
+      } else {
+        // take the minimum
+        firstRow = firstRow.compareTo(lgFirstRow) < 0 ? firstRow : lgFirstRow;
+        // take the maximum
+        lastRow = lastRow.compareTo(lgLastRow) > 0 ? lastRow : lgLastRow;
+      }
+      lgFirstRow = null;
+      lgLastRow = null;
+    }
+  }
+
+  public LoadPlan getLoadPlan(String filename) {
+    Preconditions.checkState(finished, "Attempted to get load plan before 
closing");
+
+    if (appended == 0) {
+      return LoadPlan.builder().build();
+    }
+
+    if (splitResolver == null) {
+      return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, 
firstRow, lastRow)
+          .build();
+    } else {
+      var builder = LoadPlan.builder();
+      overlappingExtents.add(currentExtent);
+      for (var extent : overlappingExtents) {
+        builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, 
extent.prevEndRow(),
+            extent.endRow());
+      }
+      return builder.build();
+    }
+  }
+
+  public void close() {
+    finished = true;
+    // compute the overall min and max rows
+    startLocalityGroup();
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
index 3b6d10aade..71c186f3eb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.client.summary.Summary;
 import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.fs.FileSystem;
@@ -428,6 +429,15 @@ public class RFile {
      */
     WriterOptions withVisibilityCacheSize(int maxSize);
 
+    /**
+     * @param splitResolver builds a {@link LoadPlan} using table split points 
provided by the given
+     *        splitResolver.
+     * @return this
+     * @see RFileWriter#getLoadPlan(String)
+     * @since 2.1.4
+     */
+    WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver);
+
     /**
      * @return a new RfileWriter created with the options previously specified.
      */
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
index b4d6def4a2..ffa43e3bc4 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.security.ColumnVisibility;
@@ -92,12 +93,15 @@ public class RFileWriter implements AutoCloseable {
 
   private final FileSKVWriter writer;
   private final LRUMap<ByteSequence,Boolean> validVisibilities;
+
+  private final LoadPlanCollector loadPlanCollector;
   private boolean startedLG;
   private boolean startedDefaultLG;
 
-  RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) {
+  RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector 
loadPlanCollector) {
     this.writer = fileSKVWriter;
     this.validVisibilities = new LRUMap<>(visCacheSize);
+    this.loadPlanCollector = loadPlanCollector;
   }
 
   private void _startNewLocalityGroup(String name, Set<ByteSequence> 
columnFamilies)
@@ -106,6 +110,7 @@ public class RFileWriter implements AutoCloseable {
         "Cannot start a locality group after starting the default locality 
group");
     writer.startNewLocalityGroup(name, columnFamilies);
     startedLG = true;
+    loadPlanCollector.startLocalityGroup();
   }
 
   /**
@@ -175,6 +180,7 @@ public class RFileWriter implements AutoCloseable {
 
   public void startDefaultLocalityGroup() throws IOException {
     Preconditions.checkState(!startedDefaultLG);
+    loadPlanCollector.startLocalityGroup();
     writer.startDefaultLocalityGroup();
     startedDefaultLG = true;
     startedLG = true;
@@ -204,6 +210,7 @@ public class RFileWriter implements AutoCloseable {
       validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, 
cv.length)), Boolean.TRUE);
     }
     writer.append(key, val);
+    loadPlanCollector.append(key);
   }
 
   /**
@@ -249,6 +256,31 @@ public class RFileWriter implements AutoCloseable {
 
   @Override
   public void close() throws IOException {
-    writer.close();
+    try {
+      writer.close();
+    } finally {
+      loadPlanCollector.close();
+    }
+  }
+
+  /**
+   * If no split resolver was provided when the RFileWriter was built then 
this method will return a
+   * simple load plan of type {@link 
org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using
+   * the first and last row seen. If a splitResolver was provided then this 
will return a load plan
+   * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} 
that has the split
+   * ranges the rows written overlapped.
+   *
+   * @param filename This file name will be used in the load plan and it 
should match the name that
+   *        will be used when bulk importing this file. Only a filename is 
needed, not a full path.
+   * @return load plan computed from the keys written to the rfile.
+   * @see 
org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver)
+   * @since 2.1.4
+   * @throws IllegalStateException if attempting to get load plan before 
calling {@link #close()}
+   * @throws IllegalArgumentException if a full path is passed instead of a 
filename
+   */
+  public LoadPlan getLoadPlan(String filename) {
+    Preconditions.checkArgument(!filename.contains("/"),
+        "Unexpected path %s seen instead of file name", filename);
+    return loadPlanCollector.getLoadPlan(filename);
   }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
index 6382d568b5..5b12f4d73b 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.core.client.rfile;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -26,7 +27,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions;
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
+import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.metadata.ValidationUtil;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -72,6 +73,7 @@ class RFileWriterBuilder implements RFile.OutputArguments, 
RFile.WriterFSOptions
   private int visCacheSize = 1000;
   private Map<String,String> samplerProps = Collections.emptyMap();
   private Map<String,String> summarizerProps = Collections.emptyMap();
+  private LoadPlan.SplitResolver splitResolver;
 
   private void checkDisjoint(Map<String,String> props, Map<String,String> 
derivedProps,
       String kind) {
@@ -81,7 +83,7 @@ class RFileWriterBuilder implements RFile.OutputArguments, 
RFile.WriterFSOptions
 
   @Override
   public WriterOptions withSampler(SamplerConfiguration samplerConf) {
-    Objects.requireNonNull(samplerConf);
+    requireNonNull(samplerConf);
     Map<String,String> tmp = new 
SamplerConfigurationImpl(samplerConf).toTablePropertiesMap();
     checkDisjoint(tableConfig, tmp, "sampler");
     this.samplerProps = tmp;
@@ -106,6 +108,9 @@ class RFileWriterBuilder implements RFile.OutputArguments, 
RFile.WriterFSOptions
     CryptoService cs =
         CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, 
tableConfig);
 
+    var loadPlanCollector =
+        splitResolver == null ? new LoadPlanCollector() : new 
LoadPlanCollector(splitResolver);
+
     if (out.getOutputStream() != null) {
       FSDataOutputStream fsdo;
       if (out.getOutputStream() instanceof FSDataOutputStream) {
@@ -116,17 +121,19 @@ class RFileWriterBuilder implements 
RFile.OutputArguments, RFile.WriterFSOptions
       return new RFileWriter(
           fileops.newWriterBuilder().forOutputStream(".rf", fsdo, 
out.getConf(), cs)
               .withTableConfiguration(acuconf).withStartDisabled().build(),
-          visCacheSize);
+          visCacheSize, loadPlanCollector);
     } else {
-      return new RFileWriter(fileops.newWriterBuilder()
-          .forFile(out.path.toString(), out.getFileSystem(out.path), 
out.getConf(), cs)
-          .withTableConfiguration(acuconf).withStartDisabled().build(), 
visCacheSize);
+      return new RFileWriter(
+          fileops.newWriterBuilder()
+              .forFile(out.path.toString(), out.getFileSystem(out.path), 
out.getConf(), cs)
+              .withTableConfiguration(acuconf).withStartDisabled().build(),
+          visCacheSize, loadPlanCollector);
     }
   }
 
   @Override
   public WriterOptions withFileSystem(FileSystem fs) {
-    Objects.requireNonNull(fs);
+    requireNonNull(fs);
     out.fs = fs;
     return this;
   }
@@ -140,14 +147,14 @@ class RFileWriterBuilder implements 
RFile.OutputArguments, RFile.WriterFSOptions
 
   @Override
   public WriterOptions to(OutputStream out) {
-    Objects.requireNonNull(out);
+    requireNonNull(out);
     this.out = new OutputArgs(out);
     return this;
   }
 
   @Override
   public WriterOptions withTableProperties(Iterable<Entry<String,String>> 
tableConfig) {
-    Objects.requireNonNull(tableConfig);
+    requireNonNull(tableConfig);
     HashMap<String,String> cfg = new HashMap<>();
     for (Entry<String,String> entry : tableConfig) {
       cfg.put(entry.getKey(), entry.getValue());
@@ -161,7 +168,7 @@ class RFileWriterBuilder implements RFile.OutputArguments, 
RFile.WriterFSOptions
 
   @Override
   public WriterOptions withTableProperties(Map<String,String> tableConfig) {
-    Objects.requireNonNull(tableConfig);
+    requireNonNull(tableConfig);
     return withTableProperties(tableConfig.entrySet());
   }
 
@@ -172,9 +179,15 @@ class RFileWriterBuilder implements RFile.OutputArguments, 
RFile.WriterFSOptions
     return this;
   }
 
+  @Override
+  public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) 
{
+    this.splitResolver = requireNonNull(splitResolver);
+    return this;
+  }
+
   @Override
   public WriterOptions withSummarizers(SummarizerConfiguration... 
summarizerConf) {
-    Objects.requireNonNull(summarizerConf);
+    requireNonNull(summarizerConf);
     Map<String,String> tmp = 
SummarizerConfiguration.toTableProperties(summarizerConf);
     checkDisjoint(tableConfig, tmp, "summarizer");
     this.summarizerProps = tmp;
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index a85db74a86..1c155cd510 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
@@ -324,19 +325,25 @@ public class BulkImport implements 
ImportDestinationArguments, ImportMappingOpti
     KeyExtent lookup(Text row);
   }
 
-  public static List<KeyExtent> findOverlappingTablets(KeyExtentCache 
extentCache,
-      FileSKVIterator reader) throws IOException {
+  /**
+   * Function that will find a row in a file being bulk imported that is >= 
the row passed to the
+   * function. If there is no row then it should return null.
+   */
+  public interface NextRowFunction {
+    Text apply(Text row) throws IOException;
+  }
+
+  public static List<KeyExtent> 
findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver,
+      NextRowFunction nextRowFunction) throws IOException {
 
     List<KeyExtent> result = new ArrayList<>();
-    Collection<ByteSequence> columnFamilies = Collections.emptyList();
     Text row = new Text();
     while (true) {
-      reader.seek(new Range(row, null), columnFamilies, false);
-      if (!reader.hasTop()) {
+      row = nextRowFunction.apply(row);
+      if (row == null) {
         break;
       }
-      row = reader.getTopKey().getRow();
-      KeyExtent extent = extentCache.lookup(row);
+      KeyExtent extent = rowToExtentResolver.apply(row);
       result.add(extent);
       row = extent.endRow();
       if (row != null) {
@@ -356,13 +363,23 @@ public class BulkImport implements 
ImportDestinationArguments, ImportMappingOpti
   }
 
   public static List<KeyExtent> findOverlappingTablets(ClientContext context,
-      KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> 
fileLenCache,
+      KeyExtentCache keyExtentCache, Path file, FileSystem fs, 
Cache<String,Long> fileLenCache,
       CryptoService cs) throws IOException {
     try (FileSKVIterator reader = 
FileOperations.getInstance().newReaderBuilder()
         .forFile(file.toString(), fs, fs.getConf(), cs)
         
.withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
         .seekToBeginning().build()) {
-      return findOverlappingTablets(extentCache, reader);
+
+      Collection<ByteSequence> columnFamilies = Collections.emptyList();
+      NextRowFunction nextRowFunction = row -> {
+        reader.seek(new Range(row, null), columnFamilies, false);
+        if (!reader.hasTop()) {
+          return null;
+        }
+        return reader.getTopKey().getRow();
+      };
+
+      return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction);
     }
   }
 
@@ -517,8 +534,8 @@ public class BulkImport implements 
ImportDestinationArguments, ImportMappingOpti
         continue;
       }
 
-      if (FileOperations.getBulkWorkingFiles().contains(fname)) {
-        log.debug("{} is an internal working file, ignoring.", 
fileStatus.getPath());
+      if (FileOperations.isBulkWorkingFile(fname)) {
+        log.trace("{} is an internal working file, ignoring.", 
fileStatus.getPath());
         continue;
       }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java 
b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
index aecd25f663..c06fbb83c2 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
@@ -20,17 +20,31 @@ package org.apache.accumulo.core.data;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.io.IOException;
+import java.net.URI;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import 
org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.UnsignedBytes;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
@@ -125,7 +139,7 @@ public class LoadPlan {
               "Start row is greater than or equal to end row : " + srs + " " + 
ers);
         }
       } else {
-        throw new RuntimeException();
+        throw new IllegalStateException();
       }
 
     }
@@ -228,4 +242,230 @@ public class LoadPlan {
       }
     };
   }
+
+  private static class JsonDestination {
+    String fileName;
+    String startRow;
+    String endRow;
+    RangeType rangeType;
+
+    JsonDestination() {}
+
+    JsonDestination(Destination destination) {
+      fileName = destination.getFileName();
+      startRow = destination.getStartRow() == null ? null
+          : Base64.getUrlEncoder().encodeToString(destination.getStartRow());
+      endRow = destination.getEndRow() == null ? null
+          : Base64.getUrlEncoder().encodeToString(destination.getEndRow());
+      rangeType = destination.getRangeType();
+    }
+
+    Destination toDestination() {
+      return new Destination(fileName, rangeType,
+          startRow == null ? null : Base64.getUrlDecoder().decode(startRow),
+          endRow == null ? null : Base64.getUrlDecoder().decode(endRow));
+    }
+  }
+
+  private static final class JsonAll {
+    List<JsonDestination> destinations;
+
+    JsonAll() {}
+
+    JsonAll(List<Destination> destinations) {
+      this.destinations =
+          
destinations.stream().map(JsonDestination::new).collect(Collectors.toList());
+    }
+
+  }
+
+  private static final Gson gson = new 
GsonBuilder().disableJdkUnsafe().serializeNulls().create();
+
+  /**
+   * Serializes the load plan to json that looks like the following. The 
values of startRow and
+   * endRow field are base64 encoded using {@link Base64#getUrlEncoder()}.
+   *
+   * <pre>
+   * {
+   *   "destinations": [
+   *     {
+   *       "fileName": "f1.rf",
+   *       "startRow": null,
+   *       "endRow": "MDAz",
+   *       "rangeType": "TABLE"
+   *     },
+   *     {
+   *       "fileName": "f2.rf",
+   *       "startRow": "MDA0",
+   *       "endRow": "MDA3",
+   *       "rangeType": "FILE"
+   *     },
+   *     {
+   *       "fileName": "f1.rf",
+   *       "startRow": "MDA1",
+   *       "endRow": "MDA2",
+   *       "rangeType": "TABLE"
+   *     },
+   *     {
+   *       "fileName": "f3.rf",
+   *       "startRow": "MDA4",
+   *       "endRow": null,
+   *       "rangeType": "TABLE"
+   *     }
+   *   ]
+   * }
+   * </pre>
+   *
+   * @since 2.1.4
+   */
+  public String toJson() {
+    return gson.toJson(new JsonAll(destinations));
+  }
+
+  /**
+   * Deserializes json to a load plan.
+   *
+   * @param json produced by {@link #toJson()}
+   */
+  public static LoadPlan fromJson(String json) {
+    var dests = gson.fromJson(json, JsonAll.class).destinations.stream()
+        
.map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList());
+    return new LoadPlan(dests);
+  }
+
+  /**
+   * Represents two split points that exist in a table being bulk imported to.
+   *
+   * @since 2.1.4
+   */
+  public static class TableSplits {
+    private final Text prevRow;
+    private final Text endRow;
+
+    public TableSplits(Text prevRow, Text endRow) {
+      Preconditions.checkArgument(
+          prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0, 
"%s >= %s", prevRow,
+          endRow);
+      this.prevRow = prevRow;
+      this.endRow = endRow;
+    }
+
+    public Text getPrevRow() {
+      return prevRow;
+    }
+
+    public Text getEndRow() {
+      return endRow;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TableSplits that = (TableSplits) o;
+      return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow, 
that.endRow);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(prevRow, endRow);
+    }
+
+    @Override
+    public String toString() {
+      return "(" + prevRow + "," + endRow + "]";
+    }
+  }
+
+  /**
+   * A function that maps a row to two table split points that contain the 
row. These splits must
+   * exist in the table being bulk imported to. There is no requirement that 
the splits are
+   * contiguous. For example if a table has splits C,D,E,M and we ask for 
splits containing row H
+   * it is ok to return D,M, but that could result in the file mapping to more 
actual tablets than
+   * needed. For a row that falls before or after all table splits, use null 
to represent -inf and
+   * +inf. For example if a table has splits C,D,E,M and row B is resolved it 
is ok to return
+   * null,C. If row Q is resolved for table splits C,D,E,M it would be ok to 
return M,null. For a
+   * table with zero splits, the resolver should return null,null for all rows.
+   *
+   * @since 2.1.4
+   */
+  public interface SplitResolver extends Function<Text,TableSplits> {
+    static SplitResolver from(SortedSet<Text> splits) {
+      return row -> {
+        var headSet = splits.headSet(row);
+        Text prevRow = headSet.isEmpty() ? null : headSet.last();
+        var tailSet = splits.tailSet(row);
+        Text endRow = tailSet.isEmpty() ? null : tailSet.first();
+        return new TableSplits(prevRow, endRow);
+      };
+    }
+
+    /**
+     * For a given row, R, this function should find two split points, S1 and 
S2, that exist in the
+     * table being bulk imported to, such that S1 &lt; R &lt;= S2. The closer 
S1 and S2 are to each
+     * other, the better.
+     */
+    @Override
+    TableSplits apply(Text row);
+  }
+
+  /**
+   * Computes a load plan for a given rfile. This will open the rfile and find 
every
+   * {@link TableSplits} that overlaps rows in the file and add those to the 
returned load plan.
+   *
+   * @since 2.1.4
+   */
+  public static LoadPlan compute(URI file, SplitResolver splitResolver) throws 
IOException {
+    return compute(file, Map.of(), splitResolver);
+  }
+
+  // KeyExtent requires a tableId and this code needs to use KeyExtent 
functionality but does not
+  // have a tableId or care what the tableId is. So this fake id is used with 
KeyExtent.
+  private static final TableId FAKE_ID = TableId.of("999");
+
+  /**
+   * Computes a load plan for a given rfile. This will open the rfile and find 
every
+   * {@link TableSplits} that overlaps rows in the file and add those to the 
returned load plan.
+   *
+   * @param properties used when opening the rfile, see
+   *        {@link 
org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)}
+   * @since 2.1.4
+   */
+  public static LoadPlan compute(URI file, Map<String,String> properties,
+      SplitResolver splitResolver) throws IOException {
+    try (var scanner = 
RFile.newScanner().from(file.toString()).withoutSystemIterators()
+        .withTableProperties(properties).withIndexCache(10_000_000).build()) {
+      BulkImport.NextRowFunction nextRowFunction = row -> {
+        scanner.setRange(new Range(row, null));
+        var iter = scanner.iterator();
+        if (iter.hasNext()) {
+          return iter.next().getKey().getRow();
+        } else {
+          return null;
+        }
+      };
+
+      Function<Text,KeyExtent> rowToExtentResolver = row -> {
+        var tabletRange = splitResolver.apply(row);
+        var extent = new KeyExtent(FAKE_ID, tabletRange.endRow, 
tabletRange.prevRow);
+        Preconditions.checkState(extent.contains(row), "%s does not contain 
%s", tabletRange, row);
+        return extent;
+      };
+
+      List<KeyExtent> overlapping =
+          BulkImport.findOverlappingTablets(rowToExtentResolver, 
nextRowFunction);
+
+      Path path = new Path(file);
+
+      var builder = builder();
+      for (var extent : overlapping) {
+        builder.loadFileTo(path.getName(), RangeType.TABLE, 
extent.prevEndRow(), extent.endRow());
+      }
+      return builder.build();
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java 
b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index db82b0d149..f83d202b2f 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -53,6 +53,11 @@ public abstract class FileOperations {
       Set.of(Constants.BULK_LOAD_MAPPING, Constants.BULK_RENAME_FILE,
           FileOutputCommitter.SUCCEEDED_FILE_NAME, HADOOP_JOBHISTORY_LOCATION);
 
+  public static boolean isBulkWorkingFile(String fileName) {
+    return fileName.startsWith(Constants.BULK_WORKING_PREFIX)
+        || bulkWorkingFiles.contains(fileName);
+  }
+
   public static Set<String> getValidExtensions() {
     return validExtensions;
   }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java 
b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
index 716803b356..58e56abf0a 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.io.File;
 import java.io.IOException;
 import java.net.ConnectException;
+import java.net.URI;
 import java.security.SecureRandom;
 import java.util.AbstractMap;
 import java.util.ArrayList;
@@ -40,7 +41,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
@@ -57,6 +62,8 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.LoadPlanTest;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
@@ -881,4 +888,207 @@ public class RFileClientTest {
     assertTrue(Arrays.stream(exception3.getStackTrace())
         .anyMatch(ste -> ste.getClassName().contains(localFsClass)));
   }
+
+  @Test
+  public void testLoadPlanEmpty() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    LoadPlan.SplitResolver splitResolver =
+        LoadPlan.SplitResolver.from(new TreeSet<>(List.of(new Text("m"))));
+
+    for (boolean withSplits : List.of(true, false)) {
+      String testFile = createTmpTestFile();
+      var builder = RFile.newWriter().to(testFile).withFileSystem(localFs);
+      if (withSplits) {
+        builder = builder.withSplitResolver(splitResolver);
+      }
+      var writer = builder.build();
+
+      // can not get load plan before closing file
+      assertThrows(IllegalStateException.class,
+          () -> writer.getLoadPlan(new Path(testFile).getName()));
+
+      try (writer) {
+        writer.startDefaultLocalityGroup();
+        assertThrows(IllegalStateException.class,
+            () -> writer.getLoadPlan(new Path(testFile).getName()));
+      }
+      var loadPlan = writer.getLoadPlan(new Path(testFile).getName());
+      assertEquals(0, loadPlan.getDestinations().size());
+
+      loadPlan = LoadPlan.compute(new URI(testFile), splitResolver);
+      assertEquals(0, loadPlan.getDestinations().size());
+    }
+  }
+
+  @Test
+  public void testLoadPlanLocalityGroupsNoSplits() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    String testFile = createTmpTestFile();
+    var writer = 
RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+    try (writer) {
+      writer.startNewLocalityGroup("LG1", "F1");
+      writer.append(new Key("001", "F1"), "V1");
+      writer.append(new Key("005", "F1"), "V2");
+      writer.startNewLocalityGroup("LG2", "F3");
+      writer.append(new Key("003", "F3"), "V3");
+      writer.append(new Key("004", "F3"), "V4");
+      writer.startDefaultLocalityGroup();
+      writer.append(new Key("007", "F4"), "V5");
+      writer.append(new Key("009", "F4"), "V6");
+    }
+
+    var filename = new Path(testFile).getName();
+    var loadPlan = writer.getLoadPlan(filename);
+    assertEquals(1, loadPlan.getDestinations().size());
+
+    // The minimum and maximum rows happened in different locality groups; the 
load plan should
+    // reflect this
+    var expectedLoadPlan =
+        LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, 
"001", "009").build();
+    assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+  }
+
+  @Test
+  public void testLoadPlanLocalityGroupsSplits() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    SortedSet<Text> splits =
+        Stream.of("001", "002", "003", "004", "005", "006", "007", "008", 
"009").map(Text::new)
+            .collect(Collectors.toCollection(TreeSet::new));
+    var splitResolver = LoadPlan.SplitResolver.from(splits);
+
+    String testFile = createTmpTestFile();
+    var writer = RFile.newWriter().to(testFile).withFileSystem(localFs)
+        .withSplitResolver(splitResolver).build();
+    try (writer) {
+      writer.startNewLocalityGroup("LG1", "F1");
+      writer.append(new Key("001", "F1"), "V1");
+      writer.append(new Key("005", "F1"), "V2");
+      writer.startNewLocalityGroup("LG2", "F3");
+      writer.append(new Key("003", "F3"), "V3");
+      writer.append(new Key("005", "F3"), "V3");
+      writer.append(new Key("007", "F3"), "V4");
+      writer.startDefaultLocalityGroup();
+      writer.append(new Key("007", "F4"), "V5");
+      writer.append(new Key("009", "F4"), "V6");
+    }
+
+    var filename = new Path(testFile).getName();
+    var loadPlan = writer.getLoadPlan(filename);
+    assertEquals(5, loadPlan.getDestinations().size());
+
+    var builder = LoadPlan.builder();
+    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, null, "001");
+    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "004", "005");
+    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "002", "003");
+    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "006", "007");
+    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "008", "009");
+    assertEquals(LoadPlanTest.toString(builder.build().getDestinations()),
+        LoadPlanTest.toString(loadPlan.getDestinations()));
+
+    loadPlan = LoadPlan.compute(new URI(testFile), splitResolver);
+    assertEquals(LoadPlanTest.toString(builder.build().getDestinations()),
+        LoadPlanTest.toString(loadPlan.getDestinations()));
+  }
+
+  @Test
+  public void testIncorrectSplitResolver() throws Exception {
+    // for some rows the returned table splits will not contain the row. This 
should cause an error.
+    LoadPlan.SplitResolver splitResolver =
+        row -> new LoadPlan.TableSplits(new Text("003"), new Text("005"));
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    String testFile = createTmpTestFile();
+    var writer = RFile.newWriter().to(testFile).withFileSystem(localFs)
+        .withSplitResolver(splitResolver).build();
+    try (writer) {
+      writer.startDefaultLocalityGroup();
+      writer.append(new Key("004", "F4"), "V2");
+      var e = assertThrows(IllegalStateException.class,
+          () -> writer.append(new Key("007", "F4"), "V2"));
+      assertTrue(e.getMessage().contains("(003,005]"));
+      assertTrue(e.getMessage().contains("007"));
+    }
+
+    var testFile2 = createTmpTestFile();
+    var writer2 = 
RFile.newWriter().to(testFile2).withFileSystem(localFs).build();
+    try (writer2) {
+      writer2.startDefaultLocalityGroup();
+      writer2.append(new Key("004", "F4"), "V2");
+      writer2.append(new Key("007", "F4"), "V2");
+    }
+
+    var e = assertThrows(IllegalStateException.class,
+        () -> LoadPlan.compute(new URI(testFile), splitResolver));
+    assertTrue(e.getMessage().contains("(003,005]"));
+    assertTrue(e.getMessage().contains("007"));
+  }
+
+  @Test
+  public void testGetLoadPlanBeforeClose() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    String testFile = createTmpTestFile();
+    var writer = 
RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+    try (writer) {
+      var e = assertThrows(IllegalStateException.class,
+          () -> writer.getLoadPlan(new Path(testFile).getName()));
+      assertEquals("Attempted to get load plan before closing", 
e.getMessage());
+      writer.startDefaultLocalityGroup();
+      writer.append(new Key("004", "F4"), "V2");
+      var e2 = assertThrows(IllegalStateException.class,
+          () -> writer.getLoadPlan(new Path(testFile).getName()));
+      assertEquals("Attempted to get load plan before closing", 
e2.getMessage());
+    }
+  }
+
+  @Test
+  public void testGetLoadPlanWithPath() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    String testFile = createTmpTestFile();
+    var writer = 
RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+    writer.close();
+
+    var e =
+        assertThrows(IllegalArgumentException.class, () -> 
writer.getLoadPlan(testFile.toString()));
+    assertTrue(e.getMessage().contains("Unexpected path"));
+    assertEquals(0, writer.getLoadPlan(new 
Path(testFile).getName()).getDestinations().size());
+  }
+
+  @Test
+  public void testComputeLoadPlanWithPath() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    SortedSet<Text> splits =
+        Stream.of("001", "002", "003", "004", "005", "006", "007", "008", 
"009").map(Text::new)
+            .collect(Collectors.toCollection(TreeSet::new));
+    var splitResolver = LoadPlan.SplitResolver.from(splits);
+
+    String testFile = createTmpTestFile();
+    var writer = RFile.newWriter().to(testFile).withFileSystem(localFs)
+        .withSplitResolver(splitResolver).build();
+    writer.startDefaultLocalityGroup();
+    writer.append(new Key("001", "V4"), "test");
+    writer.append(new Key("002", "V4"), "test");
+    writer.append(new Key("003", "V4"), "test");
+    writer.append(new Key("004", "V4"), "test");
+    writer.close();
+
+    var e = assertThrows(IllegalArgumentException.class, () -> 
writer.getLoadPlan(testFile));
+    assertTrue(e.getMessage().contains("Unexpected path"));
+    assertEquals(4, writer.getLoadPlan(new 
Path(testFile).getName()).getDestinations().size());
+    assertEquals(4, LoadPlan.compute(new URI(testFile), 
splitResolver).getDestinations().size());
+
+    String hdfsHost = "127.0.0.5:8080";
+    String fileUri = "hdfs://" + hdfsHost + "/bulk-xyx/file1.rf";
+    URI uri = new URI(fileUri);
+    var err = assertThrows(RuntimeException.class, () -> LoadPlan.compute(uri, 
splitResolver));
+    assertTrue(err.getMessage().contains("to " + hdfsHost + " failed on 
connection exception"));
+    assertTrue(Arrays.stream(err.getCause().getStackTrace())
+        .anyMatch(ste -> 
ste.getClassName().contains(DistributedFileSystem.class.getName())));
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java 
b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java
index bba30a555d..18f2038163 100644
--- a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java
@@ -23,8 +23,11 @@ import static java.util.stream.Collectors.toSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.Base64;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.data.LoadPlan.Destination;
 import org.apache.accumulo.core.data.LoadPlan.RangeType;
@@ -100,6 +103,47 @@ public class LoadPlanTest {
 
     assertEquals(expected, actual);
 
+    var loadPlan2 = LoadPlan.fromJson(loadPlan.toJson());
+    Set<String> actual2 =
+        
loadPlan2.getDestinations().stream().map(LoadPlanTest::toString).collect(toSet());
+    assertEquals(expected, actual2);
+  }
+
+  @Test
+  public void testJson() {
+    var loadPlan = LoadPlan.builder().build();
+    assertEquals(0, loadPlan.getDestinations().size());
+    assertEquals("{\"destinations\":[]}", loadPlan.toJson());
+
+    var builder = LoadPlan.builder();
+    builder.loadFileTo("f1.rf", RangeType.TABLE, null, "003");
+    builder.loadFileTo("f2.rf", RangeType.FILE, "004", "007");
+    builder.loadFileTo("f1.rf", RangeType.TABLE, "005", "006");
+    builder.loadFileTo("f3.rf", RangeType.TABLE, "008", null);
+    String json = builder.build().toJson();
+
+    String b64003 = 
Base64.getUrlEncoder().encodeToString("003".getBytes(UTF_8));
+    String b64004 = 
Base64.getUrlEncoder().encodeToString("004".getBytes(UTF_8));
+    String b64005 = 
Base64.getUrlEncoder().encodeToString("005".getBytes(UTF_8));
+    String b64006 = 
Base64.getUrlEncoder().encodeToString("006".getBytes(UTF_8));
+    String b64007 = 
Base64.getUrlEncoder().encodeToString("007".getBytes(UTF_8));
+    String b64008 = 
Base64.getUrlEncoder().encodeToString("008".getBytes(UTF_8));
+
+    String expected = 
"{'destinations':[{'fileName':'f1.rf','startRow':null,'endRow':'" + b64003
+        + "','rangeType':'TABLE'},{'fileName':'f2.rf','startRow':'" + b64004 + 
"','endRow':'"
+        + b64007 + "','rangeType':'FILE'},{'fileName':'f1.rf','startRow':'" + 
b64005
+        + "','endRow':'" + b64006 + 
"','rangeType':'TABLE'},{'fileName':'f3.rf','startRow':'"
+        + b64008 + "','endRow':null,'rangeType':'TABLE'}]}";
+
+    assertEquals(expected.replace("'", "\""), json);
+  }
+
+  @Test
+  public void testTableSplits() {
+    assertThrows(IllegalArgumentException.class,
+        () -> new LoadPlan.TableSplits(new Text("004"), new Text("004")));
+    assertThrows(IllegalArgumentException.class,
+        () -> new LoadPlan.TableSplits(new Text("004"), new Text("003")));
   }
 
   private static String toString(Destination d) {
@@ -110,4 +154,8 @@ public class LoadPlanTest {
   private static String toString(byte[] r) {
     return r == null ? null : new String(r, UTF_8);
   }
+
+  public static Set<String> toString(Collection<Destination> destinations) {
+    return destinations.stream().map(d -> 
toString(d)).collect(Collectors.toSet());
+  }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 33545639ef..4b0c533dd1 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -366,7 +366,6 @@ public class RFileTest {
   public void test1() throws IOException {
 
     // test an empty file
-
     TestRFile trf = new TestRFile(conf);
 
     trf.openWriter();
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index a8aebd2e25..683461b8c3 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.math.BigInteger;
+import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.MessageDigest;
@@ -88,6 +89,7 @@ import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.constraints.MetadataConstraints;
 import org.apache.accumulo.server.constraints.SystemEnvironment;
 import org.apache.accumulo.test.util.Wait;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -476,6 +478,63 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testComputeLoadPlan() throws Exception {
+
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      addSplits(c, tableName, "0333 0666 0999 1333 1666");
+
+      String dir = getDir("/testBulkFile-");
+
+      Map<String,Set<String>> hashes = new HashMap<>();
+      String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+      hashes.put("0333", new HashSet<>(List.of(h1)));
+      String h2 = writeData(dir + "/f2.", aconf, 0, 666);
+      hashes.get("0333").add(h2);
+      hashes.put("0666", new HashSet<>(List.of(h2)));
+      String h3 = writeData(dir + "/f3.", aconf, 334, 700);
+      hashes.get("0666").add(h3);
+      hashes.put("0999", new HashSet<>(List.of(h3)));
+      hashes.put("1333", Set.of());
+      hashes.put("1666", Set.of());
+      hashes.put("null", Set.of());
+
+      SortedSet<Text> splits = new 
TreeSet<>(c.tableOperations().listSplits(tableName));
+
+      for (String filename : List.of("f1.rf", "f2.rf", "f3.rf")) {
+        // The body of this loop simulates what each reducer would do
+        Path path = new Path(dir + "/" + filename);
+
+        // compute the load plan for the rfile
+        URI file = path.toUri();
+        String lpJson = LoadPlan.compute(file, 
LoadPlan.SplitResolver.from(splits)).toJson();
+
+        // save the load plan to a file
+        Path lpPath = new Path(path.getParent(), path.getName().replace(".rf", 
".lp"));
+        try (var output = getCluster().getFileSystem().create(lpPath, false)) {
+          IOUtils.write(lpJson, output, UTF_8);
+        }
+      }
+
+      // This simulates the code that would run after the map reduce job and 
bulk import the files
+      var builder = LoadPlan.builder();
+      for (var status : getCluster().getFileSystem().listStatus(new Path(dir),
+          p -> p.getName().endsWith(".lp"))) {
+        try (var input = getCluster().getFileSystem().open(status.getPath())) {
+          String lpJson = IOUtils.toString(input, UTF_8);
+          builder.addPlan(LoadPlan.fromJson(lpJson));
+        }
+      }
+
+      LoadPlan lpAll = builder.build();
+
+      
c.tableOperations().importDirectory(dir).to(tableName).plan(lpAll).load();
+
+      verifyData(c, tableName, 0, 700, false);
+      verifyMetadata(c, tableName, hashes);
+    }
+  }
+
   @Test
   public void testEmptyDir() throws Exception {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
@@ -621,7 +680,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
         String endRow = tablet.getEndRow() == null ? "null" : 
tablet.getEndRow().toString();
 
-        assertEquals(expectedHashes.get(endRow), fileHashes);
+        assertEquals(expectedHashes.get(endRow), fileHashes, "endRow " + 
endRow);
 
         endRowsSeen.add(endRow);
       }
