This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2b6ee58809 Validation check for KeyExtents (#5197)
2b6ee58809 is described below

commit 2b6ee58809f4892d4dadb344479948f875806f12
Author: Arbaaz Khan <[email protected]>
AuthorDate: Mon Aug 11 16:39:24 2025 -0400

    Validation check for KeyExtents (#5197)
    
    * Added test to check that validation is working
---
 .../core/clientImpl/bulk/LoadMappingIterator.java  |  25 ++-
 .../clientImpl/bulk/LoadMappingIteratorTest.java   | 175 +++++++++++++++++++++
 2 files changed, 196 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/LoadMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/LoadMappingIterator.java
index 30e104196c..dada67edf0 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/LoadMappingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/LoadMappingIterator.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
 import com.google.gson.stream.JsonReader;
 
 /**
@@ -45,6 +46,7 @@ public class LoadMappingIterator
   private JsonReader reader;
   private Gson gson = createGson();
   private Map<String,String> renameMap;
+  private KeyExtent lastKeyExtent = null;
 
   LoadMappingIterator(TableId tableId, InputStream loadMapFile) throws IOException {
     this.tableId = tableId;
@@ -72,12 +74,27 @@ public class LoadMappingIterator
 
   @Override
   public Map.Entry<KeyExtent,Bulk.Files> next() {
-    Bulk.Mapping bm = gson.fromJson(reader, Bulk.Mapping.class);
+    Bulk.Mapping bm;
+    try {
+      bm = gson.fromJson(reader, Bulk.Mapping.class);
+    } catch (JsonParseException e) {
+      throw new IllegalStateException("Failed to read next mapping", e);
+    }
+
+    KeyExtent currentKeyExtent = bm.getKeyExtent(tableId);
+
+    if (lastKeyExtent != null && currentKeyExtent.compareTo(lastKeyExtent) < 0) {
+      throw new IllegalStateException(
+          String.format("KeyExtents are not in sorted order: %s was seen before %s", lastKeyExtent,
+              currentKeyExtent));
+    }
+
+    lastKeyExtent = currentKeyExtent;
+
     if (renameMap != null) {
-      return new AbstractMap.SimpleEntry<>(bm.getKeyExtent(tableId),
-          bm.getFiles().mapNames(renameMap));
+      return new AbstractMap.SimpleEntry<>(currentKeyExtent, bm.getFiles().mapNames(renameMap));
     } else {
-      return new AbstractMap.SimpleEntry<>(bm.getKeyExtent(tableId), bm.getFiles());
+      return new AbstractMap.SimpleEntry<>(currentKeyExtent, bm.getFiles());
     }
   }
 
diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/LoadMappingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/LoadMappingIteratorTest.java
new file mode 100644
index 0000000000..f651a03ab1
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/LoadMappingIteratorTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl.bulk;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.clientImpl.bulk.BulkSerialize.createGson;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
+import com.google.gson.stream.JsonWriter;
+
+public class LoadMappingIteratorTest {
+  private LoadMappingIterator createLoadMappingIter(Map<KeyExtent,String> loadRanges)
+      throws IOException {
+    Map<KeyExtent,Bulk.Files> mapping = new LinkedHashMap<>();
+
+    loadRanges.forEach((extent, files) -> {
+      Bulk.Files testFiles = new Bulk.Files();
+      long c = 0L;
+      for (String f : files.split(" ")) {
+        c++;
+        testFiles.add(new Bulk.FileInfo(f, c, c));
+      }
+
+      mapping.put(extent, testFiles);
+    });
+
+    // Serialize unordered mapping directly
+    byte[] serializedData;
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      writeLoadMappingWithoutSorting(mapping, "/some/dir", p -> baos);
+      serializedData = baos.toByteArray();
+    }
+    ByteArrayInputStream bais = new ByteArrayInputStream(serializedData);
+
+    return BulkSerialize.readLoadMapping("/some/dir", TableId.of("1"), p -> bais);
+  }
+
+  KeyExtent nke(String prev, String end) {
+    Text per = prev == null ? null : new Text(prev);
+    Text er = end == null ? null : new Text(end);
+
+    return new KeyExtent(TableId.of("1"), er, per);
+  }
+
+  /**
+   * Serialize bulk load mapping without sorting.
+   */
+  public static void writeLoadMappingWithoutSorting(Map<KeyExtent,Bulk.Files> loadMapping,
+      String sourceDir, BulkSerialize.Output output) throws IOException {
+    final Path lmFile = new Path(sourceDir, Constants.BULK_LOAD_MAPPING);
+
+    try (OutputStream fsOut = output.create(lmFile);
+        OutputStreamWriter osw = new OutputStreamWriter(fsOut, UTF_8);
+        BufferedWriter bw = new BufferedWriter(osw); JsonWriter writer = new JsonWriter(bw)) {
+      Gson gson = createGson();
+      writer.setIndent("  ");
+      writer.beginArray();
+      // Iterate over entries in the order they are inserted
+      for (Map.Entry<KeyExtent,Bulk.Files> entry : loadMapping.entrySet()) {
+        Bulk.Mapping mapping = new Bulk.Mapping(entry.getKey(), entry.getValue());
+        gson.toJson(mapping, Bulk.Mapping.class, writer);
+      }
+      writer.endArray();
+    }
+  }
+
+  @Test
+  void testValidOrderedInput() throws IOException {
+    Map<KeyExtent,String> loadRanges = new LinkedHashMap<>();
+    loadRanges.put(nke(null, "c"), "f1 f2");
+    loadRanges.put(nke("c", "g"), "f2 f3");
+    loadRanges.put(nke("g", "r"), "f2 f4");
+    loadRanges.put(nke("r", "w"), "f2 f5");
+    loadRanges.put(nke("w", null), "f2 f6");
+
+    try (LoadMappingIterator iterator = createLoadMappingIter(loadRanges)) {
+      var loadRangesIter = loadRanges.keySet().iterator();
+
+      while (iterator.hasNext()) {
+        assertEquals(loadRangesIter.next(), iterator.next().getKey());
+      }
+      assertFalse(loadRangesIter.hasNext(), "Iterator should consume all expected entries");
+    }
+  }
+
+  @Test
+  void testInvalidOutOfOrderInput() throws IOException {
+    Map<KeyExtent,String> loadRanges = new LinkedHashMap<>();
+    loadRanges.put(nke("c", "g"), "f2 f3");
+    loadRanges.put(nke(null, "c"), "f1 f2");
+    loadRanges.put(nke("g", "r"), "f2 f4");
+    loadRanges.put(nke("r", "w"), "f2 f5");
+    loadRanges.put(nke("w", null), "f2 f6");
+
+    try (LoadMappingIterator iterator = createLoadMappingIter(loadRanges)) {
+      assertEquals(nke("c", "g"), iterator.next().getKey());
+      var e = assertThrows(IllegalStateException.class, iterator::next);
+      String expected = "KeyExtents are not in sorted order: 1;g;c was seen before 1;c<";
+      assertEquals(expected, e.getMessage());
+    }
+  }
+
+  @Test
+  void testNullCheck() throws IOException {
+    Map<KeyExtent,String> loadRanges = new LinkedHashMap<>();
+    loadRanges.put(nke(null, "a"), "f1 f2");
+
+    try (LoadMappingIterator iterator = createLoadMappingIter(loadRanges)) {
+      assertTrue(iterator.hasNext());
+      iterator.next();
+
+      assertFalse(iterator.hasNext());
+
+      var e = assertThrows(IllegalStateException.class, iterator::next);
+      String expected = "Failed to read next mapping";
+      assertEquals(expected, e.getMessage());
+    }
+  }
+
+  @Test
+  void testMalformedJson() throws IOException {
+    // Create invalid JSON (missing closing bracket)
+    String malformedJson =
+        "[ { \"extent\": { \"table\": \"1\", \"endRow\": \"a\" }, \"files\": [ ] ";
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(malformedJson.getBytes(UTF_8));
+
+    try (LoadMappingIterator iterator =
+        BulkSerialize.readLoadMapping("/some/dir", TableId.of("1"), p -> bais)) {
+      var e = assertThrows(IllegalStateException.class, iterator::next);
+      String expected = "Failed to read next mapping";
+      assertEquals(expected, e.getMessage());
+      assertInstanceOf(JsonParseException.class, e.getCause());
+    }
+  }
+
+}

Reply via email to