http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/TestIngest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java index 47033f3..76bedf2 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java @@ -57,61 +57,62 @@ import org.apache.log4j.Logger; import com.beust.jcommander.Parameter; - public class TestIngest { public static final Authorizations AUTHS = new Authorizations("L1", "L2", "G1", "GROUP2"); - + public static class Opts extends ClientOnDefaultTable { - - @Parameter(names="--createTable") + + @Parameter(names = "--createTable") public boolean createTable = false; - - @Parameter(names="--splits", description="the number of splits to use when creating the table") + + @Parameter(names = "--splits", description = "the number of splits to use when creating the table") public int numsplits = 1; - - @Parameter(names="--start", description="the starting row number") + + @Parameter(names = "--start", description = "the starting row number") public int startRow = 0; - - @Parameter(names="--rows", description="the number of rows to ingest") + + @Parameter(names = "--rows", description = "the number of rows to ingest") public int rows = 100000; - - @Parameter(names="--cols", description="the number of columns to ingest per row") + + @Parameter(names = "--cols", description = "the number of columns to ingest per row") public int cols = 1; - - @Parameter(names="--random", description="insert random rows and use the given number to seed the psuedo-random number generator") + + @Parameter(names = "--random", description = "insert random rows and use the given number to seed the psuedo-random number generator") public Integer random = null; - - @Parameter(names="--size", description="the size of the value to ingest") + + @Parameter(names = "--size", description = "the size of the value to ingest") public int dataSize = 1000; - - @Parameter(names="--delete", description="delete values instead of inserting them") + + @Parameter(names = "--delete", description = "delete values instead of inserting them") public boolean delete = false; - - @Parameter(names={"-ts", "--timestamp"}, description="timestamp to use for all values") + + @Parameter(names = {"-ts", "--timestamp"}, description = "timestamp to use for all values") public long timestamp = -1; - - @Parameter(names="--rfile", description="generate data into a file that can be imported") + + @Parameter(names = "--rfile", description = "generate data into a file that can be imported") public String outputFile = null; - - @Parameter(names="--stride", description="the difference between successive row ids") + + @Parameter(names = "--stride", description = "the difference between successive row ids") public int stride; - @Parameter(names={"-cf","--columnFamily"}, description="place columns in this column family") + @Parameter(names = {"-cf", "--columnFamily"}, description = "place columns in this column family") public String columnFamily = "colf"; - @Parameter(names={"-cv","--columnVisibility"}, description="place columns in this column family", converter=VisibilityConverter.class) + @Parameter(names = {"-cv", "--columnVisibility"}, description = "place columns in this column family", converter = VisibilityConverter.class) public ColumnVisibility columnVisibility = new ColumnVisibility(); - - public Opts() { super("test_ingest"); } + + public Opts() { + super("test_ingest"); + } } - + @SuppressWarnings("unused") private static final Logger log = Logger.getLogger(TestIngest.class); - + public static void createTable(Connector conn, Opts args) throws AccumuloException, AccumuloSecurityException, TableExistsException { if (args.createTable) { TreeSet<Text> splits = getSplitPoints(args.startRow, args.startRow + args.rows, args.numsplits); - + if (!conn.tableOperations().exists(args.getTableName())) conn.tableOperations().create(args.getTableName()); try { @@ -122,27 +123,27 @@ public class TestIngest { } } } - + public static TreeSet<Text> getSplitPoints(long start, long end, long numsplits) { long splitSize = (end - start) / numsplits; - + long pos = start + splitSize; - + TreeSet<Text> splits = new TreeSet<Text>(); - + while (pos < end) { splits.add(new Text(String.format("row_%010d", pos))); pos += splitSize; } return splits; } - + public static byte[][] generateValues(int dataSize) { - + byte[][] bytevals = new byte[10][]; - + byte[] letters = {'1', '2', '3', '4', '5', '6', '7', '8', '9', '0'}; - + for (int i = 0; i < 10; i++) { bytevals[i] = new byte[dataSize]; for (int j = 0; j < dataSize; j++) @@ -150,46 +151,46 @@ public class TestIngest { } return bytevals; } - + private static byte ROW_PREFIX[] = "row_".getBytes(UTF_8); private static byte COL_PREFIX[] = "col_".getBytes(UTF_8); - + public static Text generateRow(int rowid, int startRow) { return new Text(FastFormat.toZeroPaddedString(rowid + startRow, 10, 10, ROW_PREFIX)); } - + public static byte[] genRandomValue(Random random, byte dest[], int seed, int row, int col) { random.setSeed((row ^ seed) ^ col); random.nextBytes(dest); toPrintableChars(dest); - + return dest; } - + public static void toPrintableChars(byte[] dest) { // transform to printable chars for (int i = 0; i < dest.length; i++) { dest[i] = (byte) (((0xff & dest[i]) % 92) + ' '); } } - + public static void main(String[] args) throws Exception { - + Opts opts = new Opts(); BatchWriterOpts bwOpts = new BatchWriterOpts(); opts.parseArgs(TestIngest.class.getName(), args, bwOpts); String name = TestIngest.class.getSimpleName(); DistributedTrace.enable(name); - + try { opts.startTracing(name); - + if (opts.debug) Logger.getLogger(TabletServerBatchWriter.class.getName()).setLevel(Level.TRACE); - + // test batch update - + ingest(opts.getConnector(), opts, bwOpts); } catch (Exception e) { throw new RuntimeException(e); @@ -199,34 +200,33 @@ public class TestIngest { } } - public static void ingest(Connector connector, Opts opts, BatchWriterOpts bwOpts) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException, - MutationsRejectedException, TableExistsException { + public static void ingest(Connector connector, Opts opts, BatchWriterOpts bwOpts) throws IOException, AccumuloException, AccumuloSecurityException, + TableNotFoundException, MutationsRejectedException, TableExistsException { long stopTime; - + byte[][] bytevals = generateValues(opts.dataSize); - + byte randomValue[] = new byte[opts.dataSize]; Random random = new Random(); - + long bytesWritten = 0; createTable(connector, opts); - + BatchWriter bw = null; FileSKVWriter writer = null; - + if (opts.outputFile != null) { Configuration conf = CachedConfiguration.getInstance(); FileSystem fs = FileSystem.get(conf); - writer = FileOperations.getInstance().openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, - AccumuloConfiguration.getDefaultConfiguration()); + writer = FileOperations.getInstance().openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, AccumuloConfiguration.getDefaultConfiguration()); writer.startDefaultLocalityGroup(); } else { bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig()); connector.securityOperations().changeUserAuthorizations(opts.principal, AUTHS); } Text labBA = new Text(opts.columnVisibility.getExpression()); - + long startTime = System.currentTimeMillis(); for (int i = 0; i < opts.rows; i++) { int rowid; @@ -235,13 +235,13 @@ public class TestIngest { } else { rowid = i; } - + Text row = generateRow(rowid, opts.startRow); Mutation m = new Mutation(row); for (int j = 0; j < opts.cols; j++) { Text colf = new Text(opts.columnFamily); Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX)); - + if (writer != null) { Key key = new Key(row, colf, colq, labBA); if (opts.timestamp >= 0) { @@ -249,15 +249,15 @@ public class TestIngest { } else { key.setTimestamp(startTime); } - + if (opts.delete) { key.setDeleted(true); } else { key.setDeleted(false); } - + bytesWritten += key.getSize(); - + if (opts.delete) { writer.append(key, new Value(new byte[0])); } else { @@ -267,16 +267,16 @@ public class TestIngest { } else { value = bytevals[j % bytevals.length]; } - + Value v = new Value(value); writer.append(key, v); bytesWritten += v.getSize(); } - + } else { Key key = new Key(row, colf, colq, labBA); bytesWritten += key.getSize(); - + if (opts.delete) { if (opts.timestamp >= 0) m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp); @@ -290,22 +290,22 @@ public class TestIngest { value = bytevals[j % bytevals.length]; } bytesWritten += value.length; - + if (opts.timestamp >= 0) { m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true)); } else { m.put(colf, colq, opts.columnVisibility, new Value(value, true)); - + } } } - + } if (bw != null) bw.addMutation(m); - + } - + if (writer != null) { writer.close(); } else if (bw != null) { @@ -317,22 +317,22 @@ public class TestIngest { System.err.println("ERROR : Not authorized to write to : " + entry.getKey() + " due to " + entry.getValue()); } } - + if (e.getConstraintViolationSummaries().size() > 0) { for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) { System.err.println("ERROR : Constraint violates : " + cvs); } } - + throw e; } } - + stopTime = System.currentTimeMillis(); - + int totalValues = opts.rows * opts.cols; double elapsed = (stopTime - startTime) / 1000.0; - + System.out.printf("%,12d records written | %,8d records/sec | %,12d bytes written | %,8d bytes/sec | %6.3f secs %n", totalValues, (int) (totalValues / elapsed), bytesWritten, (int) (bytesWritten / elapsed), elapsed); }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java b/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java index 1b553f4..ba5874d 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java +++ b/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java @@ -40,30 +40,30 @@ import org.apache.log4j.Logger; public class TestRandomDeletes { private static final Logger log = Logger.getLogger(TestRandomDeletes.class); private static Authorizations auths = new Authorizations("L1", "L2", "G1", "GROUP2"); - + static private class RowColumn implements Comparable<RowColumn> { Text row; Column column; long timestamp; - + public RowColumn(Text row, Column column, long timestamp) { this.row = row; this.column = column; this.timestamp = timestamp; } - + public int compareTo(RowColumn other) { int result = row.compareTo(other.row); if (result != 0) return result; return column.compareTo(other.column); } - + public String toString() { return row.toString() + ":" + column.toString(); } } - + private static TreeSet<RowColumn> scanAll(ClientOnDefaultTable opts, ScannerOpts scanOpts, String tableName) throws Exception { TreeSet<RowColumn> result = new TreeSet<RowColumn>(); Connector conn = opts.getConnector(); @@ -77,26 +77,28 @@ public class TestRandomDeletes { } return result; } - - private static long scrambleDeleteHalfAndCheck(ClientOnDefaultTable opts, ScannerOpts scanOpts, BatchWriterOpts bwOpts, String tableName, Set<RowColumn> rows) throws Exception { + + private static long scrambleDeleteHalfAndCheck(ClientOnDefaultTable opts, ScannerOpts scanOpts, BatchWriterOpts bwOpts, String tableName, Set<RowColumn> rows) + throws Exception { int result = 0; ArrayList<RowColumn> entries = new ArrayList<RowColumn>(rows); java.util.Collections.shuffle(entries); - + Connector connector = opts.getConnector(); BatchWriter mutations = connector.createBatchWriter(tableName, bwOpts.getBatchWriterConfig()); - + for (int i = 0; i < (entries.size() + 1) / 2; i++) { RowColumn rc = entries.get(i); Mutation m = new Mutation(rc.row); - m.putDelete(new Text(rc.column.columnFamily), new Text(rc.column.columnQualifier), new ColumnVisibility(rc.column.getColumnVisibility()), rc.timestamp + 1); + m.putDelete(new Text(rc.column.columnFamily), new Text(rc.column.columnQualifier), new ColumnVisibility(rc.column.getColumnVisibility()), + rc.timestamp + 1); mutations.addMutation(m); rows.remove(rc); result++; } - + mutations.close(); - + Set<RowColumn> current = scanAll(opts, scanOpts, tableName); current.removeAll(rows); if (current.size() > 0) { @@ -104,25 +106,24 @@ public class TestRandomDeletes { } return result; } - + static public void main(String[] args) { - + ClientOnDefaultTable opts = new ClientOnDefaultTable("test_ingest"); ScannerOpts scanOpts = new ScannerOpts(); BatchWriterOpts bwOpts = new BatchWriterOpts(); opts.parseArgs(TestRandomDeletes.class.getName(), args, scanOpts, bwOpts); - + log.info("starting random delete test"); - try { long deleted = 0; - + String tableName = opts.getTableName(); - + TreeSet<RowColumn> doomed = scanAll(opts, scanOpts, tableName); log.info("Got " + doomed.size() + " rows"); - + long startTime = System.currentTimeMillis(); while (true) { long half = scrambleDeleteHalfAndCheck(opts, scanOpts, bwOpts, tableName, doomed); @@ -131,7 +132,7 @@ public class TestRandomDeletes { break; } long stopTime = System.currentTimeMillis(); - + long elapsed = (stopTime - startTime) / 1000; log.info("deleted " + deleted + " values in " + elapsed + " seconds"); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java index 02eead2..8717c26 100644 --- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java @@ -34,7 +34,6 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.trace.Trace; - import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java index e4abed3..31e7b06 100644 --- a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java +++ b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java @@ -58,8 +58,8 @@ public class WrongTabletTest { Mutation mutation = new Mutation(new Text("row_0003750001")); mutation.putDelete(new Text("colf"), new Text("colq")); - client.update(Tracer.traceInfo(), context.rpcCreds(), new KeyExtent(new Text("!!"), null, - new Text("row_0003750000")).toThrift(), mutation.toThrift(), TDurability.DEFAULT); + client.update(Tracer.traceInfo(), context.rpcCreds(), new KeyExtent(new Text("!!"), null, new Text("row_0003750000")).toThrift(), mutation.toThrift(), + TDurability.DEFAULT); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java index 1a74962..a2687bb 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java @@ -43,81 +43,81 @@ import com.beust.jcommander.validators.PositiveInteger; public class ContinuousBatchWalker { static class Opts extends ContinuousWalk.Opts { - @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class) + @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class) long numToScan = 0; } public static void main(String[] args) throws Exception { - + Opts opts = new Opts(); ScannerOpts scanOpts = new ScannerOpts(); BatchScannerOpts bsOpts = new BatchScannerOpts(); opts.parseArgs(ContinuousBatchWalker.class.getName(), args, scanOpts, bsOpts); - + Random r = new Random(); Authorizations auths = opts.randomAuths.getAuths(r); Connector conn = opts.getConnector(); Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths); scanner.setBatchSize(scanOpts.scanBatchSize); - + BatchScanner bs = conn.createBatchScanner(opts.getTableName(), auths, bsOpts.scanThreads); bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS); while (true) { Set<Text> batch = getBatch(scanner, opts.min, opts.max, scanOpts.scanBatchSize, r); List<Range> ranges = new ArrayList<Range>(batch.size()); - + for (Text row : batch) { ranges.add(new Range(row)); } - + runBatchScan(scanOpts.scanBatchSize, bs, batch, ranges); - + UtilWaitThread.sleep(opts.sleepTime); } - + } - + /* * private static void runSequentialScan(Scanner scanner, List<Range> ranges) { Set<Text> srowsSeen = new HashSet<Text>(); long st1 = * System.currentTimeMillis(); int scount = 0; for (Range range : ranges) { scanner.setRange(range); - * + * * for (Entry<Key,Value> entry : scanner) { srowsSeen.add(entry.getKey().getRow()); scount++; } } - * - * + * + * * long st2 = System.currentTimeMillis(); System.out.println("SRQ "+(st2 - st1)+" "+srowsSeen.size() +" "+scount); } */ - + private static void runBatchScan(int batchSize, BatchScanner bs, Set<Text> batch, List<Range> ranges) { bs.setRanges(ranges); - + Set<Text> rowsSeen = new HashSet<Text>(); - + int count = 0; - + long t1 = System.currentTimeMillis(); - + for (Entry<Key,Value> entry : bs) { ContinuousWalk.validate(entry.getKey(), entry.getValue()); - + rowsSeen.add(entry.getKey().getRow()); - + addRow(batchSize, entry.getValue()); - + count++; } bs.close(); - + long t2 = System.currentTimeMillis(); - + if (!rowsSeen.equals(batch)) { HashSet<Text> copy1 = new HashSet<Text>(rowsSeen); HashSet<Text> copy2 = new HashSet<Text>(batch); - + copy1.removeAll(batch); copy2.removeAll(rowsSeen); - + System.out.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size()); System.err.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size()); System.err.println("Extra seen : " + copy1); @@ -125,12 +125,12 @@ public class ContinuousBatchWalker { } else { System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count, (int) (rowsSeen.size() / ((t2 - t1) / 1000.0))); } - + } - + private static void addRow(int batchSize, Value v) { byte[] val = v.get(); - + int offset = ContinuousWalk.getPrevRowOffset(val); if (offset > 1) { Text prevRow = new Text(); @@ -140,19 +140,19 @@ public class ContinuousBatchWalker { } } } - + private static HashSet<Text> rowsToQuery = new HashSet<Text>(); - + private static Set<Text> getBatch(Scanner scanner, long min, long max, int batchSize, Random r) { - + while (rowsToQuery.size() < batchSize) { byte[] scanStart = ContinuousIngest.genRow(min, max, r); scanner.setRange(new Range(new Text(scanStart), null)); - + int count = 0; - + long t1 = System.currentTimeMillis(); - + Iterator<Entry<Key,Value>> iter = scanner.iterator(); while (iter.hasNext() && rowsToQuery.size() < 3 * batchSize) { Entry<Key,Value> entry = iter.next(); @@ -160,24 +160,24 @@ public class ContinuousBatchWalker { addRow(batchSize, entry.getValue()); count++; } - + long t2 = System.currentTimeMillis(); - + System.out.println("FSB " + t1 + " " + (t2 - t1) + " " + count); - + UtilWaitThread.sleep(100); } - + HashSet<Text> ret = new HashSet<Text>(); - + Iterator<Text> iter = rowsToQuery.iterator(); - + for (int i = 0; i < batchSize; i++) { ret.add(iter.next()); iter.remove(); } - + return ret; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java index f54b8db..dba6ac9 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java @@ -54,9 +54,8 @@ import org.apache.log4j.PatternLayout; import com.beust.jcommander.IStringConverter; import com.beust.jcommander.Parameter; - public class ContinuousIngest { - + static public class BaseOpts extends MapReduceClientOnDefaultTable { public class DebugConverter implements IStringConverter<String> { @Override @@ -72,64 +71,66 @@ public class ContinuousIngest { return debugLog; } } - - @Parameter(names="--min", description="lowest random row number to use") + + @Parameter(names = "--min", description = "lowest random row number to use") long min = 0; - - @Parameter(names="--max", description="maximum random row number to use") + + @Parameter(names = "--max", description = "maximum random row number to use") long max = Long.MAX_VALUE; - - @Parameter(names="--debugLog", description="file to write debugging output", converter=DebugConverter.class) + + @Parameter(names = "--debugLog", description = "file to write debugging output", converter = DebugConverter.class) String debugLog = null; - BaseOpts() { super("ci"); } + BaseOpts() { + super("ci"); + } } - + public static class ShortConverter implements IStringConverter<Short> { @Override public Short convert(String value) { return Short.valueOf(value); } } - + static public class Opts extends BaseOpts { - @Parameter(names="--num", description="the number of entries to ingest") + @Parameter(names = "--num", description = "the number of entries to ingest") long num = Long.MAX_VALUE; - - @Parameter(names="--maxColF", description="maximum column family value to use", converter=ShortConverter.class) + + @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class) short maxColF = Short.MAX_VALUE; - - @Parameter(names="--maxColQ", description="maximum column qualifier value to use", converter=ShortConverter.class) + + @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class) short maxColQ = Short.MAX_VALUE; - - @Parameter(names="--addCheckSum", description="turn on checksums") + + @Parameter(names = "--addCheckSum", description = "turn on checksums") boolean checksum = false; - - @Parameter(names="--visibilities", description="read the visibilities to ingest with from a file") + + @Parameter(names = "--visibilities", description = "read the visibilities to ingest with from a file") String visFile = null; } - + private static final byte[] EMPTY_BYTES = new byte[0]; - + private static List<ColumnVisibility> visibilities; - + private static void initVisibilities(Opts opts) throws Exception { if (opts.visFile == null) { visibilities = Collections.singletonList(new ColumnVisibility()); return; } - + visibilities = new ArrayList<ColumnVisibility>(); - + FileSystem fs = FileSystem.get(new Configuration()); BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8)); - + String line; - + while ((line = in.readLine()) != null) { visibilities.add(new ColumnVisibility(line)); } - + in.close(); } @@ -138,35 +139,35 @@ public class ContinuousIngest { } public static void main(String[] args) throws Exception { - + Opts opts = new Opts(); BatchWriterOpts bwOpts = new BatchWriterOpts(); opts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts); - + initVisibilities(opts); if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) { throw new IllegalArgumentException("bad min and max"); } Connector conn = opts.getConnector(); - + if (!conn.tableOperations().exists(opts.getTableName())) { throw new TableNotFoundException(null, opts.getTableName(), "Consult the README and create the table before starting ingest."); } BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig()); bw = Trace.wrapAll(bw, new CountSampler(1024)); - + Random r = new Random(); - + byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8); - + System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId, UTF_8)); - + long count = 0; final int flushInterval = 1000000; final int maxDepth = 25; - + // always want to point back to flushed data. This way the previous item should // always exist in accumulo when verifying data. To do this make insert N point // back to the row from insert (N - flushInterval). The array below is used to keep @@ -175,9 +176,9 @@ public class ContinuousIngest { long firstRows[] = new long[flushInterval]; int firstColFams[] = new int[flushInterval]; int firstColQuals[] = new int[flushInterval]; - + long lastFlushTime = System.currentTimeMillis(); - + out: while (true) { // generate first set of nodes ColumnVisibility cv = getVisibility(r); @@ -186,22 +187,22 @@ public class ContinuousIngest { long rowLong = genLong(opts.min, opts.max, r); prevRows[index] = rowLong; firstRows[index] = rowLong; - + int cf = r.nextInt(opts.maxColF); int cq = r.nextInt(opts.maxColQ); - + firstColFams[index] = cf; firstColQuals[index] = cq; - + Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum); count++; bw.addMutation(m); } - + lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); if (count >= opts.num) break out; - + // generate subsequent sets of nodes that link to previous set of nodes for (int depth = 1; depth < maxDepth; depth++) { for (int index = 0; index < flushInterval; index++) { @@ -212,12 +213,12 @@ public class ContinuousIngest { count++; bw.addMutation(m); } - + lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); if (count >= opts.num) break out; } - + // create one big linked list, this makes all of the first inserts // point to something for (int index = 0; index < flushInterval - 1; index++) { @@ -230,7 +231,7 @@ public class ContinuousIngest { if (count >= opts.num) break out; } - + bw.close(); opts.stopTracing(); } @@ -243,17 +244,17 @@ public class ContinuousIngest { lastFlushTime = t2; return lastFlushTime; } - + public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r, boolean checksum) { // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead CRC32 cksum = null; - + byte[] rowString = genRow(rowLong); - + byte[] cfString = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES); byte[] cqString = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES); - + if (checksum) { cksum = new CRC32(); cksum.update(rowString); @@ -261,25 +262,25 @@ public class ContinuousIngest { cksum.update(cqString); cksum.update(cv.getExpression()); } - + Mutation m = new Mutation(new Text(rowString)); - + m.put(new Text(cfString), new Text(cqString), cv, createValue(ingestInstanceId, count, prevRow, cksum)); return m; } - + public static final long genLong(long min, long max, Random r) { return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min; } - + static final byte[] genRow(long min, long max, Random r) { return genRow(genLong(min, max, r)); } - + static final byte[] genRow(long rowLong) { return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES); } - + private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) { int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3; if (cksum != null) @@ -297,17 +298,17 @@ public class ContinuousIngest { System.arraycopy(prevRow, 0, val, index, prevRow.length); index += prevRow.length; } - + val[index++] = ':'; - + if (cksum != null) { cksum.update(val, 0, index); cksum.getValue(); FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES); } - + // System.out.println("val "+new String(val)); - + return new Value(val); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java index 797413f..89ff515 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java @@ -50,7 +50,7 @@ import com.beust.jcommander.validators.PositiveInteger; /** * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes. - * + * */ public class ContinuousMoru extends Configured implements Tool { private static final String PREFIX = ContinuousMoru.class.getSimpleName() + "."; @@ -59,49 +59,49 @@ public class ContinuousMoru extends Configured implements Tool { private static final String MAX = PREFIX + "MAX"; private static final String MIN = PREFIX + "MIN"; private static final String CI_ID = PREFIX + "CI_ID"; - + static enum Counts { SELF_READ; } - + public static class CMapper extends Mapper<Key,Value,Text,Mutation> { - + private short max_cf; private short max_cq; private Random random; private String ingestInstanceId; private byte[] iiId; private long count; - + private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility(); - + @Override public void setup(Context context) throws IOException, InterruptedException { int max_cf = context.getConfiguration().getInt(MAX_CF, -1); int max_cq = context.getConfiguration().getInt(MAX_CQ, -1); - + if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE) throw new IllegalArgumentException(); - + this.max_cf = (short) max_cf; this.max_cq = (short) max_cq; - + random = new Random(); ingestInstanceId = context.getConfiguration().get(CI_ID); iiId = ingestInstanceId.getBytes(UTF_8); - + count = 0; } - + @Override public void map(Key key, Value data, Context context) throws IOException, InterruptedException { - + ContinuousWalk.validate(key, data); - + if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) { // only rewrite data not written by this M/R job byte[] val = data.get(); - + int offset = ContinuousWalk.getPrevRowOffset(val); if (offset > 0) { long rowLong = Long.parseLong(new String(val, offset, 16, UTF_8), 16); @@ -109,24 +109,24 @@ public class ContinuousMoru extends Configured implements Tool { .toArray(), random, true); context.write(null, m); } - + } else { ContinuousVerify.increment(context.getCounter(Counts.SELF_READ)); } } } - + static class Opts extends BaseOpts { - @Parameter(names = "--maxColF", description = "maximum column family value to use", converter=ShortConverter.class) + @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class) short maxColF = Short.MAX_VALUE; - - @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter=ShortConverter.class) + + @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class) short maxColQ = Short.MAX_VALUE; - + @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class) int maxMaps = 0; } - + @Override public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException { Opts opts = new Opts(); @@ -136,10 +136,10 @@ public class ContinuousMoru extends Configured implements Tool { @SuppressWarnings("deprecation") Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); job.setJarByClass(this.getClass()); - + job.setInputFormatClass(AccumuloInputFormat.class); opts.setAccumuloConfigs(job); - + // set up ranges try { Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); @@ -148,28 +148,28 @@ public class ContinuousMoru extends Configured implements Tool { } catch (Exception e) { throw new IOException(e); } - + job.setMapperClass(CMapper.class); - + job.setNumReduceTasks(0); - + job.setOutputFormatClass(AccumuloOutputFormat.class); AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig()); - + Configuration conf = job.getConfiguration(); conf.setLong(MIN, opts.min); conf.setLong(MAX, opts.max); conf.setInt(MAX_CF, opts.maxColF); conf.setInt(MAX_CQ, opts.maxColQ); conf.set(CI_ID, UUID.randomUUID().toString()); - + job.waitForCompletion(true); opts.stopTracing(); return job.isSuccessful() ? 0 : 1; } - + /** - * + * * @param args * instanceName zookeepers username password table columns outputpath */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java index dcc3e49..73048f6 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java @@ -33,28 +33,28 @@ import org.apache.hadoop.io.Text; import com.beust.jcommander.Parameter; public class ContinuousQuery { - + public static class Opts extends BaseOpts { - @Parameter(names="--sleep", description="the time to wait between queries", converter=TimeConverter.class) + @Parameter(names = "--sleep", description = "the time to wait between queries", converter = TimeConverter.class) long sleepTime = 100; } - + public static void main(String[] args) throws Exception { Opts opts = new Opts(); ScannerOpts scanOpts = new ScannerOpts(); opts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts); - + Connector conn = opts.getConnector(); Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.auths); scanner.setBatchSize(scanOpts.scanBatchSize); - + Random r = new Random(); - + while (true) { byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r); - + int count = 0; - + long t1 = System.currentTimeMillis(); scanner.setRange(new Range(new Text(row))); for (Entry<Key,Value> entry : scanner) { @@ -62,9 +62,9 @@ public class ContinuousQuery { count++; } long t2 = System.currentTimeMillis(); - + System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, UTF_8), (t2 - t1), count); - + if (opts.sleepTime > 0) Thread.sleep(opts.sleepTime); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java index 60154df..f68377a 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java @@ -36,50 +36,50 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.validators.PositiveInteger; public class ContinuousScanner { - + static class Opts extends ContinuousWalk.Opts { - @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class) + @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class) long numToScan = 0; } - + public static void main(String[] args) throws Exception { Opts opts = new Opts(); ScannerOpts scanOpts = new ScannerOpts(); opts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts); - + Random r = new Random(); long distance = 1000000000000l; - + Connector conn = opts.getConnector(); Authorizations auths = opts.randomAuths.getAuths(r); Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths); scanner.setBatchSize(scanOpts.scanBatchSize); - + double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0)); - + while (true) { long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r); byte[] scanStart = ContinuousIngest.genRow(startRow); byte[] scanStop = ContinuousIngest.genRow(startRow + distance); - + scanner.setRange(new Range(new Text(scanStart), new Text(scanStop))); - + int count = 0; Iterator<Entry<Key,Value>> iter = scanner.iterator(); - + long t1 = System.currentTimeMillis(); - + while (iter.hasNext()) { Entry<Key,Value> entry = iter.next(); ContinuousWalk.validate(entry.getKey(), entry.getValue()); count++; } - + long t2 = System.currentTimeMillis(); - + // System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan); - + if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) { if (count == 0) { distance = distance * 10; @@ -91,15 +91,15 @@ public class ContinuousScanner { ratio = ratio - (ratio - 1.0) * (2.0 / 3.0); distance = (long) (ratio * distance); } - + // System.out.println("P2 "+delta +" "+numToScan+" "+distance+" "+((double)numToScan/count )); } - + System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count); - + if (opts.sleepTime > 0) UtilWaitThread.sleep(opts.sleepTime); } - + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java index 1e3b636..7c2f93b 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java @@ -58,15 +58,15 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.log4j.Logger; public class ContinuousStatsCollector { - + private static final Logger log = Logger.getLogger(ContinuousStatsCollector.class); - + static class StatsCollectionTask extends TimerTask { - + private final String tableId; private final Opts opts; private final int scanBatchSize; - + public StatsCollectionTask(Opts opts, int scanBatchSize) { this.opts = opts; this.scanBatchSize = scanBatchSize; @@ -76,7 +76,7 @@ public class ContinuousStatsCollector { + " ACCUMULO_DU ACCUMULO_DIRS ACCUMULO_FILES TABLE_DU TABLE_DIRS TABLE_FILES" + " MAP_TASK MAX_MAP_TASK REDUCE_TASK MAX_REDUCE_TASK TASK_TRACKERS BLACK_LISTED MIN_FILES/TABLET MAX_FILES/TABLET AVG_FILES/TABLET STDDEV_FILES/TABLET"); } - + @Override public void run() { try { @@ -84,37 +84,37 @@ public class ContinuousStatsCollector { String fsStats = getFSStats(); String mrStats = getMRStats(); String tabletStats = getTabletStats(); - + System.out.println(System.currentTimeMillis() + " " + acuStats + " " + fsStats + " " + mrStats + " " + tabletStats); } catch (Exception e) { - log.error(System.currentTimeMillis()+" - Failed to collect stats", e); + log.error(System.currentTimeMillis() + " - Failed to collect stats", e); } } - + private String getTabletStats() throws Exception { - + Connector conn = opts.getConnector(); Scanner scanner = conn.createScanner(MetadataTable.NAME, opts.auths); scanner.setBatchSize(scanBatchSize); scanner.fetchColumnFamily(DataFileColumnFamily.NAME); scanner.addScanIterator(new IteratorSetting(1000, "cfc", ColumnFamilyCounter.class.getName())); scanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); - + Stat s = new Stat(); - + int count = 0; for (Entry<Key,Value> entry : scanner) { count++; s.addStat(Long.parseLong(entry.getValue().toString())); } - + if (count > 0) return String.format("%d %d %.3f %.3f", s.getMin(), s.getMax(), s.getAverage(), s.getStdDev()); else return "0 0 0 0"; - + } - + private String getFSStats() throws Exception { VolumeManager fs = VolumeManagerImpl.get(); long length1 = 0, dcount1 = 0, fcount1 = 0; @@ -129,22 +129,22 @@ public class ContinuousStatsCollector { dcount2 += contentSummary.getDirectoryCount(); fcount2 += contentSummary.getFileCount(); } - + return "" + length1 + " " + dcount1 + " " + fcount1 + " " + length2 + " " + dcount2 + " " + fcount2; } - + private String getACUStats() throws Exception { - + MasterClientService.Iface client = null; try { ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.principal, opts.getToken()), new ServerConfigurationFactory( opts.getInstance()).getConfiguration()); client = MasterClient.getConnectionWithRetry(context); MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds()); - + TableInfo all = new TableInfo(); Map<String,TableInfo> tableSummaries = new HashMap<String,TableInfo>(); - + for (TabletServerStatus server : stats.tServerInfo) { for (Entry<String,TableInfo> info : server.tableMap.entrySet()) { TableInfo tableSummary = tableSummaries.get(info.getKey()); @@ -156,42 +156,42 @@ public class ContinuousStatsCollector { TableInfoUtil.add(all, info.getValue()); } } - + TableInfo ti = tableSummaries.get(tableId); - + return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate + " " + (long) all.queryRate + " " + ti.recs + " " + ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate + " " + ti.tablets + " " + ti.onlineTablets; - + } finally { if (client != null) MasterClient.close(client); } - + } - + } - + private static String getMRStats() throws Exception { Configuration conf = CachedConfiguration.getInstance(); // No alternatives for hadoop 20 JobClient jc = new JobClient(new org.apache.hadoop.mapred.JobConf(conf)); - + ClusterStatus cs = jc.getClusterStatus(false); - + return "" + cs.getMapTasks() + " " + cs.getMaxMapTasks() + " " + cs.getReduceTasks() + " " + cs.getMaxReduceTasks() + " " + cs.getTaskTrackers() + " " + cs.getBlacklistedTrackers(); - + } - + static class Opts extends ClientOnRequiredTable {} - + public static void main(String[] args) { Opts opts = new Opts(); ScannerOpts scanOpts = new ScannerOpts(); opts.parseArgs(ContinuousStatsCollector.class.getName(), args, scanOpts); Timer jtimer = new Timer(); - + jtimer.schedule(new StatsCollectionTask(opts, scanOpts.scanBatchSize), 0, 30000); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java index 049f9b8..461d226 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java @@ -234,7 +234,7 @@ public class ContinuousVerify extends Configured implements Tool { } /** - * + * * @param args * instanceName zookeepers username password table columns outputpath */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java index 9253093..60f8ec2 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java @@ -45,7 +45,7 @@ import com.beust.jcommander.IStringConverter; import com.beust.jcommander.Parameter; public class ContinuousWalk { - + static public class Opts extends ContinuousQuery.Opts { class RandomAuthsConverter implements IStringConverter<RandomAuths> { @Override @@ -57,35 +57,35 @@ public class ContinuousWalk { } } } - + @Parameter(names = "--authsFile", description = "read the authorities to use from a file") RandomAuths randomAuths = new RandomAuths(); } - + static class BadChecksumException extends RuntimeException { private static final long serialVersionUID = 1L; - + public BadChecksumException(String msg) { super(msg); } - + } - + static class RandomAuths { private List<Authorizations> auths; - + RandomAuths() { auths = Collections.singletonList(Authorizations.EMPTY); } - + RandomAuths(String file) throws IOException { if (file == null) { auths = Collections.singletonList(Authorizations.EMPTY); return; } - + auths = new ArrayList<Authorizations>(); - + FileSystem fs = FileSystem.get(new Configuration()); BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), UTF_8)); try { @@ -97,30 +97,30 @@ public class ContinuousWalk { in.close(); } } - + Authorizations getAuths(Random r) { return auths.get(r.nextInt(auths.size())); } } - + public static void main(String[] args) throws Exception { Opts opts = new Opts(); opts.parseArgs(ContinuousWalk.class.getName(), args); - + Connector conn = opts.getConnector(); - + Random r = new Random(); - + ArrayList<Value> values = new ArrayList<Value>(); - + while (true) { Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.randomAuths.getAuths(r)); String row = findAStartRow(opts.min, opts.max, scanner, r); - + while (row != null) { - + values.clear(); - + long t1 = System.currentTimeMillis(); Span span = Trace.on("walk"); try { @@ -133,9 +133,9 @@ public class ContinuousWalk { span.stop(); } long t2 = System.currentTimeMillis(); - + System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size()); - + if (values.size() > 0) { row = getPrevRow(values.get(r.nextInt(values.size()))); } else { @@ -143,27 +143,27 @@ public class ContinuousWalk { System.err.printf("MIS %d %s%n", t1, row); row = null; } - + if (opts.sleepTime > 0) Thread.sleep(opts.sleepTime); } - + if (opts.sleepTime > 0) Thread.sleep(opts.sleepTime); } } - + private static String findAStartRow(long min, long max, Scanner scanner, Random r) { - + byte[] scanStart = ContinuousIngest.genRow(min, max, r); scanner.setRange(new Range(new Text(scanStart), null)); scanner.setBatchSize(100); - + int count = 0; String pr = null; - + long t1 = System.currentTimeMillis(); - + for (Entry<Key,Value> entry : scanner) { validate(entry.getKey(), entry.getValue()); pr = getPrevRow(entry.getValue()); @@ -171,66 +171,66 @@ public class ContinuousWalk { if (pr != null) break; } - + long t2 = System.currentTimeMillis(); - + System.out.printf("FSR %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count); - + return pr; } - + static int getPrevRowOffset(byte val[]) { if (val.length == 0) throw new IllegalArgumentException(); if (val[53] != ':') throw new IllegalArgumentException(new String(val, UTF_8)); - + // prev row starts at 54 if (val[54] != ':') { if (val[54 + 16] != ':') throw new IllegalArgumentException(new String(val, UTF_8)); return 54; } - + return -1; } - + static String getPrevRow(Value value) { - + byte[] val = value.get(); int offset = getPrevRowOffset(val); if (offset > 0) { return new String(val, offset, 16, UTF_8); } - + return null; } - + static int getChecksumOffset(byte val[]) { if (val[val.length - 1] != ':') { if (val[val.length - 9] != ':') throw new IllegalArgumentException(new String(val, UTF_8)); return val.length - 8; } - + return -1; } - + static void validate(Key key, Value value) throws BadChecksumException { int ckOff = getChecksumOffset(value.get()); if (ckOff < 0) return; - + long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16); - + CRC32 cksum = new CRC32(); - + cksum.update(key.getRowData().toArray()); cksum.update(key.getColumnFamilyData().toArray()); cksum.update(key.getColumnQualifierData().toArray()); cksum.update(key.getColumnVisibilityData().toArray()); cksum.update(value.get(), 0, ckOff); - + if (cksum.getValue() != storedCksum) { throw new BadChecksumException("Checksum invalid " + key + " " + value); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java b/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java index 1320ed5..ba39f1c 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java @@ -23,27 +23,27 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; /** - * + * */ public class GenSplits { - + static class Opts { @Parameter(names = "--min", description = "minimum row") long minRow = 0; - + @Parameter(names = "--max", description = "maximum row") long maxRow = Long.MAX_VALUE; - + @Parameter(description = "<num tablets>") List<String> args = null; } public static void main(String[] args) { - + Opts opts = new Opts(); JCommander jcommander = new JCommander(opts); jcommander.setProgramName(GenSplits.class.getSimpleName()); - + try { jcommander.parse(args); } catch (ParameterException pe) { @@ -56,14 +56,14 @@ public class GenSplits { jcommander.usage(); System.exit(-1); } - + int numTablets = Integer.parseInt(opts.args.get(0)); - + if (numTablets < 1) { System.err.println("ERROR: numTablets < 1"); System.exit(-1); } - + if (opts.minRow >= opts.maxRow) { System.err.println("ERROR: min >= max"); System.exit(-1); @@ -73,13 +73,13 @@ public class GenSplits { long distance = ((opts.maxRow - opts.minRow) / numTablets) + 1; long split = distance; for (int i = 0; i < numSplits; i++) { - + String s = String.format("%016x", split + opts.minRow); - + while (s.charAt(s.length() - 1) == '0') { s = s.substring(0, s.length() - 1); } - + System.out.println(s); split += distance; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java b/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java index b3aae46..6362afd 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java @@ -34,15 +34,15 @@ import java.util.TreeSet; class HistData<T> implements Comparable<HistData<T>>, Serializable { private static final long serialVersionUID = 1L; - + T bin; long count; - + HistData(T bin) { this.bin = bin; count = 0; } - + @SuppressWarnings("unchecked") public int compareTo(HistData<T> o) { return ((Comparable<T>) bin).compareTo(o.bin); @@ -50,55 +50,55 @@ class HistData<T> implements Comparable<HistData<T>>, Serializable { } public class Histogram<T> implements Serializable { - + private static final long serialVersionUID = 1L; - + protected long sum; protected HashMap<T,HistData<T>> counts; - + public Histogram() { sum = 0; counts = new HashMap<T,HistData<T>>(); } - + public void addPoint(T x) { addPoint(x, 1); } - + public void addPoint(T x, long y) { - + HistData<T> hd = counts.get(x); if (hd == null) { hd = new HistData<T>(x); counts.put(x, hd); } - + hd.count += y; sum += y; } - + public long getCount(T x) { HistData<T> hd = counts.get(x); if (hd == null) return 0; return hd.count; } - + public double getPercentage(T x) { if (getSum() == 0) { return 0; } return (double) getCount(x) / (double) getSum() * 100.0; } - + public long getSum() { return sum; } - + public List<T> getKeysInCountSortedOrder() { - + ArrayList<HistData<T>> sortedCounts = new ArrayList<HistData<T>>(counts.values()); - + Collections.sort(sortedCounts, new Comparator<HistData<T>>() { public int compare(HistData<T> o1, HistData<T> o2) { if (o1.count < o2.count) @@ -108,60 +108,60 @@ public class Histogram<T> implements Serializable { return 0; } }); - + ArrayList<T> sortedKeys = new ArrayList<T>(); - + for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) { HistData<T> hd = iter.next(); sortedKeys.add(hd.bin); } - + return sortedKeys; } - + public void print(StringBuilder out) { TreeSet<HistData<T>> sortedCounts = new TreeSet<HistData<T>>(counts.values()); - + int maxValueLen = 0; - + for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) { HistData<T> hd = iter.next(); if (("" + hd.bin).length() > maxValueLen) { maxValueLen = ("" + hd.bin).length(); } } - + double psum = 0; - + for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) { HistData<T> hd = iter.next(); - + psum += getPercentage(hd.bin); - + out.append(String.format(" %" + (maxValueLen + 1) + "s %,16d %6.2f%s %6.2f%s%n", hd.bin + "", hd.count, getPercentage(hd.bin), "%", psum, "%")); } out.append(String.format("%n %" + (maxValueLen + 1) + "s %,16d %n", "TOTAL", sum)); } - + public void save(String file) throws IOException { - + FileOutputStream fos = new FileOutputStream(file); BufferedOutputStream bos = new BufferedOutputStream(fos); PrintStream ps = new PrintStream(bos, false, UTF_8.name()); - + TreeSet<HistData<T>> sortedCounts = new TreeSet<HistData<T>>(counts.values()); for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) { HistData<T> hd = iter.next(); ps.println(" " + hd.bin + " " + hd.count); } - + ps.close(); } - + public Set<T> getKeys() { return counts.keySet(); } - + public void clear() { counts.clear(); sum = 0; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java b/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java index cab3126..d77f427 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java @@ -23,38 +23,39 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; + import org.apache.log4j.Logger; public class PrintScanTimeHistogram { - + private static final Logger log = Logger.getLogger(PrintScanTimeHistogram.class); public static void main(String[] args) throws Exception { Histogram<String> srqHist = new Histogram<String>(); Histogram<String> fsrHist = new Histogram<String>(); - + processFile(System.in, srqHist, fsrHist); - + StringBuilder report = new StringBuilder(); report.append(String.format("%n *** Single row queries histogram *** %n")); srqHist.print(report); log.info(report); - + report = new StringBuilder(); report.append(String.format("%n *** Find start rows histogram *** %n")); fsrHist.print(report); log.info(report); } - + private static void processFile(InputStream ins, Histogram<String> srqHist, Histogram<String> fsrHist) throws FileNotFoundException, IOException { String line; BufferedReader in = new BufferedReader(new InputStreamReader(ins, UTF_8)); - + while ((line = in.readLine()) != null) { - + try { String[] tokens = line.split(" "); - + String type = tokens[0]; if (type.equals("SRQ")) { long delta = Long.parseLong(tokens[3]); @@ -66,16 +67,16 @@ public class PrintScanTimeHistogram { fsrHist.addPoint(point); } } catch (Exception e) { - log.error("Failed to process line '"+line+"'.", e); + log.error("Failed to process line '" + line + "'.", e); } } - + in.close(); } - + private static String generateHistPoint(long delta) { String point; - + if (delta / 1000.0 < .1) { point = String.format("%07.2f", delta / 1000.0); if (point.equals("0000.10")) @@ -89,5 +90,5 @@ public class PrintScanTimeHistogram { } return point; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java b/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java index 0824948..186e8d0 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java @@ -33,16 +33,16 @@ import org.apache.accumulo.core.cli.Help; import com.beust.jcommander.Parameter; public class TimeBinner { - + enum Operation { AVG, SUM, MIN, MAX, COUNT, CUMULATIVE, AMM, // avg,min,max AMM_HACK1 // special case } - + private static class DoubleWrapper { double d; } - + private static DoubleWrapper get(long l, HashMap<Long,DoubleWrapper> m, double init) { DoubleWrapper dw = m.get(l); if (dw == null) { @@ -52,49 +52,49 @@ public class TimeBinner { } return dw; } - + static class Opts extends Help { - @Parameter(names="--period", description="period", converter=TimeConverter.class, required=true) + @Parameter(names = "--period", description = "period", converter = TimeConverter.class, required = true) long period = 0; - @Parameter(names="--timeColumn", description="time column", required=true) + @Parameter(names = "--timeColumn", description = "time column", required = true) int timeColumn = 0; - @Parameter(names="--dataColumn", description="data column", required=true) + @Parameter(names = "--dataColumn", description = "data column", required = true) int dataColumn = 0; - @Parameter(names="--operation", description="one of: AVG, SUM, MIN, MAX, COUNT", required=true) + @Parameter(names = "--operation", description = "one of: AVG, SUM, MIN, MAX, COUNT", required = true) String operation; - @Parameter(names="--dateFormat", description="a SimpleDataFormat string that describes the data format") + @Parameter(names = "--dateFormat", description = "a SimpleDataFormat string that describes the data format") String dateFormat = "MM/dd/yy-HH:mm:ss"; } - + public static void main(String[] args) throws Exception { Opts opts = new Opts(); opts.parseArgs(TimeBinner.class.getName(), args); - + Operation operation = Operation.valueOf(opts.operation); SimpleDateFormat sdf = new SimpleDateFormat(opts.dateFormat); - + BufferedReader in = new BufferedReader(new InputStreamReader(System.in, UTF_8)); - + String line = null; - + HashMap<Long,DoubleWrapper> aggregation1 = new HashMap<Long,DoubleWrapper>(); HashMap<Long,DoubleWrapper> aggregation2 = new HashMap<Long,DoubleWrapper>(); HashMap<Long,DoubleWrapper> aggregation3 = new HashMap<Long,DoubleWrapper>(); HashMap<Long,DoubleWrapper> aggregation4 = new HashMap<Long,DoubleWrapper>(); - + while ((line = in.readLine()) != null) { - + try { String tokens[] = line.split("\\s+"); - + long time = (long) Double.parseDouble(tokens[opts.timeColumn]); double data = Double.parseDouble(tokens[opts.dataColumn]); - + time = (time / opts.period) * opts.period; - + double data_min = data; double data_max = data; - + switch (operation) { case AMM_HACK1: { data_min = Double.parseDouble(tokens[opts.dataColumn - 2]); @@ -105,20 +105,20 @@ public class TimeBinner { DoubleWrapper mindw = get(time, aggregation3, Double.POSITIVE_INFINITY); if (data < mindw.d) mindw.d = data_min; - + DoubleWrapper maxdw = get(time, aggregation4, Double.NEGATIVE_INFINITY); if (data > maxdw.d) maxdw.d = data_max; - + // fall through to AVG } case AVG: { DoubleWrapper sumdw = get(time, aggregation1, 0); DoubleWrapper countdw = get(time, aggregation2, 0); - + sumdw.d += data; countdw.d++; - + break; } case MAX: { @@ -147,20 +147,20 @@ public class TimeBinner { break; } } - + } catch (Exception e) { System.err.println("Failed to process line : " + line + " " + e.getMessage()); } } - + TreeMap<Long,DoubleWrapper> sorted = new TreeMap<Long,DoubleWrapper>(aggregation1); - + Set<Entry<Long,DoubleWrapper>> es = sorted.entrySet(); - + double cumulative = 0; for (Entry<Long,DoubleWrapper> entry : es) { String value; - + switch (operation) { case AMM_HACK1: case AMM: { @@ -181,9 +181,9 @@ public class TimeBinner { default: value = "" + entry.getValue().d; } - + System.out.println(sdf.format(new Date(entry.getKey())) + " " + value); } - + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java b/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java index dffd6c3..7d2c65b 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java @@ -51,7 +51,7 @@ import com.beust.jcommander.Parameter; /** * BUGS This code does not handle the fact that these files could include log events from previous months. It therefore it assumes all dates are in the current * month. One solution might be to skip log files that haven't been touched in the last month, but that doesn't prevent newer files that have old dates in them. - * + * */ public class UndefinedAnalyzer { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java b/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java index b7ee6fc..c589137 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java @@ -23,10 +23,10 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.Combiner; public class BadCombiner extends Combiner { - + @Override public Value reduce(Key key, Iterator<Value> iter) { throw new IllegalStateException(); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java index 1c62720..5d13d1d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java @@ -23,17 +23,17 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.WrappingIterator; public class BadIterator extends WrappingIterator { - + @Override public Key getTopKey() { throw new NullPointerException(); } - + @Override public boolean hasTop() { throw new NullPointerException(); } - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java index d112b5b..62afb32 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java @@ -24,17 +24,17 @@ import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.commons.io.FileUtils; public class CacheTestClean { - + public static void main(String[] args) throws Exception { String rootDir = args[0]; File reportDir = new File(args[1]); - + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - + if (zoo.exists(rootDir)) { zoo.recursiveDelete(rootDir, NodeMissingPolicy.FAIL); } - + if (reportDir.exists()) { FileUtils.deleteDirectory(reportDir); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java index fdc704d..82eef6c 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java @@ -35,48 +35,48 @@ public class CacheTestReader { String reportDir = args[1]; String keepers = args[2]; int numData = CacheTestWriter.NUM_DATA; - + File myfile = new File(reportDir + "/" + UUID.randomUUID()); myfile.deleteOnExit(); - + ZooCache zc = new ZooCache(keepers, 30000); - + while (true) { if (myfile.exists()) myfile.delete(); - + if (zc.get(rootDir + "/die") != null) { return; } - + Map<String,String> readData = new TreeMap<String,String>(); - + for (int i = 0; i < numData; i++) { byte[] v = zc.get(rootDir + "/data" + i); if (v != null) readData.put(rootDir + "/data" + i, new String(v, UTF_8)); } - + byte[] v = zc.get(rootDir + "/dataS"); if (v != null) readData.put(rootDir + "/dataS", new String(v, UTF_8)); - + List<String> children = zc.getChildren(rootDir + "/dir"); if (children != null) for (String child : children) { readData.put(rootDir + "/dir/" + child, ""); } - + FileOutputStream fos = new FileOutputStream(myfile); ObjectOutputStream oos = new ObjectOutputStream(fos); - + oos.writeObject(readData); - + fos.close(); oos.close(); - + UtilWaitThread.sleep(20); } - + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java index e1be8e6..3a3baf0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java @@ -35,40 +35,40 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; public class CacheTestWriter { - + static final int NUM_DATA = 3; - + public static void main(String[] args) throws Exception { IZooReaderWriter zk = ZooReaderWriter.getInstance(); - + String rootDir = args[0]; File reportDir = new File(args[1]); int numReaders = Integer.parseInt(args[2]); int numVerifications = Integer.parseInt(args[3]); int numData = NUM_DATA; - + boolean dataSExists = false; int count = 0; - + zk.putPersistentData(rootDir, new byte[0], NodeExistsPolicy.FAIL); for (int i = 0; i < numData; i++) { zk.putPersistentData(rootDir + "/data" + i, new byte[0], NodeExistsPolicy.FAIL); } - + zk.putPersistentData(rootDir + "/dir", new byte[0], NodeExistsPolicy.FAIL); - + ArrayList<String> children = new ArrayList<String>(); - + Random r = new Random(); - + while (count++ < numVerifications) { - + Map<String,String> expectedData = null; // change children in dir - + for (int u = 0; u < r.nextInt(4) + 1; u++) { expectedData = new TreeMap<String,String>(); - + if (r.nextFloat() < .5) { String child = UUID.randomUUID().toString(); zk.putPersistentData(rootDir + "/dir/" + child, new byte[0], NodeExistsPolicy.SKIP); @@ -78,32 +78,32 @@ public class CacheTestWriter { String child = children.remove(index); zk.recursiveDelete(rootDir + "/dir/" + child, NodeMissingPolicy.FAIL); } - + for (String child : children) { expectedData.put(rootDir + "/dir/" + child, ""); } - + // change values for (int i = 0; i < numData; i++) { byte data[] = Long.toString(r.nextLong(), 16).getBytes(UTF_8); zk.putPersistentData(rootDir + "/data" + i, data, NodeExistsPolicy.OVERWRITE); expectedData.put(rootDir + "/data" + i, new String(data, UTF_8)); } - + // test a data node that does not always exists... if (r.nextFloat() < .5) { - + byte data[] = Long.toString(r.nextLong(), 16).getBytes(UTF_8); - + if (!dataSExists) { zk.putPersistentData(rootDir + "/dataS", data, NodeExistsPolicy.SKIP); dataSExists = true; } else { zk.putPersistentData(rootDir + "/dataS", data, NodeExistsPolicy.OVERWRITE); } - + expectedData.put(rootDir + "/dataS", new String(data, UTF_8)); - + } else { if (dataSExists) { zk.recursiveDelete(rootDir + "/dataS", NodeMissingPolicy.FAIL); @@ -111,34 +111,34 @@ public class CacheTestWriter { } } } - + // change children in dir and change values - + System.out.println("expectedData " + expectedData); - + // wait for all readers to see changes while (true) { - + File[] files = reportDir.listFiles(); - + System.out.println("files.length " + files.length); - + if (files.length == numReaders) { boolean ok = true; - + for (int i = 0; i < files.length; i++) { try { FileInputStream fis = new FileInputStream(files[i]); ObjectInputStream ois = new ObjectInputStream(fis); - + @SuppressWarnings("unchecked") Map<String,String> readerMap = (Map<String,String>) ois.readObject(); - + fis.close(); ois.close(); - + System.out.println("read " + readerMap); - + if (!readerMap.equals(expectedData)) { System.out.println("maps not equals"); ok = false; @@ -148,16 +148,16 @@ public class CacheTestWriter { ok = false; } } - + if (ok) break; } - + UtilWaitThread.sleep(5); } } - + zk.putPersistentData(rootDir + "/die", new byte[0], NodeExistsPolicy.FAIL); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java b/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java index 20fe856..32a178d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java @@ -26,26 +26,26 @@ import org.apache.accumulo.core.iterators.SkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; public class DropModIter extends SkippingIterator { - + private int mod; private int drop; - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { throw new UnsupportedOperationException(); } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { super.init(source, options, env); this.mod = Integer.parseInt(options.get("mod")); this.drop = Integer.parseInt(options.get("drop")); } - + protected void consume() throws IOException { while (getSource().hasTop() && Integer.parseInt(getSource().getTopKey().getRow().toString()) % mod == drop) { getSource().next(); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java index 8e29955..187da35 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java @@ -23,19 +23,19 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.util.UtilWaitThread; /** - * + * */ public class SlowConstraint implements Constraint { - + @Override public String getViolationDescription(short violationCode) { return null; } - + @Override public List<Short> check(Environment env, Mutation mutation) { UtilWaitThread.sleep(20000); return null; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java index cb29688..f84a4d9 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java @@ -37,9 +37,9 @@ public class SlowIterator extends WrappingIterator { private long sleepTime = 0; private long seekSleepTime = 0; - + public static void setSleepTime(IteratorSetting is, long millis) { - is.addOption(SLEEP_TIME, Long.toString(millis)); + is.addOption(SLEEP_TIME, Long.toString(millis)); } public static void setSeekSleepTime(IteratorSetting is, long t) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index eb84533..3bb44ff 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -102,8 +102,8 @@ public class ZombieTServer { TransactionWatcher watcher = new TransactionWatcher(); final ThriftClientHandler tch = new ThriftClientHandler(context, watcher); Processor<Iface> processor = new Processor<Iface>(tch); - ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", "walking dead", 2, 1, 1000, - 10 * 1024 * 1024, null, -1); + ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", + "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, -1); String addressString = serverPort.address.toString(); String zPath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addressString;