This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push: new 711d28e Add deletes to continuous ingest (#166) 711d28e is described below commit 711d28e83a91cbaa0754064a7d72c0f975f7de31 Author: Dom G <47725857+domgargu...@users.noreply.github.com> AuthorDate: Fri Nov 19 13:04:02 2021 -0500 Add deletes to continuous ingest (#166) * Add the ability for the deletion of entries to occur while running continuous ingest --- conf/accumulo-testing.properties | 3 + .../org/apache/accumulo/testing/TestProps.java | 2 + .../testing/continuous/ContinuousIngest.java | 85 ++++++++++++++++------ 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties index 4f0e749..6ef5855 100644 --- a/conf/accumulo-testing.properties +++ b/conf/accumulo-testing.properties @@ -82,6 +82,9 @@ test.ci.ingest.pause.wait.max=180 test.ci.ingest.pause.duration.min=60 # Maximum pause duration (in seconds) test.ci.ingest.pause.duration.max=120 +# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest +# To disable deletes, set probability to 0 +test.ci.ingest.delete.probability=0.1 # Batch walker # ------------ diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java index c4c8948..e7801d3 100644 --- a/src/main/java/org/apache/accumulo/testing/TestProps.java +++ b/src/main/java/org/apache/accumulo/testing/TestProps.java @@ -94,6 +94,8 @@ public class TestProps { public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST + "pause.duration.max"; // Amount of data to write before flushing. Pause checks are only done after flush. public static final String CI_INGEST_FLUSH_ENTRIES = CI_INGEST + "entries.flush"; + // The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest + public static final String CI_INGEST_DELETE_PROBABILITY = CI_INGEST + "delete.probability"; /** Batch Walker **/ // Sleep time between batch scans (in ms) diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java index 66f5152..459102f 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java @@ -74,6 +74,14 @@ public class ContinuousIngest { return (rand.nextInt(max - min) + min); } + private static float getDeleteProbability(Properties props) { + String stringValue = props.getProperty(TestProps.CI_INGEST_DELETE_PROBABILITY); + float prob = Float.parseFloat(stringValue); + Preconditions.checkArgument(prob >= 0.0 && prob <= 1.0, + "Delete probability should be between 0.0 and 1.0"); + return prob; + } + private static int getFlushEntries(Properties props) { return Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, "1000000")); } @@ -128,11 +136,8 @@ public class ContinuousIngest { // always want to point back to flushed data. This way the previous item should // always exist in accumulo when verifying data. To do this make insert N point // back to the row from insert (N - flushInterval). The array below is used to keep - // track of this. - long[] prevRows = new long[flushInterval]; - long[] firstRows = new long[flushInterval]; - int[] firstColFams = new int[flushInterval]; - int[] firstColQuals = new int[flushInterval]; + // track of all inserts. + MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval]; long lastFlushTime = System.currentTimeMillis(); @@ -149,20 +154,21 @@ public class ContinuousIngest { log.info("INGESTING for " + pauseWaitSec + "s"); } + final float deleteProbability = getDeleteProbability(testProps); + log.info("DELETES will occur with a probability of {}", + String.format("%.02f", deleteProbability)); + out: while (true) { - // generate first set of nodes ColumnVisibility cv = getVisibility(r); + // generate first set of nodes for (int index = 0; index < flushInterval; index++) { long rowLong = genLong(rowMin, rowMax, r); - prevRows[index] = rowLong; - firstRows[index] = rowLong; int cf = r.nextInt(maxColF); int cq = r.nextInt(maxColQ); - firstColFams[index] = cf; - firstColQuals[index] = cq; + nodeMap[0][index] = new MutationInfo(rowLong, cf, cq); Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, checksum); count++; @@ -177,10 +183,12 @@ public class ContinuousIngest { for (int depth = 1; depth < maxDepth; depth++) { for (int index = 0; index < flushInterval; index++) { long rowLong = genLong(rowMin, rowMax, r); - byte[] prevRow = genRow(prevRows[index]); - prevRows[index] = rowLong; - Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), cv, - ingestInstanceId, count, prevRow, checksum); + byte[] prevRow = genRow(nodeMap[depth - 1][index].row); + int cfInt = r.nextInt(maxColF); + int cqInt = r.nextInt(maxColQ); + nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt); + Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, count, prevRow, + checksum); count++; bw.addMutation(m); } @@ -191,14 +199,36 @@ public class ContinuousIngest { pauseCheck(testProps, r); } - // create one big linked list, this makes all of the first inserts point to something - for (int index = 0; index < flushInterval - 1; index++) { - Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, - ingestInstanceId, count, genRow(prevRows[index + 1]), checksum); - count++; - bw.addMutation(m); + // random chance that the entries will be deleted + boolean delete = r.nextFloat() < deleteProbability; + + // if the previously written entries are scheduled to be deleted + if (delete) { + log.info("Deleting last portion of written entries"); + // add delete mutations in the reverse order in which they were written + for (int depth = nodeMap.length - 1; depth >= 0; depth--) { + for (int index = nodeMap[depth].length - 1; index >= 0; index--) { + MutationInfo currentNode = nodeMap[depth][index]; + Mutation m = new Mutation(genRow(currentNode.row)); + m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq)); + bw.addMutation(m); + } + lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); + pauseCheck(testProps, r); + } + } else { + // create one big linked list, this makes all the first inserts point to something + for (int index = 0; index < flushInterval - 1; index++) { + MutationInfo firstEntry = nodeMap[0][index]; + MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1]; + Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv, + ingestInstanceId, count, genRow(lastEntry.row), checksum); + count++; + bw.addMutation(m); + } + lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); } - lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); + if (count >= numEntries) break out; pauseCheck(testProps, r); @@ -207,6 +237,19 @@ public class ContinuousIngest { } } + private static class MutationInfo { + + long row; + int cf; + int cq; + + public MutationInfo(long row, int cf, int cq) { + this.row = row; + this.cf = cf; + this.cq = cq; + } + } + public static List<ColumnVisibility> parseVisibilities(String visString) { List<ColumnVisibility> vis; if (visString == null) {