This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push:
new 807b62c  Documents bug in bulk batch writer and renames class to indicate it's buggy (#288)
807b62c is described below
commit 807b62c01c4aae422a155be549fbf19664157b9c
Author: Keith Turner <[email protected]>
AuthorDate: Sun Dec 15 13:10:07 2024 -0500
Documents bug in bulk batch writer and renames class to indicate it's buggy (#288)
---
.../testing/continuous/ContinuousIngest.java | 4 +--
...kBatchWriter.java => FlakyBulkBatchWriter.java} | 34 +++++++++++++++++-----
2 files changed, 29 insertions(+), 9 deletions(-)
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index b4c3c9f..62cad5d 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -189,8 +189,8 @@ public class ContinuousIngest {
var workDir = new Path(bulkWorkDir);
var filesystem = workDir.getFileSystem(conf);
var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
- return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit,
- splitSupplier);
+ return tableName -> new FlakyBulkBatchWriter(client, tableName, filesystem, workDir,
+ memLimit, splitSupplier);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
similarity index 76%
rename from src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
rename to src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
index b5b68f9..281b183 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
@@ -47,9 +47,14 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
-public class BulkBatchWriter implements BatchWriter {
+/**
+ * BatchWriter that bulk imports in its implementation. The implementation contains a bug that was
+ * found to be useful for testing Accumulo. The bug was left in place and this class was renamed to
+ * add Flaky to indicate its danger for other uses.
+ */
+public class FlakyBulkBatchWriter implements BatchWriter {
- private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class);
+ private static final Logger log = LoggerFactory.getLogger(FlakyBulkBatchWriter.class);
private final Deque<Mutation> mutations = new ArrayDeque<>();
private final AccumuloClient client;
@@ -62,7 +67,7 @@ public class BulkBatchWriter implements BatchWriter {
private long memUsed;
private boolean closed = false;
- public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
+ public FlakyBulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) {
this.client = client;
this.tableName = tableName;
@@ -119,8 +124,7 @@ public class BulkBatchWriter implements BatchWriter {
}
}
- Comparator<KeyValue> kvComparator = (kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey());
- keysValues.sort(kvComparator);
+ keysValues.sort(Map.Entry.comparingByKey());
RFileWriter writer = null;
byte[] currEndRow = null;
@@ -128,14 +132,30 @@ public class BulkBatchWriter implements BatchWriter {
var loadPlanBuilder = LoadPlan.builder();
+ // This code is broken because Arrays.compare will compare bytes as signed integers. Accumulo
+ // treats bytes as unsigned 8-bit integers for sorting purposes. This incorrect comparator
+ // causes this code to sometimes prematurely close rfiles, which can lead to lots of files
+ // being bulk imported into a single tablet. The files still go to the correct tablet, so this
+ // does not cause data loss. This bug was found to be useful in testing as it introduces
+ // stress on bulk import and compactions, so it was decided to keep it. If copying this code
+ // elsewhere, this bug should probably be fixed.
+ Comparator<byte[]> comparator = Arrays::compare;
+ // To fix the code above, replace the comparator with the following:
+ // Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
+
for (var keyValue : keysValues) {
var key = keyValue.getKey();
- if (writer == null
- || (currEndRow != null && Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
+ if (writer == null || (currEndRow != null
+ && comparator.compare(key.getRowData().toArray(), currEndRow) > 0)) {
if (writer != null) {
writer.close();
}
+ // When the above code prematurely closes an rfile because of the incorrect comparator, the
+ // following code will find a new tablet. Since the following code uses the Text
+ // comparator, its comparisons are correct and it will just find the same tablet for the
+ // file that was just closed. This is what causes multiple files to be added to the same
+ // tablet.
var row = key.getRow();
var headSet = splits.headSet(row);
var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
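
The comment block in the diff above hinges on signed versus unsigned byte ordering, so here is a minimal standalone sketch of how the two comparators disagree once a byte has its high bit set. This is not part of the commit; the class name ByteOrderDemo is made up for illustration, and it assumes Guava is on the classpath (the diff already imports from com.google.common):

    import java.util.Arrays;
    import java.util.Comparator;

    import com.google.common.primitives.UnsignedBytes;

    public class ByteOrderDemo {
      public static void main(String[] args) {
        byte[] low = {(byte) 0x7f};  // 127 signed, 127 unsigned
        byte[] high = {(byte) 0x80}; // -128 signed, 128 unsigned

        // Arrays.compare treats bytes as signed, so 0x80 sorts before 0x7f.
        System.out.println(Arrays.compare(low, high));   // positive: low > high

        // Accumulo orders row bytes as unsigned, so 0x80 sorts after 0x7f.
        Comparator<byte[]> unsigned = UnsignedBytes.lexicographicalComparator();
        System.out.println(unsigned.compare(low, high)); // negative: low < high
      }
    }

For example, if currEndRow were {(byte) 0x80} and the next key's row were {(byte) 0x7f}, the signed comparator would report the key as past the tablet's end row and the writer would close the current rfile early, even though the unsigned ordering Accumulo uses places that row inside the current tablet. The Text-based headSet lookup that follows then resolves the next file to that same tablet, producing the pile-up of files per tablet the comment describes.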