This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new d23c2fc  Make MulitTable reuse tables (#191)
d23c2fc is described below

commit d23c2fcbe47ae72df9f667235b6b7f851882a184
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Fri Feb 18 11:47:29 2022 -0500

    Make MulitTable reuse tables (#191)
    
    * This allows restarting rw tests and allow using of the same tables
    across multiple threads
    * Other various improvements to MultiTable
---
 .../testing/randomwalk/multitable/BulkImport.java  | 47 +++++++++++++---------
 .../testing/randomwalk/multitable/Commit.java      | 18 ++++++++-
 .../testing/randomwalk/multitable/CopyTable.java   |  7 ++++
 .../randomwalk/multitable/MultiTableFixture.java   | 26 ++++++++++--
 .../testing/randomwalk/multitable/Write.java       | 28 +++++++++----
 .../resources/randomwalk/modules/MultiTable.xml    |  1 -
 6 files changed, 95 insertions(+), 32 deletions(-)

diff --git 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
index 4c43b0a..b2a5c44 100644
--- 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
+++ 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.data.Key;
@@ -39,7 +40,7 @@ import org.apache.hadoop.io.Text;
 
 public class BulkImport extends Test {
 
-  public static final int LOTS = 100000;
+  public static final int ROWS = 1_000_000;
   public static final int COLS = 10;
   public static final List<Column> COLNAMES = new ArrayList<>();
   public static final Text CHECK_COLUMN_FAMILY = new Text("cf");
@@ -61,7 +62,7 @@ public class BulkImport extends Test {
     List<String> tables = (List<String>) state.get("tableList");
 
     if (tables.isEmpty()) {
-      log.debug("No tables to ingest into");
+      log.trace("No tables to ingest into");
       return;
     }
 
@@ -77,8 +78,8 @@ public class BulkImport extends Test {
     final boolean useLegacyBulk = rand.nextBoolean();
 
     TreeSet<String> rows = new TreeSet<>();
-    for (int i = 0; i < LOTS; i++)
-      rows.add(uuid + String.format("__%05d", i));
+    for (int i = 0; i < ROWS; i++)
+      rows.add(uuid + String.format("__%06d", i));
 
     String markerColumnQualifier = String.format("%07d", 
counter.incrementAndGet());
     log.debug("Preparing {} bulk import to {}", useLegacyBulk ? "legacy" : 
"new", tableName);
@@ -96,24 +97,30 @@ public class BulkImport extends Test {
       }
       f.close();
     }
-    if (useLegacyBulk) {
-      env.getAccumuloClient().tableOperations().importDirectory(tableName, 
dir.toString(),
-          fail.toString(), true);
-      FileStatus[] failures = fs.listStatus(fail);
-      if (failures != null && failures.length > 0) {
-        state.set("bulkImportSuccess", "false");
-        throw new Exception(failures.length + " failure files found importing 
files from " + dir);
+    log.debug("Starting {} bulk import to {}", useLegacyBulk ? "legacy" : 
"new", tableName);
+    try {
+      if (useLegacyBulk) {
+        env.getAccumuloClient().tableOperations().importDirectory(tableName, 
dir.toString(),
+            fail.toString(), true);
+        FileStatus[] failures = fs.listStatus(fail);
+        if (failures != null && failures.length > 0) {
+          state.set("bulkImportSuccess", "false");
+          throw new Exception(failures.length + " failure files found 
importing files from " + dir);
+        }
+      } else {
+        
env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
+            .tableTime(true).load();
       }
-    } else {
-      
env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
-          .tableTime(true).load();
-    }
 
-    fs.delete(dir, true);
-    fs.delete(fail, true);
-    log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}",
-        useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(),
-        markerColumnQualifier);
+      fs.delete(dir, true);
+      fs.delete(fail, true);
+      log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}",
+          useLegacyBulk ? "legacy" : "new", tableName, rows.first(), 
rows.last(),
+          markerColumnQualifier);
+    } catch (TableNotFoundException tnfe) {
+      log.debug("Table {} was deleted", tableName);
+      tables.remove(tableName);
+    }
   }
 
 }
diff --git 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Commit.java 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Commit.java
index 0a65e33..1098499 100644
--- 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Commit.java
+++ 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Commit.java
@@ -18,6 +18,9 @@ package org.apache.accumulo.testing.randomwalk.multitable;
 
 import java.util.Properties;
 
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
 import org.apache.accumulo.testing.randomwalk.State;
 import org.apache.accumulo.testing.randomwalk.Test;
@@ -26,7 +29,20 @@ public class Commit extends Test {
 
   @Override
   public void visit(State state, RandWalkEnv env, Properties props) throws 
Exception {
-    env.getMultiTableBatchWriter().flush();
+    try {
+      env.getMultiTableBatchWriter().flush();
+    } catch (TableOfflineException e) {
+      log.debug("Commit failed, table offline");
+      return;
+    } catch (MutationsRejectedException mre) {
+      if (mre.getCause() instanceof TableDeletedException)
+        log.debug("Commit failed, table deleted");
+      else if (mre.getCause() instanceof TableOfflineException)
+        log.debug("Commit failed, table offline");
+      else
+        throw mre;
+      return;
+    }
 
     Long numWrites = state.getLong("numWrites");
     Long totalWrites = state.getLong("totalWrites") + numWrites;
diff --git 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java
 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java
index 634c1fc..eaffa93 100644
--- 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java
+++ 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java
@@ -52,6 +52,13 @@ public class CopyTable extends Test {
     int nextId = ((Integer) state.get("nextId")).intValue();
     String dstTableName = String.format("%s_%d", 
state.getString("tableNamePrefix"), nextId);
 
+    if (env.getAccumuloClient().tableOperations().exists(dstTableName)) {
+      log.debug(dstTableName + " already exists so don't copy.");
+      nextId++;
+      state.set("nextId", Integer.valueOf(nextId));
+      return;
+    }
+
     String[] args = new String[3];
     args[0] = env.getClientPropsPath();
     args[1] = srcTableName;
diff --git 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
index 68c4cee..f056d3c 100644
--- 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
+++ 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
@@ -16,8 +16,12 @@
  */
 package org.apache.accumulo.testing.randomwalk.multitable;
 
+import static java.util.stream.Collectors.toCollection;
+
 import java.net.InetAddress;
 import java.util.List;
+import java.util.OptionalInt;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -34,11 +38,27 @@ public class MultiTableFixture extends Fixture {
 
   @Override
   public void setUp(State state, RandWalkEnv env) throws Exception {
-
     String hostname = 
InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_");
+    String prefix = String.format("multi_%s", hostname);
+
+    Set<String> all = env.getAccumuloClient().tableOperations().list();
+    List<String> tableList = all.stream().filter(s -> s.startsWith(prefix))
+        .collect(toCollection(CopyOnWriteArrayList::new));
 
-    state.set("tableNamePrefix",
-        String.format("multi_%s_%s_%d", hostname, env.getPid(), 
System.currentTimeMillis()));
+    log.debug("Existing MultiTables: {}", tableList);
+    // get the max of the last ID created
+    OptionalInt optionalInt = tableList.stream().mapToInt(s -> {
+      String[] strArr = s.split("_");
+      return Integer.parseInt(strArr[strArr.length - 1]);
+    }).max();
+    int nextId = optionalInt.orElse(-1) + 1;
+    log.debug("Next ID started at {}", nextId);
+
+    state.set("tableNamePrefix", prefix);
+    state.set("nextId", nextId);
+    state.set("numWrites", 0L);
+    state.set("totalWrites", 0L);
+    state.set("tableList", tableList);
     state.set("nextId", 0);
     state.set("numWrites", 0L);
     state.set("totalWrites", 0L);
diff --git 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java 
b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java
index 65e5d75..414f4e1 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java
@@ -25,6 +25,8 @@ import java.util.Random;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.data.Mutation;
@@ -43,7 +45,7 @@ public class Write extends Test {
     List<String> tables = (List<String>) state.get("tableList");
 
     if (tables.isEmpty()) {
-      log.debug("No tables to ingest into");
+      log.trace("No tables to ingest into");
       return;
     }
 
@@ -54,10 +56,11 @@ public class Write extends Test {
     try {
       bw = env.getMultiTableBatchWriter().getBatchWriter(tableName);
     } catch (TableOfflineException e) {
-      log.error("Table " + tableName + " is offline!");
+      log.debug("Table " + tableName + " is offline");
       return;
     } catch (TableNotFoundException e) {
-      log.error("Table " + tableName + " not found!");
+      log.debug("Table " + tableName + " not found");
+      tables.remove(tableName);
       return;
     }
 
@@ -80,10 +83,21 @@ public class Write extends Test {
     alg.update(payloadBytes);
     m.put(meta, new Text("sha1"), new Value(alg.digest()));
 
-    // add mutation
-    bw.addMutation(m);
-
-    state.set("numWrites", state.getLong("numWrites") + 1);
+    try {
+      // add mutation
+      bw.addMutation(m);
+      state.set("numWrites", state.getLong("numWrites") + 1);
+    } catch (TableOfflineException e) {
+      log.debug("BatchWrite " + tableName + " failed, offline");
+    } catch (MutationsRejectedException mre) {
+      if (mre.getCause() instanceof TableDeletedException) {
+        log.debug("BatchWrite " + tableName + " failed, table deleted");
+        tables.remove(tableName);
+      } else if (mre.getCause() instanceof TableOfflineException)
+        log.debug("BatchWrite " + tableName + " failed, offline");
+      else
+        throw mre;
+    }
   }
 
 }
diff --git a/src/main/resources/randomwalk/modules/MultiTable.xml 
b/src/main/resources/randomwalk/modules/MultiTable.xml
index 6698e4a..55b7f0b 100644
--- a/src/main/resources/randomwalk/modules/MultiTable.xml
+++ b/src/main/resources/randomwalk/modules/MultiTable.xml
@@ -34,7 +34,6 @@
   <edge id="mt.BulkImport" weight="100"/>
   <edge id="mt.OfflineTable" weight="10"/>
   <edge id="mt.DropTable" weight="20"/>
-  <edge id="END" weight="1"/>
 </node>
 
 <node id="mt.Write">

Reply via email to