This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new 711d28e  Add deletes to continuous ingest (#166)
711d28e is described below

commit 711d28e83a91cbaa0754064a7d72c0f975f7de31
Author: Dom G <47725857+domgargu...@users.noreply.github.com>
AuthorDate: Fri Nov 19 13:04:02 2021 -0500

    Add deletes to continuous ingest (#166)
    
    * Add the ability to delete entries while continuous ingest is running
---
 conf/accumulo-testing.properties                   |  3 +
 .../org/apache/accumulo/testing/TestProps.java     |  2 +
 .../testing/continuous/ContinuousIngest.java       | 85 ++++++++++++++++------
 3 files changed, 69 insertions(+), 21 deletions(-)

diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 4f0e749..6ef5855 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -82,6 +82,9 @@ test.ci.ingest.pause.wait.max=180
 test.ci.ingest.pause.duration.min=60
 # Maximum pause duration (in seconds)
 test.ci.ingest.pause.duration.max=120
+# The probability (between 0.0 and 1.0) that the most recently written set of entries will be deleted during continuous ingest
+# To disable deletes, set probability to 0
+test.ci.ingest.delete.probability=0.1
 
 # Batch walker
 # ------------
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java 
b/src/main/java/org/apache/accumulo/testing/TestProps.java
index c4c8948..e7801d3 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -94,6 +94,8 @@ public class TestProps {
   public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST + 
"pause.duration.max";
   // Amount of data to write before flushing. Pause checks are only done after 
flush.
   public static final String CI_INGEST_FLUSH_ENTRIES = CI_INGEST + 
"entries.flush";
+  // The probability (between 0.0 and 1.0) that a set of entries will be 
deleted during continuous ingest
+  public static final String CI_INGEST_DELETE_PROBABILITY = CI_INGEST + 
"delete.probability";
 
   /** Batch Walker **/
   // Sleep time between batch scans (in ms)
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 66f5152..459102f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -74,6 +74,14 @@ public class ContinuousIngest {
     return (rand.nextInt(max - min) + min);
   }
 
+  /**
+   * Reads the probability that the entries written in one ingest iteration are deleted instead of
+   * being linked into the list. The value comes from
+   * {@link TestProps#CI_INGEST_DELETE_PROBABILITY}; when that property is absent it defaults to
+   * 0.0, which disables deletes (the behavior before this property existed).
+   *
+   * @param props test properties to read from
+   * @return a probability in the closed range [0.0, 1.0]
+   * @throws NumberFormatException if the property value is not a parseable float
+   * @throws IllegalArgumentException if the value is outside [0.0, 1.0] (NaN also fails the check)
+   */
+  private static float getDeleteProbability(Properties props) {
+    String stringValue = props.getProperty(TestProps.CI_INGEST_DELETE_PROBABILITY, "0.0");
+    float prob = Float.parseFloat(stringValue);
+    Preconditions.checkArgument(prob >= 0.0 && prob <= 1.0,
+        "Delete probability should be between 0.0 and 1.0 but was %s", prob);
+    return prob;
+  }
+
   private static int getFlushEntries(Properties props) {
     return 
Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, 
"1000000"));
   }
@@ -128,11 +136,8 @@ public class ContinuousIngest {
       // always want to point back to flushed data. This way the previous item 
should
       // always exist in accumulo when verifying data. To do this make insert 
N point
       // back to the row from insert (N - flushInterval). The array below is 
used to keep
-      // track of this.
-      long[] prevRows = new long[flushInterval];
-      long[] firstRows = new long[flushInterval];
-      int[] firstColFams = new int[flushInterval];
-      int[] firstColQuals = new int[flushInterval];
+      // track of all inserts.
+      MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
 
       long lastFlushTime = System.currentTimeMillis();
 
@@ -149,20 +154,21 @@ public class ContinuousIngest {
         log.info("INGESTING for " + pauseWaitSec + "s");
       }
 
+      final float deleteProbability = getDeleteProbability(testProps);
+      log.info("DELETES will occur with a probability of {}",
+          String.format("%.02f", deleteProbability));
+
       out: while (true) {
-        // generate first set of nodes
         ColumnVisibility cv = getVisibility(r);
 
+        // generate first set of nodes
         for (int index = 0; index < flushInterval; index++) {
           long rowLong = genLong(rowMin, rowMax, r);
-          prevRows[index] = rowLong;
-          firstRows[index] = rowLong;
 
           int cf = r.nextInt(maxColF);
           int cq = r.nextInt(maxColQ);
 
-          firstColFams[index] = cf;
-          firstColQuals[index] = cq;
+          nodeMap[0][index] = new MutationInfo(rowLong, cf, cq);
 
           Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, 
count, null, checksum);
           count++;
@@ -177,10 +183,12 @@ public class ContinuousIngest {
         for (int depth = 1; depth < maxDepth; depth++) {
           for (int index = 0; index < flushInterval; index++) {
             long rowLong = genLong(rowMin, rowMax, r);
-            byte[] prevRow = genRow(prevRows[index]);
-            prevRows[index] = rowLong;
-            Mutation m = genMutation(rowLong, r.nextInt(maxColF), 
r.nextInt(maxColQ), cv,
-                ingestInstanceId, count, prevRow, checksum);
+            byte[] prevRow = genRow(nodeMap[depth - 1][index].row);
+            int cfInt = r.nextInt(maxColF);
+            int cqInt = r.nextInt(maxColQ);
+            nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
+            Mutation m = genMutation(rowLong, cfInt, cqInt, cv, 
ingestInstanceId, count, prevRow,
+                checksum);
             count++;
             bw.addMutation(m);
           }
@@ -191,14 +199,36 @@ public class ContinuousIngest {
           pauseCheck(testProps, r);
         }
 
-        // create one big linked list, this makes all of the first inserts 
point to something
-        for (int index = 0; index < flushInterval - 1; index++) {
-          Mutation m = genMutation(firstRows[index], firstColFams[index], 
firstColQuals[index], cv,
-              ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
-          count++;
-          bw.addMutation(m);
+        // random chance that the entries will be deleted
+        boolean delete = r.nextFloat() < deleteProbability;
+
+        // if the previously written entries are scheduled to be deleted
+        if (delete) {
+          log.info("Deleting last portion of written entries");
+          // add delete mutations in the reverse order in which they were 
written
+          for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
+            for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
+              MutationInfo currentNode = nodeMap[depth][index];
+              Mutation m = new Mutation(genRow(currentNode.row));
+              m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
+              bw.addMutation(m);
+            }
+            lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+            pauseCheck(testProps, r);
+          }
+        } else {
+          // create one big linked list, this makes all the first inserts 
point to something
+          for (int index = 0; index < flushInterval - 1; index++) {
+            MutationInfo firstEntry = nodeMap[0][index];
+            MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
+            Mutation m = genMutation(firstEntry.row, firstEntry.cf, 
firstEntry.cq, cv,
+                ingestInstanceId, count, genRow(lastEntry.row), checksum);
+            count++;
+            bw.addMutation(m);
+          }
+          lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
         }
-        lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+
         if (count >= numEntries)
           break out;
         pauseCheck(testProps, r);
@@ -207,6 +237,19 @@ public class ContinuousIngest {
     }
   }
 
+  /**
+   * Immutable record of one node written during an ingest iteration: its row plus the integer
+   * values used to generate its column family and qualifier. Kept so that a later step can either
+   * point a first-level node at a last-level node, or issue delete mutations for every node.
+   */
+  private static final class MutationInfo {
+
+    // row value passed to genRow(...)
+    final long row;
+    // column family value passed to genCol(...) / genMutation(...)
+    final int cf;
+    // column qualifier value passed to genCol(...) / genMutation(...)
+    final int cq;
+
+    public MutationInfo(long row, int cf, int cq) {
+      this.row = row;
+      this.cf = cf;
+      this.cq = cq;
+    }
+  }
+
   public static List<ColumnVisibility> parseVisibilities(String visString) {
     List<ColumnVisibility> vis;
     if (visString == null) {

Reply via email to