This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new c5877f846e adds function to compute load plans when splits are unknown
(#5982)
c5877f846e is described below
commit c5877f846ecee8da422179b3572d4e2295559196
Author: Keith Turner <[email protected]>
AuthorDate: Tue Mar 3 11:38:13 2026 -0800
adds function to compute load plans when splits are unknown (#5982)
fixes #5971
Co-authored-by: Dave Marion <[email protected]>
---
.../org/apache/accumulo/core/data/LoadPlan.java | 60 ++++++++++++++++++++--
.../core/client/rfile/RFileClientTest.java | 41 +++++++++++++++
.../apache/accumulo/core/crypto/CryptoTest.java | 21 ++++++++
3 files changed, 118 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
index 2d1fd04e45..a874ae952e 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
@@ -38,7 +38,14 @@ import java.util.stream.Collectors;
import
org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -90,13 +97,19 @@ public class LoadPlan {
* row and end row can be null. The start row is exclusive and the end row
is inclusive (like
* Accumulo tablets). A common use case for this would be when files were
partitioned using a
* table's splits. When using this range type, the start and end row must
exist as splits in the
- * table or an exception will be thrown at load time.
+ * table or an exception will be thrown at load time. This RangeType is
the most efficient for
+ * Accumulo to load, and it enables only loading files to tablets that
overlap data in the file.
*/
TABLE,
/**
- * Range that correspond to known rows in a file. For this range type, the
start row and end row
- * must be non-null. The start row and end row are both considered
inclusive. At load time,
- * these data ranges will be mapped to table ranges.
+ * Range that corresponds to the minimum and maximum rows in a file. For
this range type, the
+ * start row and end row must be non-null. The start row and end row are
both considered
+ * inclusive. At load time, these data ranges will be mapped to table
ranges. For this RangeType
+ * Accumulo has to do more work at load time to map the file range to tablets.
Also, this will map a
+ * file to all tablets in the range even if the file has no data for that
tablet. For example if
+ * a range overlapped 10 tablets but the file only had data for 8 of those
tablets, the file
+ * would still be loaded to all 10. This will not cause problems for scans
or compactions other
+ * than the unnecessary work of opening a file and finding it has no data
for the tablet.
*/
FILE
}
@@ -459,6 +472,7 @@ public class LoadPlan {
* Computes a load plan for a given rfile. This will open the rfile and find
every
* {@link TableSplits} that overlaps rows in the file and add those to the
returned load plan.
*
+ * @return a load plan of type {@link RangeType#TABLE}
* @since 2.1.4
*/
public static LoadPlan compute(URI file, SplitResolver splitResolver) throws
IOException {
@@ -475,6 +489,7 @@ public class LoadPlan {
*
* @param properties used when opening the rfile, see
* {@link
org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)}
+ * @return a load plan of type {@link RangeType#TABLE}
* @since 2.1.4
*/
public static LoadPlan compute(URI file, Map<String,String> properties,
@@ -510,4 +525,41 @@ public class LoadPlan {
return builder.build();
}
}
+
+ /**
+ * Computes a load plan for an rfile based on the minimum and maximum row
present across all
+ * locality groups.
+ *
+ * @param properties used when opening the rfile, see
+ * {@link
org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)}
+ *
+ * @return a load plan of type {@link RangeType#FILE}
+ * @since 2.1.5
+ */
+ public static LoadPlan compute(URI file, Map<String,String> properties)
throws IOException {
+ var path = new Path(file);
+ var conf = new Configuration();
+ var fs = FileSystem.get(path.toUri(), conf);
+ CryptoService cs =
+ CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE,
properties);
+ var tableConf =
SiteConfiguration.empty().withOverrides(properties).build();
+ try (var reader = FileOperations.getInstance().newReaderBuilder()
+ .forFile(file.toString(), fs, conf,
cs).withTableConfiguration(tableConf).build();) {
+ var firstRow = reader.getFirstKey().getRow();
+ var lastRow = reader.getLastKey().getRow();
+ return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE,
firstRow, lastRow)
+ .build();
+ }
+ }
+
+ /**
+ * Computes a load plan for an rfile based on the minimum and maximum row
present across all
+ * locality groups.
+ *
+ * @return a load plan of type {@link RangeType#FILE}
+ * @since 2.1.5
+ */
+ public static LoadPlan compute(URI file) throws IOException {
+ return compute(file, Map.of());
+ }
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
index 58e56abf0a..ae3dccaecb 100644
---
a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
@@ -948,6 +948,47 @@ public class RFileClientTest {
var expectedLoadPlan =
LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE,
"001", "009").build();
assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+ assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new
URI(testFile)).toJson());
+
+ // put the first row in the default LG and last row in the first LG
+ testFile = createTmpTestFile();
+ var writer2 =
RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+ try (writer2) {
+ writer2.startNewLocalityGroup("LG1", "F1");
+ writer2.append(new Key("007", "F1"), "V1");
+ writer2.append(new Key("009", "F1"), "V2");
+ writer2.startNewLocalityGroup("LG2", "F3");
+ writer2.append(new Key("003", "F3"), "V3");
+ writer2.append(new Key("004", "F3"), "V4");
+ writer2.startDefaultLocalityGroup();
+ writer2.append(new Key("002", "F4"), "V5");
+ writer2.append(new Key("008", "F4"), "V6");
+ }
+
+ filename = new Path(testFile).getName();
+ loadPlan = writer2.getLoadPlan(filename);
+ assertEquals(1, loadPlan.getDestinations().size());
+ expectedLoadPlan =
+ LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE,
"002", "009").build();
+ assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+ assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new
URI(testFile)).toJson());
+
+ // create a file w/ a single LG
+ testFile = createTmpTestFile();
+ var writer3 =
RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+ try (writer3) {
+ writer3.startDefaultLocalityGroup();
+ writer3.append(new Key("003", "F4"), "V5");
+ writer3.append(new Key("008", "F4"), "V6");
+ }
+
+ filename = new Path(testFile).getName();
+ loadPlan = writer3.getLoadPlan(filename);
+ assertEquals(1, loadPlan.getDestinations().size());
+ expectedLoadPlan =
+ LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE,
"003", "008").build();
+ assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+ assertEquals(expectedLoadPlan.toJson(), LoadPlan.compute(new
URI(testFile)).toJson());
}
@Test
diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
index 08ef7f660f..a27e99a227 100644
--- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
@@ -48,8 +48,11 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
@@ -67,6 +70,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.spi.crypto.AESCryptoService;
@@ -85,6 +89,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -357,6 +362,22 @@ public class CryptoTest {
assertEquals(1, summary.getStatistics().size());
assertEquals(0, summary.getFileStatistics().getInaccurate());
assertEquals(1, summary.getFileStatistics().getTotal());
+
+ // test computing load plan for encrypted files
+ var absUri = new Path(file).makeQualified(fs.getUri(),
fs.getWorkingDirectory()).toUri();
+ var loadPlan = LoadPlan.compute(absUri,
cryptoOnConf.getAllCryptoProperties());
+ var expectedLoadPlan =
+ LoadPlan.builder().loadFileTo("testFile1.rf", LoadPlan.RangeType.FILE,
"a", "a3").build();
+ assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson());
+
+ var splits =
+ Stream.of("a", "b",
"c").map(Text::new).collect(Collectors.toCollection(TreeSet::new));
+ var resolver = LoadPlan.SplitResolver.from(splits);
+ var loadPlan2 = LoadPlan.compute(absUri,
cryptoOnConf.getAllCryptoProperties(), resolver);
+ var expectedLoadPlan2 =
+ LoadPlan.builder().loadFileTo("testFile1.rf",
LoadPlan.RangeType.TABLE, null, "a")
+ .loadFileTo("testFile1.rf", LoadPlan.RangeType.TABLE, "a",
"b").build();
+ assertEquals(expectedLoadPlan2.toJson(), loadPlan2.toJson());
}
@Test