This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 8386e60353644fec80866adca605a2f1fea5a759
Merge: 5e0455a08b 7a27d40463
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Feb 1 21:12:50 2025 +0000

    Merge branch '2.1' into 3.1

 .../java/org/apache/accumulo/core/Constants.java   |   1 +
 .../core/client/rfile/LoadPlanCollector.java       | 131 +++++++++++
 .../apache/accumulo/core/client/rfile/RFile.java   |  10 +
 .../accumulo/core/client/rfile/RFileWriter.java    |  36 +++-
 .../core/client/rfile/RFileWriterBuilder.java      |  30 ++-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  |  39 +++-
 .../org/apache/accumulo/core/data/LoadPlan.java    | 240 +++++++++++++++++++++
 .../apache/accumulo/core/file/FileOperations.java  |   5 +
 .../core/client/rfile/RFileClientTest.java         | 210 ++++++++++++++++++
 .../apache/accumulo/core/data/LoadPlanTest.java    |  48 +++++
 .../apache/accumulo/core/file/rfile/RFileTest.java |   1 -
 .../apache/accumulo/test/functional/BulkNewIT.java |  61 +++++-
 12 files changed, 788 insertions(+), 24 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
index 64956dcbc2,71c186f3eb..897e87213e
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
@@@ -35,11 -34,10 +35,12 @@@ import org.apache.accumulo.core.client.
import org.apache.accumulo.core.client.summary.Summary.FileStatistics; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; + import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.RowRangeUtil; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; /** diff --cc core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index 559f43df1e,5b12f4d73b..ad5a7e5341 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@@ -37,8 -37,8 +37,9 @@@ import org.apache.accumulo.core.conf.Ac import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; + import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.metadata.UnreferencedTabletFile; import org.apache.accumulo.core.metadata.ValidationUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; @@@ -117,12 -121,13 +122,13 @@@ class RFileWriterBuilder implements RFi return new RFileWriter( fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs) .withTableConfiguration(acuconf).withStartDisabled().build(), - visCacheSize); + visCacheSize, loadPlanCollector); } else { - return new RFileWriter( - fileops.newWriterBuilder() - .forFile(out.path.toString(), out.getFileSystem(out.path), out.getConf(), cs) - .withTableConfiguration(acuconf).withStartDisabled().build(), - visCacheSize, loadPlanCollector); + return new RFileWriter(fileops.newWriterBuilder() + 
.forFile(UnreferencedTabletFile.of(out.getFileSystem(out.path), out.path), + out.getFileSystem(out.path), out.getConf(), cs) - .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize); ++ .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize, ++ loadPlanCollector); } } diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index a5e7ff94b5,1c155cd510..6c9e49886b --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@@ -345,12 -363,23 +352,22 @@@ public class BulkImport implements Impo } public static List<KeyExtent> findOverlappingTablets(ClientContext context, - KeyExtentCache extentCache, UnreferencedTabletFile file, FileSystem fs, - KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache, - CryptoService cs) throws IOException { ++ KeyExtentCache keyExtentCache, UnreferencedTabletFile file, FileSystem fs, + Cache<String,Long> fileLenCache, CryptoService cs) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.toString(), fs, fs.getConf(), cs) - .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) - .seekToBeginning().build()) { + .forFile(file, fs, fs.getConf(), cs).withTableConfiguration(context.getConfiguration()) + .withFileLenCache(fileLenCache).seekToBeginning().build()) { - return findOverlappingTablets(extentCache, reader); + + Collection<ByteSequence> columnFamilies = Collections.emptyList(); + NextRowFunction nextRowFunction = row -> { + reader.seek(new Range(row, null), columnFamilies, false); + if (!reader.hasTop()) { + return null; + } + return reader.getTopKey().getRow(); + }; + + return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction); } } diff --cc core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 
753ff6631d,58e56abf0a..4273381467 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@@ -29,6 -28,8 +29,7 @@@ import static org.junit.jupiter.api.Ass import java.io.File; import java.io.IOException; import java.net.ConnectException; + import java.net.URI; -import java.security.SecureRandom; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@@ -58,7 -62,8 +63,9 @@@ import org.apache.accumulo.core.conf.Pr import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; + import org.apache.accumulo.core.data.LoadPlan; + import org.apache.accumulo.core.data.LoadPlanTest; +import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations;