This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new ed97a2f  fixes sorting in bulk batch writer (#289)
ed97a2f is described below

commit ed97a2f81e617428158e24552ad3a96e0460a875
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Dec 10 15:32:30 2024 -0500

    fixes sorting in bulk batch writer (#289)
    
    BulkBatchWriter was sorting only on the mutation row. Occasionally,
    continuous ingest would generate the same 64-bit id twice in the same
    batch, and if the columns of those duplicate rows were not sorted
    properly, the write to the rfile would fail.
    
    This fixes the problem by sorting on the entire key instead of only the
    row.
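
A minimal sketch (not part of this commit) of the failure mode, using
Accumulo's Key class; the row and column family values below are made up
for illustration, and the CharSequence Key constructors assume Accumulo 2.x:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.accumulo.core.data.Key;

    public class RowOnlySortSketch {
      public static void main(String[] args) {
        // Two updates that share the same row but arrive with their
        // column families out of order.
        List<Key> keys = new ArrayList<>();
        keys.add(new Key("row1", "fam2", "qual"));
        keys.add(new Key("row1", "fam1", "qual"));

        // A row-only sort is stable on the duplicated row, so the keys
        // stay in arrival order (fam2 before fam1); appending them to an
        // rfile in that order fails the writer's ordering check.
        keys.sort((k1, k2) -> k1.getRow().compareTo(k2.getRow()));
        System.out.println("row-only sort:  " + keys);

        // Sorting on the entire key also orders the column families
        // within the duplicated row, which is what the rfile requires.
        keys.sort(Key::compareTo);
        System.out.println("whole-key sort: " + keys);
      }
    }

Key's natural ordering compares row, family, qualifier, visibility,
timestamp, and the delete flag, so sorting on whole keys produces the
order RFileWriter.append expects.
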
---
 .../testing/continuous/BulkBatchWriter.java        | 49 ++++++++++++++--------
 1 file changed, 32 insertions(+), 17 deletions(-)

diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
index 6e763b3..b5b68f9 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
@@ -18,8 +18,11 @@
  */
 package org.apache.accumulo.testing.continuous;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
@@ -32,6 +35,7 @@ import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
 import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,7 +51,7 @@ public class BulkBatchWriter implements BatchWriter {
 
   private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class);
 
-  private final List<Mutation> mutations = new ArrayList<>();
+  private final Deque<Mutation> mutations = new ArrayDeque<>();
   private final AccumuloClient client;
   private final String tableName;
   private final FileSystem fileSystem;
@@ -72,7 +76,7 @@ public class BulkBatchWriter implements BatchWriter {
   public synchronized void addMutation(Mutation mutation) throws MutationsRejectedException {
     Preconditions.checkState(!closed);
     mutation = new Mutation(mutation);
-    mutations.add(mutation);
+    mutations.addLast(mutation);
     memUsed += mutation.estimatedMemoryUsed();
     if (memUsed > memLimit) {
       flush();
@@ -96,7 +100,27 @@ public class BulkBatchWriter implements BatchWriter {
 
       Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
       fileSystem.mkdirs(tmpDir);
-      mutations.sort((m1, m2) -> Arrays.compare(m1.getRow(), m2.getRow()));
+
+      List<KeyValue> keysValues = new ArrayList<>(mutations.size());
+
+      // remove mutations from the deque as we convert them to Keys, making
+      // the Mutation objects available for garbage collection
+      Mutation mutation;
+      while ((mutation = mutations.pollFirst()) != null) {
+        for (var columnUpdate : mutation.getUpdates()) {
+          var builder = Key.builder(false).row(mutation.getRow())
+              .family(columnUpdate.getColumnFamily()).qualifier(columnUpdate.getColumnQualifier())
+              .visibility(columnUpdate.getColumnVisibility());
+          if (columnUpdate.hasTimestamp()) {
+            builder = builder.timestamp(columnUpdate.getTimestamp());
+          }
+          Key key = builder.deleted(columnUpdate.isDeleted()).build();
+          keysValues.add(new KeyValue(key, columnUpdate.getValue()));
+        }
+      }
+
+      Comparator<KeyValue> kvComparator = (kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey());
+      keysValues.sort(kvComparator);
 
       RFileWriter writer = null;
       byte[] currEndRow = null;
@@ -104,14 +128,15 @@ public class BulkBatchWriter implements BatchWriter {
 
       var loadPlanBuilder = LoadPlan.builder();
 
-      for (var mutation : mutations) {
+      for (var keyValue : keysValues) {
+        var key = keyValue.getKey();
         if (writer == null
-            || (currEndRow != null && Arrays.compare(mutation.getRow(), currEndRow) > 0)) {
+            || (currEndRow != null && Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
           if (writer != null) {
             writer.close();
           }
 
-          var row = new Text(mutation.getRow());
+          var row = key.getRow();
           var headSet = splits.headSet(row);
           var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
           var tailSet = splits.tailSet(row);
@@ -126,17 +151,7 @@ public class BulkBatchWriter implements BatchWriter {
           log.debug("Created new file {} for range {} {}", filename, tabletPrevRow, tabletEndRow);
         }
 
-        for (var colUpdate : mutation.getUpdates()) {
-          var key = new Key(mutation.getRow(), colUpdate.getColumnFamily(),
-              colUpdate.getColumnQualifier(), colUpdate.getColumnVisibility());
-          if (colUpdate.hasTimestamp()) {
-            key.setTimestamp(colUpdate.getTimestamp());
-          }
-          if (colUpdate.isDeleted()) {
-            key.setDeleted(true);
-          }
-          writer.append(key, colUpdate.getValue());
-        }
+        writer.append(key, keyValue.getValue());
       }
 
       if (writer != null) {
