This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git

The following commit(s) were added to refs/heads/main by this push:
     new 807b62c  Documents bug in bulk batch writer and renames class to indicate its buggy (#288)
807b62c is described below

commit 807b62c01c4aae422a155be549fbf19664157b9c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sun Dec 15 13:10:07 2024 -0500

    Documents bug in bulk batch writer and renames class to indicate its buggy (#288)
---
 .../testing/continuous/ContinuousIngest.java       |  4 +--
 ...kBatchWriter.java => FlakyBulkBatchWriter.java} | 34 +++++++++++++++++-----
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index b4c3c9f..62cad5d 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -189,8 +189,8 @@ public class ContinuousIngest {
       var workDir = new Path(bulkWorkDir);
       var filesystem = workDir.getFileSystem(conf);
       var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
-      return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit,
-          splitSupplier);
+      return tableName -> new FlakyBulkBatchWriter(client, tableName, filesystem, workDir,
+          memLimit, splitSupplier);
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
similarity index 76%
rename from src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
rename to src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
index b5b68f9..281b183 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
@@ -47,9 +47,14 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 
-public class BulkBatchWriter implements BatchWriter {
+/**
+ * BatchWriter that uses bulk import in its implementation. The implementation contains a bug that
+ * was found to be useful for testing Accumulo. The bug was left in place and this class was
+ * renamed to add Flaky to indicate its danger for other uses.
+ */
+public class FlakyBulkBatchWriter implements BatchWriter {
 
-  private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class);
+  private static final Logger log = LoggerFactory.getLogger(FlakyBulkBatchWriter.class);
 
   private final Deque<Mutation> mutations = new ArrayDeque<>();
   private final AccumuloClient client;
@@ -62,7 +67,7 @@ public class BulkBatchWriter {
   private long memUsed;
   private boolean closed = false;
 
-  public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
+  public FlakyBulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
       Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) {
     this.client = client;
     this.tableName = tableName;
@@ -119,8 +124,7 @@ public class BulkBatchWriter {
       }
     }
 
-    Comparator<KeyValue> kvComparator = (kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey());
-    keysValues.sort(kvComparator);
+    keysValues.sort(Map.Entry.comparingByKey());
 
     RFileWriter writer = null;
     byte[] currEndRow = null;
@@ -128,14 +132,30 @@
 
     var loadPlanBuilder = LoadPlan.builder();
 
+    // This code is broken because Arrays.compare will compare bytes as signed integers. Accumulo
+    // treats bytes as unsigned 8-bit integers for sorting purposes. This incorrect comparator
+    // causes this code to sometimes prematurely close rfiles, which can lead to lots of files
+    // being bulk imported into a single tablet. The files still go to the correct tablet, so this
+    // does not cause data loss. This bug was found to be useful in testing as it introduces
+    // stress on bulk import+compactions and it was decided to keep this bug. If copying this code
+    // elsewhere, then this bug should probably be fixed.
+    Comparator<byte[]> comparator = Arrays::compare;
+    // To fix the code above it should be replaced with the following
+    // Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
+
     for (var keyValue : keysValues) {
       var key = keyValue.getKey();
-      if (writer == null
-          || (currEndRow != null && Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
+      if (writer == null || (currEndRow != null
+          && comparator.compare(key.getRowData().toArray(), currEndRow) > 0)) {
         if (writer != null) {
           writer.close();
        }
 
+        // When the above code prematurely closes an rfile because of the incorrect comparator,
+        // the following code will find a new tablet. Since the following code uses the Text
+        // comparator its comparisons are correct and it will just find the same tablet for the
+        // file that was just closed. This is what causes multiple files to be added to the same
+        // tablet.
         var row = key.getRow();
         var headSet = splits.headSet(row);
         var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
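
The signed-versus-unsigned mismatch described in the new comments is easy to see in
isolation. Below is a minimal standalone sketch, not part of this commit: the class
name and sample bytes are invented for illustration, and only java.util.Arrays and
Guava's UnsignedBytes are assumed. It shows Arrays::compare and
UnsignedBytes.lexicographicalComparator() disagreeing on the same pair of rows:

    import java.util.Arrays;
    import java.util.Comparator;

    import com.google.common.primitives.UnsignedBytes;

    public class SignedVsUnsignedDemo {
      public static void main(String[] args) {
        byte[] lowRow = {0x01};          // 1 whether signed or unsigned
        byte[] highRow = {(byte) 0x80};  // -128 signed, 128 unsigned

        // Arrays.compare treats bytes as signed, so 0x80 sorts before 0x01.
        Comparator<byte[]> signed = Arrays::compare;
        System.out.println(signed.compare(highRow, lowRow));   // negative (wrong for Accumulo)

        // Guava's comparator treats bytes as unsigned, matching how Accumulo sorts keys.
        Comparator<byte[]> unsigned = UnsignedBytes.lexicographicalComparator();
        System.out.println(unsigned.compare(highRow, lowRow)); // positive (correct)
      }
    }

Whenever a tablet's end row contains a byte with the high bit set, the signed
comparator can report that a key still inside the tablet is past the end row, which
triggers the premature rfile close the comment describes.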
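
The tablet lookup that runs after the close can also be sketched on its own. This is
a hypothetical example, not the committed code: the split values are made up, and it
relies only on the fact that org.apache.hadoop.io.Text compares its bytes as unsigned,
so the headSet lookup is correct even when the byte[] comparator above is not:

    import java.util.TreeSet;

    import org.apache.hadoop.io.Text;

    public class TabletLookupDemo {
      public static void main(String[] args) {
        // Split points "m" and "t" define tablets (-inf, "m"], ("m", "t"], ("t", +inf).
        TreeSet<Text> splits = new TreeSet<>();
        splits.add(new Text("m"));
        splits.add(new Text("t"));

        Text row = new Text("p");
        // headSet(row) holds every split strictly before the row; its last element is
        // the previous row of the row's tablet, or null for the first tablet.
        var headSet = splits.headSet(row);
        Text tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
        System.out.println(tabletPrevRow); // prints m, so "p" falls in tablet ("m", "t"]
      }
    }

Because this lookup is correct, a file closed prematurely by the signed comparator is
still planned for the same tablet, which is why the bug piles extra files onto one
tablet instead of losing data.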