This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push: new d23c2fc Make MultiTable reuse tables (#191) d23c2fc is described below commit d23c2fcbe47ae72df9f667235b6b7f851882a184 Author: Mike Miller <mmil...@apache.org> AuthorDate: Fri Feb 18 11:47:29 2022 -0500 Make MultiTable reuse tables (#191) * This allows restarting rw tests and allows using the same tables across multiple threads * Other various improvements to MultiTable --- .../testing/randomwalk/multitable/BulkImport.java | 47 +++++++++++++--------- .../testing/randomwalk/multitable/Commit.java | 18 ++++++++- .../testing/randomwalk/multitable/CopyTable.java | 7 ++++ .../randomwalk/multitable/MultiTableFixture.java | 26 ++++++++++-- .../testing/randomwalk/multitable/Write.java | 28 +++++++++---- .../resources/randomwalk/modules/MultiTable.xml | 1 - 6 files changed, 95 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java index 4c43b0a..b2a5c44 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java @@ -25,6 +25,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.IteratorSetting.Column; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.data.Key; @@ -39,7 +40,7 @@ import org.apache.hadoop.io.Text; public class BulkImport extends Test { - public static final int LOTS = 100000; + public static final int ROWS = 1_000_000; public static final int COLS = 10; public static final List<Column> COLNAMES = new ArrayList<>(); public static final Text CHECK_COLUMN_FAMILY = new Text("cf"); @@ -61,7 +62,7 @@ public class 
BulkImport extends Test { List<String> tables = (List<String>) state.get("tableList"); if (tables.isEmpty()) { - log.debug("No tables to ingest into"); + log.trace("No tables to ingest into"); return; } @@ -77,8 +78,8 @@ public class BulkImport extends Test { final boolean useLegacyBulk = rand.nextBoolean(); TreeSet<String> rows = new TreeSet<>(); - for (int i = 0; i < LOTS; i++) - rows.add(uuid + String.format("__%05d", i)); + for (int i = 0; i < ROWS; i++) + rows.add(uuid + String.format("__%06d", i)); String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); log.debug("Preparing {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName); @@ -96,24 +97,30 @@ public class BulkImport extends Test { } f.close(); } - if (useLegacyBulk) { - env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(), - fail.toString(), true); - FileStatus[] failures = fs.listStatus(fail); - if (failures != null && failures.length > 0) { - state.set("bulkImportSuccess", "false"); - throw new Exception(failures.length + " failure files found importing files from " + dir); + log.debug("Starting {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName); + try { + if (useLegacyBulk) { + env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(), + fail.toString(), true); + FileStatus[] failures = fs.listStatus(fail); + if (failures != null && failures.length > 0) { + state.set("bulkImportSuccess", "false"); + throw new Exception(failures.length + " failure files found importing files from " + dir); + } + } else { + env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName) + .tableTime(true).load(); } - } else { - env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName) - .tableTime(true).load(); - } - fs.delete(dir, true); - fs.delete(fail, true); - log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}", - useLegacyBulk ? 
"legacy" : "new", tableName, rows.first(), rows.last(), - markerColumnQualifier); + fs.delete(dir, true); + fs.delete(fail, true); + log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}", + useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(), + markerColumnQualifier); + } catch (TableNotFoundException tnfe) { + log.debug("Table {} was deleted", tableName); + tables.remove(tableName); + } } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Commit.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Commit.java index 0a65e33..1098499 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Commit.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Commit.java @@ -18,6 +18,9 @@ package org.apache.accumulo.testing.randomwalk.multitable; import java.util.Properties; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; import org.apache.accumulo.testing.randomwalk.Test; @@ -26,7 +29,20 @@ public class Commit extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - env.getMultiTableBatchWriter().flush(); + try { + env.getMultiTableBatchWriter().flush(); + } catch (TableOfflineException e) { + log.debug("Commit failed, table offline"); + return; + } catch (MutationsRejectedException mre) { + if (mre.getCause() instanceof TableDeletedException) + log.debug("Commit failed, table deleted"); + else if (mre.getCause() instanceof TableOfflineException) + log.debug("Commit failed, table offline"); + else + throw mre; + return; + } Long numWrites = state.getLong("numWrites"); Long totalWrites = state.getLong("totalWrites") + numWrites; diff --git 
a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java index 634c1fc..eaffa93 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java @@ -52,6 +52,13 @@ public class CopyTable extends Test { int nextId = ((Integer) state.get("nextId")).intValue(); String dstTableName = String.format("%s_%d", state.getString("tableNamePrefix"), nextId); + if (env.getAccumuloClient().tableOperations().exists(dstTableName)) { + log.debug(dstTableName + " already exists so don't copy."); + nextId++; + state.set("nextId", Integer.valueOf(nextId)); + return; + } + String[] args = new String[3]; args[0] = env.getClientPropsPath(); args[1] = srcTableName; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java index 68c4cee..f056d3c 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java @@ -16,8 +16,12 @@ */ package org.apache.accumulo.testing.randomwalk.multitable; +import static java.util.stream.Collectors.toCollection; + import java.net.InetAddress; import java.util.List; +import java.util.OptionalInt; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.accumulo.core.client.AccumuloClient; @@ -34,11 +38,27 @@ public class MultiTableFixture extends Fixture { @Override public void setUp(State state, RandWalkEnv env) throws Exception { - String hostname = InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_"); + String prefix = String.format("multi_%s", hostname); + + Set<String> all = env.getAccumuloClient().tableOperations().list(); + List<String> 
tableList = all.stream().filter(s -> s.startsWith(prefix)) + .collect(toCollection(CopyOnWriteArrayList::new)); - state.set("tableNamePrefix", - String.format("multi_%s_%s_%d", hostname, env.getPid(), System.currentTimeMillis())); + log.debug("Existing MultiTables: {}", tableList); + // get the max of the last ID created + OptionalInt optionalInt = tableList.stream().mapToInt(s -> { + String[] strArr = s.split("_"); + return Integer.parseInt(strArr[strArr.length - 1]); + }).max(); + int nextId = optionalInt.orElse(-1) + 1; + log.debug("Next ID started at {}", nextId); + + state.set("tableNamePrefix", prefix); + state.set("nextId", nextId); + state.set("numWrites", 0L); + state.set("totalWrites", 0L); + state.set("tableList", tableList); state.set("nextId", 0); state.set("numWrites", 0L); state.set("totalWrites", 0L); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java index 65e5d75..414f4e1 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java @@ -25,6 +25,8 @@ import java.util.Random; import java.util.UUID; import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.data.Mutation; @@ -43,7 +45,7 @@ public class Write extends Test { List<String> tables = (List<String>) state.get("tableList"); if (tables.isEmpty()) { - log.debug("No tables to ingest into"); + log.trace("No tables to ingest into"); return; } @@ -54,10 +56,11 @@ public class Write extends Test { try { bw = env.getMultiTableBatchWriter().getBatchWriter(tableName); } catch (TableOfflineException e) { - 
log.error("Table " + tableName + " is offline!"); + log.debug("Table " + tableName + " is offline"); return; } catch (TableNotFoundException e) { - log.error("Table " + tableName + " not found!"); + log.debug("Table " + tableName + " not found"); + tables.remove(tableName); return; } @@ -80,10 +83,21 @@ public class Write extends Test { alg.update(payloadBytes); m.put(meta, new Text("sha1"), new Value(alg.digest())); - // add mutation - bw.addMutation(m); - - state.set("numWrites", state.getLong("numWrites") + 1); + try { + // add mutation + bw.addMutation(m); + state.set("numWrites", state.getLong("numWrites") + 1); + } catch (TableOfflineException e) { + log.debug("BatchWrite " + tableName + " failed, offline"); + } catch (MutationsRejectedException mre) { + if (mre.getCause() instanceof TableDeletedException) { + log.debug("BatchWrite " + tableName + " failed, table deleted"); + tables.remove(tableName); + } else if (mre.getCause() instanceof TableOfflineException) + log.debug("BatchWrite " + tableName + " failed, offline"); + else + throw mre; + } } } diff --git a/src/main/resources/randomwalk/modules/MultiTable.xml b/src/main/resources/randomwalk/modules/MultiTable.xml index 6698e4a..55b7f0b 100644 --- a/src/main/resources/randomwalk/modules/MultiTable.xml +++ b/src/main/resources/randomwalk/modules/MultiTable.xml @@ -34,7 +34,6 @@ <edge id="mt.BulkImport" weight="100"/> <edge id="mt.OfflineTable" weight="10"/> <edge id="mt.DropTable" weight="20"/> - <edge id="END" weight="1"/> </node> <node id="mt.Write">