This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git

The following commit(s) were added to refs/heads/main by this push:
     new 689d6d4  Add a Bulk import to randomwalk MultiTable (#134)
689d6d4 is described below

commit 689d6d411e5fda3aeca292656049de1718e949d5
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Tue Feb 9 16:48:55 2021 -0500

    Add a Bulk import to randomwalk MultiTable (#134)

    * Create new BulkImport test in MultiTable for a more realistic case
    * Tweak MultiTable.xml to drop 20 tables, same as create. This will keep
      the number of tables down when running for a long period.
    * Also set log4j testing logging to DEBUG
---
 conf/log4j.properties                              |   2 +-
 .../testing/randomwalk/multitable/BulkImport.java  | 119 +++++++++++++++++++++
 .../randomwalk/multitable/MultiTableFixture.java   |   3 +
 .../resources/randomwalk/modules/MultiTable.xml    |  10 +-
 4 files changed, 130 insertions(+), 4 deletions(-)

diff --git a/conf/log4j.properties b/conf/log4j.properties
index 726fb2c..525a6e4 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -19,7 +19,7 @@ log4j.appender.CA.layout=org.apache.log4j.PatternLayout
 log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{3}] %-5p: %m%n
 
 log4j.logger.org.apache.accumulo=WARN
-log4j.logger.org.apache.accumulo.testing=INFO
+log4j.logger.org.apache.accumulo.testing=DEBUG
 log4j.logger.org.apache.curator=ERROR
 log4j.logger.org.apache.hadoop=WARN
 log4j.logger.org.apache.hadoop.mapreduce=ERROR
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
new file mode 100644
index 0000000..4c43b0a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.randomwalk.multitable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
+import org.apache.accumulo.testing.randomwalk.State;
+import org.apache.accumulo.testing.randomwalk.Test;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+public class BulkImport extends Test {
+
+  public static final int LOTS = 100000;
+  public static final int COLS = 10;
+  public static final List<Column> COLNAMES = new ArrayList<>();
+  public static final Text CHECK_COLUMN_FAMILY = new Text("cf");
+  static {
+    for (int i = 0; i < COLS; i++) {
+      COLNAMES.add(new Column(CHECK_COLUMN_FAMILY, new Text(String.format("%03d", i))));
+    }
+  }
+  public static final Text MARKER_CF = new Text("marker");
+  static final AtomicLong counter = new AtomicLong();
+
+  private static final Value ONE = new Value("1".getBytes());
+
+  /**
+   * Tests both the legacy (deprecated) and new bulk import methods.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
+    List<String> tables = (List<String>) state.get("tableList");
+
+    if (tables.isEmpty()) {
+      log.debug("No tables to ingest into");
+      return;
+    }
+
+    Random rand = new Random();
+    String tableName = tables.get(rand.nextInt(tables.size()));
+
+    String uuid = UUID.randomUUID().toString();
+    final Path dir = new Path("/tmp/bulk", uuid);
+    final Path fail = new Path(dir.toString() + "_fail");
+    final FileSystem fs = (FileSystem) state.get("fs");
+    fs.mkdirs(fail);
+    final int parts = rand.nextInt(10) + 1;
+    final boolean useLegacyBulk = rand.nextBoolean();
+
+    TreeSet<String> rows = new TreeSet<>();
+    for (int i = 0; i < LOTS; i++)
+      rows.add(uuid + String.format("__%05d", i));
+
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    log.debug("Preparing {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
"legacy" : "new", tableName); + + for (int i = 0; i < parts; i++) { + String fileName = dir + "/" + String.format("part_%d.rf", i); + RFileWriter f = RFile.newWriter().to(fileName).withFileSystem(fs).build(); + f.startDefaultLocalityGroup(); + for (String r : rows) { + Text row = new Text(r); + for (Column col : COLNAMES) { + f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE); + } + f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE); + } + f.close(); + } + if (useLegacyBulk) { + env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(), + fail.toString(), true); + FileStatus[] failures = fs.listStatus(fail); + if (failures != null && failures.length > 0) { + state.set("bulkImportSuccess", "false"); + throw new Exception(failures.length + " failure files found importing files from " + dir); + } + } else { + env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName) + .tableTime(true).load(); + } + + fs.delete(dir, true); + fs.delete(fail, true); + log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}", + useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(), + markerColumnQualifier); + } + +} diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java index 1c65118..0549bca 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java @@ -27,6 +27,8 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.testing.randomwalk.Fixture; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; public class MultiTableFixture extends Fixture { @@ -41,6 +43,7 @@ public class MultiTableFixture extends Fixture { state.set("numWrites", Long.valueOf(0)); state.set("totalWrites", Long.valueOf(0)); state.set("tableList", new CopyOnWriteArrayList<String>()); + state.set("fs", FileSystem.get(new Configuration())); } @Override diff --git a/src/main/resources/randomwalk/modules/MultiTable.xml b/src/main/resources/randomwalk/modules/MultiTable.xml index a28d75e..55b7f0b 100644 --- a/src/main/resources/randomwalk/modules/MultiTable.xml +++ b/src/main/resources/randomwalk/modules/MultiTable.xml @@ -29,11 +29,11 @@ <node id="dummy.ToAll"> <edge id="mt.CreateTable" weight="20"/> - <edge id="mt.Write" weight="100"/> + <edge id="mt.Write" weight="10"/> <edge id="mt.CopyTable" weight="5"/> + <edge id="mt.BulkImport" weight="100"/> <edge id="mt.OfflineTable" weight="10"/> - <edge id="mt.DropTable" weight="3"/> - <edge id="END" weight="1"/> + <edge id="mt.DropTable" weight="20"/> </node> <node id="mt.Write"> @@ -57,4 +57,8 @@ <edge id="dummy.ToAll" weight="1"/> </node> +<node id="mt.BulkImport"> + <edge id="dummy.ToAll" weight="1"/> +</node> + </module>