This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 8386e60353644fec80866adca605a2f1fea5a759
Merge: 5e0455a08b 7a27d40463
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Feb 1 21:12:50 2025 +0000

    Merge branch '2.1' into 3.1

 .../java/org/apache/accumulo/core/Constants.java   |   1 +
 .../core/client/rfile/LoadPlanCollector.java       | 131 +++++++++++
 .../apache/accumulo/core/client/rfile/RFile.java   |  10 +
 .../accumulo/core/client/rfile/RFileWriter.java    |  36 +++-
 .../core/client/rfile/RFileWriterBuilder.java      |  30 ++-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  |  39 +++-
 .../org/apache/accumulo/core/data/LoadPlan.java    | 240 +++++++++++++++++++++
 .../apache/accumulo/core/file/FileOperations.java  |   5 +
 .../core/client/rfile/RFileClientTest.java         | 210 ++++++++++++++++++
 .../apache/accumulo/core/data/LoadPlanTest.java    |  48 +++++
 .../apache/accumulo/core/file/rfile/RFileTest.java |   1 -
 .../apache/accumulo/test/functional/BulkNewIT.java |  61 +++++-
 12 files changed, 788 insertions(+), 24 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
index 64956dcbc2,71c186f3eb..897e87213e
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
@@@ -35,11 -34,10 +35,12 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.LoadPlan;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.RowRangeUtil;
  import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.Text;
  
  /**
diff --cc 
core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
index 559f43df1e,5b12f4d73b..ad5a7e5341
--- 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@@ -37,8 -37,8 +37,9 @@@ import org.apache.accumulo.core.conf.Ac
  import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.DefaultConfiguration;
  import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
+ import org.apache.accumulo.core.data.LoadPlan;
  import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
  import org.apache.accumulo.core.metadata.ValidationUtil;
  import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
  import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
@@@ -117,12 -121,13 +122,13 @@@ class RFileWriterBuilder implements RFi
        return new RFileWriter(
            fileops.newWriterBuilder().forOutputStream(".rf", fsdo, 
out.getConf(), cs)
                .withTableConfiguration(acuconf).withStartDisabled().build(),
-           visCacheSize);
+           visCacheSize, loadPlanCollector);
      } else {
 -      return new RFileWriter(
 -          fileops.newWriterBuilder()
 -              .forFile(out.path.toString(), out.getFileSystem(out.path), 
out.getConf(), cs)
 -              .withTableConfiguration(acuconf).withStartDisabled().build(),
 -          visCacheSize, loadPlanCollector);
 +      return new RFileWriter(fileops.newWriterBuilder()
 +          .forFile(UnreferencedTabletFile.of(out.getFileSystem(out.path), 
out.path),
 +              out.getFileSystem(out.path), out.getConf(), cs)
-           .withTableConfiguration(acuconf).withStartDisabled().build(), 
visCacheSize);
++          .withTableConfiguration(acuconf).withStartDisabled().build(), 
visCacheSize,
++          loadPlanCollector);
      }
    }
  
diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index a5e7ff94b5,1c155cd510..6c9e49886b
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@@ -345,12 -363,23 +352,22 @@@ public class BulkImport implements Impo
    }
  
    public static List<KeyExtent> findOverlappingTablets(ClientContext context,
-       KeyExtentCache extentCache, UnreferencedTabletFile file, FileSystem fs,
 -      KeyExtentCache keyExtentCache, Path file, FileSystem fs, 
Cache<String,Long> fileLenCache,
 -      CryptoService cs) throws IOException {
++      KeyExtentCache keyExtentCache, UnreferencedTabletFile file, FileSystem 
fs,
 +      Cache<String,Long> fileLenCache, CryptoService cs) throws IOException {
      try (FileSKVIterator reader = 
FileOperations.getInstance().newReaderBuilder()
 -        .forFile(file.toString(), fs, fs.getConf(), cs)
 -        
.withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
 -        .seekToBeginning().build()) {
 +        .forFile(file, fs, fs.getConf(), 
cs).withTableConfiguration(context.getConfiguration())
 +        .withFileLenCache(fileLenCache).seekToBeginning().build()) {
-       return findOverlappingTablets(extentCache, reader);
+ 
+       Collection<ByteSequence> columnFamilies = Collections.emptyList();
+       NextRowFunction nextRowFunction = row -> {
+         reader.seek(new Range(row, null), columnFamilies, false);
+         if (!reader.hasTop()) {
+           return null;
+         }
+         return reader.getTopKey().getRow();
+       };
+ 
+       return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction);
      }
    }
  
diff --cc 
core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
index 753ff6631d,58e56abf0a..4273381467
--- 
a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
@@@ -29,6 -28,8 +29,7 @@@ import static org.junit.jupiter.api.Ass
  import java.io.File;
  import java.io.IOException;
  import java.net.ConnectException;
+ import java.net.URI;
 -import java.security.SecureRandom;
  import java.util.AbstractMap;
  import java.util.ArrayList;
  import java.util.Arrays;
@@@ -58,7 -62,8 +63,9 @@@ import org.apache.accumulo.core.conf.Pr
  import org.apache.accumulo.core.data.ArrayByteSequence;
  import org.apache.accumulo.core.data.ByteSequence;
  import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.LoadPlan;
+ import org.apache.accumulo.core.data.LoadPlanTest;
 +import org.apache.accumulo.core.data.PartialKey;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.file.FileOperations;

Reply via email to