This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new 807b62c  Documents bug in bulk batch writer and renames class to 
indicate it's buggy (#288)
807b62c is described below

commit 807b62c01c4aae422a155be549fbf19664157b9c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sun Dec 15 13:10:07 2024 -0500

    Documents bug in bulk batch writer and renames class to indicate it's buggy 
(#288)
---
 .../testing/continuous/ContinuousIngest.java       |  4 +--
 ...kBatchWriter.java => FlakyBulkBatchWriter.java} | 34 +++++++++++++++++-----
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index b4c3c9f..62cad5d 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -189,8 +189,8 @@ public class ContinuousIngest {
           var workDir = new Path(bulkWorkDir);
           var filesystem = workDir.getFileSystem(conf);
           var memLimit = 
Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
-          return tableName -> new BulkBatchWriter(client, tableName, 
filesystem, workDir, memLimit,
-              splitSupplier);
+          return tableName -> new FlakyBulkBatchWriter(client, tableName, 
filesystem, workDir,
+              memLimit, splitSupplier);
         } catch (IOException e) {
           throw new UncheckedIOException(e);
         }
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java 
b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
similarity index 76%
rename from 
src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
rename to 
src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
index b5b68f9..281b183 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
+++ 
b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
@@ -47,9 +47,14 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 
-public class BulkBatchWriter implements BatchWriter {
+/**
+ * BatchWriter that bulk imports in its implementation. The implementation 
contains a bug that was
+ * found to be useful for testing Accumulo. The bug was left and this class 
was renamed to add Flaky
+ * to indicate its danger for other uses.
+ */
+public class FlakyBulkBatchWriter implements BatchWriter {
 
-  private static final Logger log = 
LoggerFactory.getLogger(BulkBatchWriter.class);
+  private static final Logger log = 
LoggerFactory.getLogger(FlakyBulkBatchWriter.class);
 
   private final Deque<Mutation> mutations = new ArrayDeque<>();
   private final AccumuloClient client;
@@ -62,7 +67,7 @@ public class BulkBatchWriter implements BatchWriter {
   private long memUsed;
   private boolean closed = false;
 
-  public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem 
fileSystem,
+  public FlakyBulkBatchWriter(AccumuloClient client, String tableName, 
FileSystem fileSystem,
       Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) {
     this.client = client;
     this.tableName = tableName;
@@ -119,8 +124,7 @@ public class BulkBatchWriter implements BatchWriter {
         }
       }
 
-      Comparator<KeyValue> kvComparator = (kv1, kv2) -> 
kv1.getKey().compareTo(kv2.getKey());
-      keysValues.sort(kvComparator);
+      keysValues.sort(Map.Entry.comparingByKey());
 
       RFileWriter writer = null;
       byte[] currEndRow = null;
@@ -128,14 +132,30 @@ public class BulkBatchWriter implements BatchWriter {
 
       var loadPlanBuilder = LoadPlan.builder();
 
+      // This code is broken because Arrays.compare will compare bytes as 
signed integers. Accumulo
+      // treats bytes as unsigned 8 bit integers for sorting purposes. This 
incorrect comparator
+      // causes this code to sometimes prematurely close rfiles, which can 
lead to lots of files
+      // being bulk imported into a single tablet. The files still go to the 
correct tablet, so this
+      // does not cause data loss. This bug was found to be useful in testing 
as it introduces
+      // stress on bulk import+compactions and it was decided to keep this 
bug. If copying this code
+      // elsewhere then this bug should probably be fixed.
+      Comparator<byte[]> comparator = Arrays::compare;
+      // To fix the code above it should be replaced with the following
+      // Comparator<byte[]> comparator = 
UnsignedBytes.lexicographicalComparator();
+
       for (var keyValue : keysValues) {
         var key = keyValue.getKey();
-        if (writer == null
-            || (currEndRow != null && 
Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
+        if (writer == null || (currEndRow != null
+            && comparator.compare(key.getRowData().toArray(), currEndRow) > 
0)) {
           if (writer != null) {
             writer.close();
           }
 
+          // When the above code prematurely closes a rfile because of the 
incorrect comparator, the
+          // following code will find a new Tablet. Since the following code 
uses the Text
+          // comparator its comparisons are correct and it will just find the 
same tablet for the
+          // file that was just closed. This is what causes multiple files to be 
added to the same
+          // tablet.
           var row = key.getRow();
           var headSet = splits.headSet(row);
           var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();

Reply via email to