http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java new file mode 100644 index 0000000..4681cb8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.trace.CountSampler; +import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.core.util.FastFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +public class ContinuousIngest { + + private static final byte[] EMPTY_BYTES = new byte[0]; + + private static List<ColumnVisibility> visibilities; + + private static void initVisibilities(ContinuousOpts opts) throws Exception { + if (opts.visFile == null) { + visibilities = Collections.singletonList(new ColumnVisibility()); + return; + } + + visibilities = new ArrayList<>(); + + FileSystem fs = FileSystem.get(new Configuration()); + BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8)); + + String line; + + while ((line = in.readLine()) != null) { + visibilities.add(new ColumnVisibility(line)); + } + + in.close(); + } + + private static ColumnVisibility getVisibility(Random rand) { + return visibilities.get(rand.nextInt(visibilities.size())); + } + + public static void main(String[] args) throws Exception { + + ContinuousOpts opts = new ContinuousOpts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + 
ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); + clientOpts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts, opts); + + initVisibilities(opts); + + if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) { + throw new IllegalArgumentException("bad min and max"); + } + Connector conn = clientOpts.getConnector(); + + if (!conn.tableOperations().exists(clientOpts.getTableName())) { + throw new TableNotFoundException(null, clientOpts.getTableName(), "Consult the README and create the table before starting ingest."); + } + + BatchWriter bw = conn.createBatchWriter(clientOpts.getTableName(), bwOpts.getBatchWriterConfig()); + bw = Trace.wrapAll(bw, new CountSampler(1024)); + + Random r = new Random(); + + byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8); + + System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId, UTF_8)); + + long count = 0; + final int flushInterval = 1000000; + final int maxDepth = 25; + + // always want to point back to flushed data. This way the previous item should + // always exist in accumulo when verifying data. To do this make insert N point + // back to the row from insert (N - flushInterval). The array below is used to keep + // track of this. + long prevRows[] = new long[flushInterval]; + long firstRows[] = new long[flushInterval]; + int firstColFams[] = new int[flushInterval]; + int firstColQuals[] = new int[flushInterval]; + + long lastFlushTime = System.currentTimeMillis(); + + out: while (true) { + // generate first set of nodes + ColumnVisibility cv = getVisibility(r); + + for (int index = 0; index < flushInterval; index++) { + long rowLong = genLong(opts.min, opts.max, r); + prevRows[index] = rowLong; + firstRows[index] = rowLong; + + int cf = r.nextInt(opts.maxColF); + int cq = r.nextInt(opts.maxColQ); + + firstColFams[index] = cf; + firstColQuals[index] = cq; + + Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum); + count++; + bw.addMutation(m); + } + + lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); + if (count >= opts.num) + break out; + + // generate subsequent sets of nodes that link to previous set of nodes + for (int depth = 1; depth < maxDepth; depth++) { + for (int index = 0; index < flushInterval; index++) { + long rowLong = genLong(opts.min, opts.max, r); + byte[] prevRow = genRow(prevRows[index]); + prevRows[index] = rowLong; + Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum); + count++; + bw.addMutation(m); + } + + lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); + if (count >= opts.num) + break out; + } + + // create one big linked list, this makes all of the first inserts + // point to something + for (int index = 0; index < flushInterval - 1; index++) { + Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r, + opts.checksum); + count++; + bw.addMutation(m); + } + lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); + if (count >= opts.num) + break out; + } + + bw.close(); + clientOpts.stopTracing(); + } + + private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException { + long t1 = System.currentTimeMillis(); + bw.flush(); + long t2 = System.currentTimeMillis(); + System.out.printf("FLUSH %d %d %d %d %d%n", t2, (t2 - 
lastFlushTime), (t2 - t1), count, flushInterval); + lastFlushTime = t2; + return lastFlushTime; + } + + public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r, + boolean checksum) { + // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead + CRC32 cksum = null; + + byte[] rowString = genRow(rowLong); + + byte[] cfString = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES); + byte[] cqString = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES); + + if (checksum) { + cksum = new CRC32(); + cksum.update(rowString); + cksum.update(cfString); + cksum.update(cqString); + cksum.update(cv.getExpression()); + } + + Mutation m = new Mutation(new Text(rowString)); + + m.put(new Text(cfString), new Text(cqString), cv, createValue(ingestInstanceId, count, prevRow, cksum)); + return m; + } + + public static final long genLong(long min, long max, Random r) { + return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min; + } + + static final byte[] genRow(long min, long max, Random r) { + return genRow(genLong(min, max, r)); + } + + static final byte[] genRow(long rowLong) { + return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES); + } + + private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) { + int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3; + if (cksum != null) + dataLen += 8; + byte val[] = new byte[dataLen]; + System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length); + int index = ingestInstanceId.length; + val[index++] = ':'; + int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, EMPTY_BYTES); + if (added != 16) + throw new RuntimeException(" " + added); + index += 16; + val[index++] = ':'; + if (prevRow != null) { + System.arraycopy(prevRow, 0, val, index, prevRow.length); + index += prevRow.length; + } + + val[index++] = ':'; + + if (cksum != null) { + cksum.update(val, 0, index); + cksum.getValue(); + FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES); + } + + // System.out.println("val "+new String(val)); + + return new Value(val); + } +}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java new file mode 100644 index 0000000..c2902ee --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; +import java.util.Random; +import java.util.Set; +import java.util.UUID; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.validators.PositiveInteger; + +/** + * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to + * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes. 
+ * + */ +public class ContinuousMoru extends Configured implements Tool { + private static final String PREFIX = ContinuousMoru.class.getSimpleName() + "."; + private static final String MAX_CQ = PREFIX + "MAX_CQ"; + private static final String MAX_CF = PREFIX + "MAX_CF"; + private static final String MAX = PREFIX + "MAX"; + private static final String MIN = PREFIX + "MIN"; + private static final String CI_ID = PREFIX + "CI_ID"; + + static enum Counts { + SELF_READ; + } + + public static class CMapper extends Mapper<Key,Value,Text,Mutation> { + + private short max_cf; + private short max_cq; + private Random random; + private String ingestInstanceId; + private byte[] iiId; + private long count; + + private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility(); + + @Override + public void setup(Context context) throws IOException, InterruptedException { + int max_cf = context.getConfiguration().getInt(MAX_CF, -1); + int max_cq = context.getConfiguration().getInt(MAX_CQ, -1); + + if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE) + throw new IllegalArgumentException(); + + this.max_cf = (short) max_cf; + this.max_cq = (short) max_cq; + + random = new Random(); + ingestInstanceId = context.getConfiguration().get(CI_ID); + iiId = ingestInstanceId.getBytes(UTF_8); + + count = 0; + } + + @Override + public void map(Key key, Value data, Context context) throws IOException, InterruptedException { + + ContinuousWalk.validate(key, data); + + if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) { + // only rewrite data not written by this M/R job + byte[] val = data.get(); + + int offset = ContinuousWalk.getPrevRowOffset(val); + if (offset > 0) { + long rowLong = Long.parseLong(new String(val, offset, 16, UTF_8), 16); + Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData() + .toArray(), random, true); + context.write(null, m); + } + + } else { + context.getCounter(Counts.SELF_READ).increment(1l); + } + } + } + + static class Opts extends ContinuousOpts { + @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class) + short maxColF = Short.MAX_VALUE; + + @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class) + short maxColQ = Short.MAX_VALUE; + + @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class) + int maxMaps = 0; + } + + @Override + public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException { + Opts opts = new Opts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + MapReduceClientOnDefaultTable clientOpts = new MapReduceClientOnDefaultTable("ci"); + clientOpts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts, opts); + + Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(AccumuloInputFormat.class); + clientOpts.setAccumuloConfigs(job); + + // set up ranges + try { + Set<Range> ranges = clientOpts.getConnector().tableOperations().splitRangeByTablets(clientOpts.getTableName(), new Range(), opts.maxMaps); + AccumuloInputFormat.setRanges(job, ranges); + AccumuloInputFormat.setAutoAdjustRanges(job, false); + } catch (Exception e) { + throw new IOException(e); 
+ } + + job.setMapperClass(CMapper.class); + + job.setNumReduceTasks(0); + + job.setOutputFormatClass(AccumuloOutputFormat.class); + AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig()); + + Configuration conf = job.getConfiguration(); + conf.setLong(MIN, opts.min); + conf.setLong(MAX, opts.max); + conf.setInt(MAX_CF, opts.maxColF); + conf.setInt(MAX_CQ, opts.maxColQ); + conf.set(CI_ID, UUID.randomUUID().toString()); + + job.waitForCompletion(true); + clientOpts.stopTracing(); + return job.isSuccessful() ? 0 : 1; + } + + /** + * + * @param args + * instanceName zookeepers username password table columns outputpath + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args); + if (res != 0) + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousOpts.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousOpts.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousOpts.java new file mode 100644 index 0000000..3bb11fb --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousOpts.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import java.io.IOException; + +import org.apache.accumulo.core.Constants; +import org.apache.log4j.FileAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.Parameter; + +/** + * Common CLI arguments for the Continuous Ingest suite. 
+ */ +public class ContinuousOpts { + + public static class DebugConverter implements IStringConverter<String> { + @Override + public String convert(String debugLog) { + Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME); + logger.setLevel(Level.TRACE); + logger.setAdditivity(false); + try { + logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true)); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + return debugLog; + } + } + + public static class ShortConverter implements IStringConverter<Short> { + @Override + public Short convert(String value) { + return Short.valueOf(value); + } + } + + @Parameter(names = "--min", description = "lowest random row number to use") + long min = 0; + + @Parameter(names = "--max", description = "maximum random row number to use") + long max = Long.MAX_VALUE; + + @Parameter(names = "--debugLog", description = "file to write debugging output", converter = DebugConverter.class) + String debugLog = null; + + @Parameter(names = "--num", description = "the number of entries to ingest") + long num = Long.MAX_VALUE; + + @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class) + short maxColF = Short.MAX_VALUE; + + @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class) + short maxColQ = Short.MAX_VALUE; + + @Parameter(names = "--addCheckSum", description = "turn on checksums") + boolean checksum = false; + + @Parameter(names = "--visibilities", description = "read the visibilities to ingest with from a file") + String visFile = null; +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java new file mode 100644 index 0000000..8180383 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Map.Entry; +import java.util.Random; + +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.cli.ClientOpts.TimeConverter; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +import com.beust.jcommander.Parameter; + +public class ContinuousQuery { + + public static class Opts extends ContinuousOpts { + @Parameter(names = "--sleep", description = "the time to wait between queries", converter = TimeConverter.class) + long sleepTime = 100; + } + + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + ScannerOpts scanOpts = new ScannerOpts(); + ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); + clientOpts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts, opts); + + Connector conn = clientOpts.getConnector(); + Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), clientOpts.auths); + scanner.setBatchSize(scanOpts.scanBatchSize); + + Random r = new Random(); + + while (true) { + byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r); + + int count = 0; + + long t1 = System.currentTimeMillis(); + scanner.setRange(new Range(new Text(row))); + for (Entry<Key,Value> entry : scanner) { + ContinuousWalk.validate(entry.getKey(), entry.getValue()); + count++; + } + long t2 = System.currentTimeMillis(); + + System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, UTF_8), (t2 - t1), count); + + if (opts.sleepTime > 0) + Thread.sleep(opts.sleepTime); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java new file mode 100644 index 0000000..42e0ea8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.validators.PositiveInteger; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class ContinuousScanner { + + static class Opts extends ContinuousWalk.Opts { + @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class) + long numToScan = 0; + } + + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + ScannerOpts scanOpts = new ScannerOpts(); + ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); + clientOpts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts, opts); + + Random r = new Random(); + + long distance = 1000000000000l; + + Connector conn = clientOpts.getConnector(); + Authorizations auths = opts.randomAuths.getAuths(r); + Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), auths); + scanner.setBatchSize(scanOpts.scanBatchSize); + + double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0)); + + while (true) { + long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r); + byte[] scanStart = ContinuousIngest.genRow(startRow); + byte[] scanStop = ContinuousIngest.genRow(startRow + distance); + + scanner.setRange(new Range(new Text(scanStart), new Text(scanStop))); + + int count = 0; + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + long t1 = System.currentTimeMillis(); + + while (iter.hasNext()) { + Entry<Key,Value> entry = iter.next(); + ContinuousWalk.validate(entry.getKey(), entry.getValue()); + count++; + } + + long t2 = System.currentTimeMillis(); + + // System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan); + + if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) { + if (count == 0) { + distance = distance * 10; + if (distance < 0) + distance = 1000000000000l; + } else { + double ratio = (double) opts.numToScan / count; + // move ratio closer to 1 to make change slower + ratio = ratio - (ratio - 1.0) * (2.0 / 3.0); + distance = (long) (ratio * distance); + } + + // System.out.println("P2 "+delta +" "+numToScan+" "+distance+" "+((double)numToScan/count )); + } + + System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count); + + if (opts.sleepTime > 0) + sleepUninterruptibly(opts.sleepTime, TimeUnit.MILLISECONDS); + } + + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java 
b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java new file mode 100644 index 0000000..818e387 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.Credentials; +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.iterators.ColumnFamilyCounter; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; +import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.Stat; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.cli.ClientOnRequiredTable; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.util.TableInfoUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.ClusterStatus; +import org.apache.hadoop.mapred.JobClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContinuousStatsCollector { + + private static final Logger log = LoggerFactory.getLogger(ContinuousStatsCollector.class); + + static class StatsCollectionTask extends TimerTask { + + private final String tableId; + 
private final Opts opts; + private final int scanBatchSize; + + public StatsCollectionTask(Opts opts, int scanBatchSize) { + this.opts = opts; + this.scanBatchSize = scanBatchSize; + this.tableId = Tables.getNameToIdMap(opts.getInstance()).get(opts.getTableName()); + System.out + .println("TIME TABLET_SERVERS TOTAL_ENTRIES TOTAL_INGEST TOTAL_QUERY TABLE_RECS TABLE_RECS_IN_MEM TABLE_INGEST TABLE_QUERY TABLE_TABLETS TABLE_TABLETS_ONLINE" + + " ACCUMULO_DU ACCUMULO_DIRS ACCUMULO_FILES TABLE_DU TABLE_DIRS TABLE_FILES" + + " MAP_TASK MAX_MAP_TASK REDUCE_TASK MAX_REDUCE_TASK TASK_TRACKERS BLACK_LISTED MIN_FILES/TABLET MAX_FILES/TABLET AVG_FILES/TABLET STDDEV_FILES/TABLET"); + } + + @Override + public void run() { + try { + String acuStats = getACUStats(); + String fsStats = getFSStats(); + String mrStats = getMRStats(); + String tabletStats = getTabletStats(); + + System.out.println(System.currentTimeMillis() + " " + acuStats + " " + fsStats + " " + mrStats + " " + tabletStats); + } catch (Exception e) { + log.error(System.currentTimeMillis() + " - Failed to collect stats", e); + } + } + + private String getTabletStats() throws Exception { + + Connector conn = opts.getConnector(); + Scanner scanner = conn.createScanner(MetadataTable.NAME, opts.auths); + scanner.setBatchSize(scanBatchSize); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner.addScanIterator(new IteratorSetting(1000, "cfc", ColumnFamilyCounter.class.getName())); + scanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); + + Stat s = new Stat(); + + int count = 0; + for (Entry<Key,Value> entry : scanner) { + count++; + s.addStat(Long.parseLong(entry.getValue().toString())); + } + + if (count > 0) + return String.format("%d %d %.3f %.3f", s.getMin(), s.getMax(), s.getAverage(), s.getStdDev()); + else + return "0 0 0 0"; + + } + + private String getFSStats() throws Exception { + VolumeManager fs = VolumeManagerImpl.get(); + long length1 = 0, dcount1 = 0, fcount1 = 0; + long length2 = 0, dcount2 = 0, fcount2 = 0; + for (String dir : ServerConstants.getTablesDirs()) { + ContentSummary contentSummary = fs.getContentSummary(new Path(dir)); + length1 += contentSummary.getLength(); + dcount1 += contentSummary.getDirectoryCount(); + fcount1 += contentSummary.getFileCount(); + contentSummary = fs.getContentSummary(new Path(dir, tableId)); + length2 += contentSummary.getLength(); + dcount2 += contentSummary.getDirectoryCount(); + fcount2 += contentSummary.getFileCount(); + } + + return "" + length1 + " " + dcount1 + " " + fcount1 + " " + length2 + " " + dcount2 + " " + fcount2; + } + + private String getACUStats() throws Exception { + + MasterClientService.Iface client = null; + while (true) { + try { + ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.getPrincipal(), opts.getToken()), new ServerConfigurationFactory( + opts.getInstance()).getConfiguration()); + client = MasterClient.getConnectionWithRetry(context); + MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds()); + + TableInfo all = new TableInfo(); + Map<String,TableInfo> tableSummaries = new HashMap<>(); + + for (TabletServerStatus server : stats.tServerInfo) { + for (Entry<String,TableInfo> info : server.tableMap.entrySet()) { + TableInfo tableSummary = tableSummaries.get(info.getKey()); + if (tableSummary == null) { + tableSummary = new TableInfo(); + tableSummaries.put(info.getKey(), tableSummary); + } + TableInfoUtil.add(tableSummary, info.getValue()); + TableInfoUtil.add(all, 
info.getValue()); + } + } + + TableInfo ti = tableSummaries.get(tableId); + + return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate + " " + (long) all.queryRate + " " + ti.recs + " " + + ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate + " " + ti.tablets + " " + ti.onlineTablets; + + } catch (ThriftNotActiveServiceException e) { + // Let it loop, fetching a new location + log.debug("Contacted a Master which is no longer active, retrying"); + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } finally { + if (client != null) + MasterClient.close(client); + } + } + } + + } + + private static String getMRStats() throws Exception { + Configuration conf = CachedConfiguration.getInstance(); + // No alternatives for hadoop 20 + JobClient jc = new JobClient(new org.apache.hadoop.mapred.JobConf(conf)); + + ClusterStatus cs = jc.getClusterStatus(false); + + return "" + cs.getMapTasks() + " " + cs.getMaxMapTasks() + " " + cs.getReduceTasks() + " " + cs.getMaxReduceTasks() + " " + cs.getTaskTrackers() + " " + + cs.getBlacklistedTrackers(); + + } + + static class Opts extends ClientOnRequiredTable {} + + public static void main(String[] args) { + Opts opts = new Opts(); + ScannerOpts scanOpts = new ScannerOpts(); + opts.parseArgs(ContinuousStatsCollector.class.getName(), args, scanOpts); + Timer jtimer = new Timer(); + + jtimer.schedule(new StatsCollectionTask(opts, scanOpts.scanBatchSize), 0, 30000); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousUtil.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousUtil.java new file mode 100644 index 0000000..5a67a34 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.security.Authorizations; + +/** + * Useful utility methods common to the Continuous test suite. + */ +final class ContinuousUtil { + private ContinuousUtil() {} + + /** + * Attempt to create a table scanner, or fail if the table does not exist. 
+ * + * @param connector + * A populated connector object + * @param table + * The table name to scan over + * @param auths + * The authorizations to use for the scanner + * @return a scanner for the requested table + * @throws TableNotFoundException + * If the table does not exist + */ + static Scanner createScanner(Connector connector, String table, Authorizations auths) throws TableNotFoundException { + if (!connector.tableOperations().exists(table)) { + throw new TableNotFoundException(null, table, "Consult the README and create the table before starting test processes."); + } + return connector.createScanner(table, auths); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java new file mode 100644 index 0000000..64f8a35 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.testing.core.continuous.ContinuousWalk.BadChecksumException; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.VLongWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.validators.PositiveInteger; + +/** + * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined. 
+ */ + +public class ContinuousVerify extends Configured implements Tool { + + public static final VLongWritable DEF = new VLongWritable(-1); + + public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> { + + private static final Logger log = LoggerFactory.getLogger(CMapper.class); + private LongWritable row = new LongWritable(); + private LongWritable ref = new LongWritable(); + private VLongWritable vrow = new VLongWritable(); + + private long corrupt = 0; + + @Override + public void map(Key key, Value data, Context context) throws IOException, InterruptedException { + long r = Long.parseLong(key.getRow().toString(), 16); + if (r < 0) + throw new IllegalArgumentException(); + + try { + ContinuousWalk.validate(key, data); + } catch (BadChecksumException bce) { + context.getCounter(Counts.CORRUPT).increment(1l); + if (corrupt < 1000) { + log.error("Bad checksum : " + key); + } else if (corrupt == 1000) { + System.out.println("Too many bad checksums, not printing anymore!"); + } + corrupt++; + return; + } + + row.set(r); + + context.write(row, DEF); + byte[] val = data.get(); + + int offset = ContinuousWalk.getPrevRowOffset(val); + if (offset > 0) { + ref.set(Long.parseLong(new String(val, offset, 16, UTF_8), 16)); + vrow.set(r); + context.write(ref, vrow); + } + } + } + + public static enum Counts { + UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT + } + + public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> { + private ArrayList<Long> refs = new ArrayList<>(); + + @Override + public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException { + + int defCount = 0; + + refs.clear(); + for (VLongWritable type : values) { + if (type.get() == -1) { + defCount++; + } else { + refs.add(type.get()); + } + } + + if (defCount == 0 && refs.size() > 0) { + StringBuilder sb = new StringBuilder(); + String comma = ""; + for (Long ref : refs) { + sb.append(comma); + comma = ","; + sb.append(new String(ContinuousIngest.genRow(ref), UTF_8)); + } + + context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString())); + context.getCounter(Counts.UNDEFINED).increment(1l); + + } else if (defCount > 0 && refs.size() == 0) { + context.getCounter(Counts.UNREFERENCED).increment(1l); + } else { + context.getCounter(Counts.REFERENCED).increment(1l); + } + + } + } + + static class Opts extends MapReduceClientOnDefaultTable { + @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist") + String outputDir = "/tmp/continuousVerify"; + + @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", validateWith = PositiveInteger.class) + int maxMaps = 1; + + @Parameter(names = "--reducers", description = "the number of reducers to use", validateWith = PositiveInteger.class) + int reducers = 1; + + @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline") + boolean scanOffline = false; + + public Opts() { + super("ci"); + } + } + + @Override + public int run(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(this.getClass().getName(), args); + + Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(AccumuloInputFormat.class); + opts.setAccumuloConfigs(job); + + Set<Range> ranges = null; + String clone = 
opts.getTableName(); + Connector conn = null; + + if (opts.scanOffline) { + Random random = new Random(); + clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl)); + conn = opts.getConnector(); + conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>()); + ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + conn.tableOperations().offline(clone); + AccumuloInputFormat.setInputTableName(job, clone); + AccumuloInputFormat.setOfflineTableScan(job, true); + } else { + ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + } + + AccumuloInputFormat.setRanges(job, ranges); + AccumuloInputFormat.setAutoAdjustRanges(job, false); + + job.setMapperClass(CMapper.class); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(VLongWritable.class); + + job.setReducerClass(CReducer.class); + job.setNumReduceTasks(opts.reducers); + + job.setOutputFormatClass(TextOutputFormat.class); + + job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline); + + TextOutputFormat.setOutputPath(job, new Path(opts.outputDir)); + + job.waitForCompletion(true); + + if (opts.scanOffline) { + conn.tableOperations().delete(clone); + } + opts.stopTracing(); + return job.isSuccessful() ? 0 : 1; + } + + /** + * + * @param args + * instanceName zookeepers username password table columns outputpath + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args); + if (res != 0) + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java new file mode 100644 index 0000000..2335fd4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.Random; +import java.util.zip.CRC32; + +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.trace.Span; +import org.apache.accumulo.core.trace.Trace; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.Parameter; + +public class ContinuousWalk { + + static public class Opts extends ContinuousQuery.Opts { + class RandomAuthsConverter implements IStringConverter<RandomAuths> { + @Override + public RandomAuths convert(String value) { + try { + return new RandomAuths(value); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Parameter(names = "--authsFile", description = "read the authorities to use from a file") + RandomAuths randomAuths = new RandomAuths(); + } + + static class BadChecksumException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public BadChecksumException(String msg) { + super(msg); + } + + } + + static class RandomAuths { + private List<Authorizations> auths; + + RandomAuths() { + auths = Collections.singletonList(Authorizations.EMPTY); + } + + RandomAuths(String file) throws IOException { + if (file == null) { + auths = Collections.singletonList(Authorizations.EMPTY); + return; + } + + auths = new ArrayList<>(); + + FileSystem fs = FileSystem.get(new Configuration()); + BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), UTF_8)); + try { + String line; + while ((line = in.readLine()) != null) { + auths.add(new Authorizations(line.split(","))); + } + } finally { + in.close(); + } + } + + Authorizations getAuths(Random r) { + return auths.get(r.nextInt(auths.size())); + } + } + + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); + clientOpts.parseArgs(ContinuousWalk.class.getName(), args, opts); + + Connector conn = clientOpts.getConnector(); + + Random r = new Random(); + + ArrayList<Value> values = new ArrayList<>(); + + while (true) { + Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), opts.randomAuths.getAuths(r)); + String row = findAStartRow(opts.min, opts.max, scanner, r); + + while (row != null) { + + values.clear(); + + long t1 = System.currentTimeMillis(); + Span span = Trace.on("walk"); + try { + scanner.setRange(new Range(new Text(row))); + for (Entry<Key,Value> entry : scanner) { + validate(entry.getKey(), entry.getValue()); + values.add(entry.getValue()); + } + } finally { + span.stop(); + } + long t2 = System.currentTimeMillis(); + + System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size()); + + if (values.size() > 0) { + row = getPrevRow(values.get(r.nextInt(values.size()))); + 
} else { + System.out.printf("MIS %d %s%n", t1, row); + System.err.printf("MIS %d %s%n", t1, row); + row = null; + } + + if (opts.sleepTime > 0) + Thread.sleep(opts.sleepTime); + } + + if (opts.sleepTime > 0) + Thread.sleep(opts.sleepTime); + } + } + + private static String findAStartRow(long min, long max, Scanner scanner, Random r) { + + byte[] scanStart = ContinuousIngest.genRow(min, max, r); + scanner.setRange(new Range(new Text(scanStart), null)); + scanner.setBatchSize(100); + + int count = 0; + String pr = null; + + long t1 = System.currentTimeMillis(); + + for (Entry<Key,Value> entry : scanner) { + validate(entry.getKey(), entry.getValue()); + pr = getPrevRow(entry.getValue()); + count++; + if (pr != null) + break; + } + + long t2 = System.currentTimeMillis(); + + System.out.printf("FSR %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count); + + return pr; + } + + static int getPrevRowOffset(byte val[]) { + if (val.length == 0) + throw new IllegalArgumentException(); + if (val[53] != ':') + throw new IllegalArgumentException(new String(val, UTF_8)); + + // prev row starts at 54 + if (val[54] != ':') { + if (val[54 + 16] != ':') + throw new IllegalArgumentException(new String(val, UTF_8)); + return 54; + } + + return -1; + } + + static String getPrevRow(Value value) { + + byte[] val = value.get(); + int offset = getPrevRowOffset(val); + if (offset > 0) { + return new String(val, offset, 16, UTF_8); + } + + return null; + } + + static int getChecksumOffset(byte val[]) { + if (val[val.length - 1] != ':') { + if (val[val.length - 9] != ':') + throw new IllegalArgumentException(new String(val, UTF_8)); + return val.length - 8; + } + + return -1; + } + + static void validate(Key key, Value value) throws BadChecksumException { + int ckOff = getChecksumOffset(value.get()); + if (ckOff < 0) + return; + + long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16); + + CRC32 cksum = new CRC32(); + + cksum.update(key.getRowData().toArray()); + cksum.update(key.getColumnFamilyData().toArray()); + cksum.update(key.getColumnQualifierData().toArray()); + cksum.update(key.getColumnVisibilityData().toArray()); + cksum.update(value.get(), 0, ckOff); + + if (cksum.getValue() != storedCksum) { + throw new BadChecksumException("Checksum invalid " + key + " " + value); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/GenSplits.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/GenSplits.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/GenSplits.java new file mode 100644 index 0000000..be9ef7a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/GenSplits.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import java.util.List; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +/** + * + */ +public class GenSplits { + + static class Opts { + @Parameter(names = "--min", description = "minimum row") + long minRow = 0; + + @Parameter(names = "--max", description = "maximum row") + long maxRow = Long.MAX_VALUE; + + @Parameter(description = "<num tablets>") + List<String> args = null; + } + + public static void main(String[] args) { + + Opts opts = new Opts(); + JCommander jcommander = new JCommander(opts); + jcommander.setProgramName(GenSplits.class.getSimpleName()); + + try { + jcommander.parse(args); + } catch (ParameterException pe) { + System.err.println(pe.getMessage()); + jcommander.usage(); + System.exit(-1); + } + + if (opts.args == null || opts.args.size() != 1) { + jcommander.usage(); + System.exit(-1); + } + + int numTablets = Integer.parseInt(opts.args.get(0)); + + if (numTablets < 1) { + System.err.println("ERROR: numTablets < 1"); + System.exit(-1); + } + + if (opts.minRow >= opts.maxRow) { + System.err.println("ERROR: min >= max"); + System.exit(-1); + } + + int numSplits = numTablets - 1; + long distance = ((opts.maxRow - opts.minRow) / numTablets) + 1; + long split = distance; + for (int i = 0; i < numSplits; i++) { + + String s = String.format("%016x", split + opts.minRow); + + while (s.charAt(s.length() - 1) == '0') { + s = s.substring(0, s.length() - 1); + } + + System.out.println(s); + split += distance; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/HistData.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/HistData.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/HistData.java new file mode 100644 index 0000000..2fff363 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/HistData.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.continuous; + +import java.io.Serializable; +import java.util.Objects; + +class HistData<T> implements Comparable<HistData<T>>, Serializable { + private static final long serialVersionUID = 1L; + + T bin; + long count; + + HistData(T bin) { + this.bin = bin; + count = 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(bin) + Objects.hashCode(count); + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + return obj == this || (obj != null && obj instanceof HistData && 0 == compareTo((HistData<T>) obj)); + } + + @SuppressWarnings("unchecked") + @Override + public int compareTo(HistData<T> o) { + return ((Comparable<T>) bin).compareTo(o.bin); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/Histogram.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/Histogram.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/Histogram.java new file mode 100644 index 0000000..0f1ba05 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/Histogram.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
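HistData is just a (bin, count) pair, and its compareTo delegates to the bin; that is what lets the Histogram class below keep a TreeSet<HistData<T>> ordered by bin when printing. A minimal sketch of that ordering, with hypothetical bins and counts (it would need to live in the same package, since HistData is package-private):

  package org.apache.accumulo.testing.core.continuous;

  public class HistDataOrderSketch {
    public static void main(String[] args) {
      java.util.TreeSet<HistData<String>> bins = new java.util.TreeSet<>();
      HistData<String> a = new HistData<>("0001.xx");  a.count = 7;
      HistData<String> b = new HistData<>("0000.1x");  b.count = 42;
      bins.add(a);
      bins.add(b);
      for (HistData<String> hd : bins)
        System.out.println(hd.bin + " " + hd.count);   // 0000.1x 42, then 0001.xx 7 -- ordered by bin, not by count
    }
  }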
+ */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +public class Histogram<T> implements Serializable { + + private static final long serialVersionUID = 1L; + + protected long sum; + protected HashMap<T,HistData<T>> counts; + + public Histogram() { + sum = 0; + counts = new HashMap<>(); + } + + public void addPoint(T x) { + addPoint(x, 1); + } + + public void addPoint(T x, long y) { + + HistData<T> hd = counts.get(x); + if (hd == null) { + hd = new HistData<>(x); + counts.put(x, hd); + } + + hd.count += y; + sum += y; + } + + public long getCount(T x) { + HistData<T> hd = counts.get(x); + if (hd == null) + return 0; + return hd.count; + } + + public double getPercentage(T x) { + if (getSum() == 0) { + return 0; + } + return (double) getCount(x) / (double) getSum() * 100.0; + } + + public long getSum() { + return sum; + } + + public List<T> getKeysInCountSortedOrder() { + + ArrayList<HistData<T>> sortedCounts = new ArrayList<>(counts.values()); + + Collections.sort(sortedCounts, new Comparator<HistData<T>>() { + @Override + public int compare(HistData<T> o1, HistData<T> o2) { + if (o1.count < o2.count) + return -1; + if (o1.count > o2.count) + return 1; + return 0; + } + }); + + ArrayList<T> sortedKeys = new ArrayList<>(); + + for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) { + HistData<T> hd = iter.next(); + sortedKeys.add(hd.bin); + } + + return sortedKeys; + } + + public void print(StringBuilder out) { + TreeSet<HistData<T>> sortedCounts = new TreeSet<>(counts.values()); + + int maxValueLen = 0; + + for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) { + HistData<T> hd = iter.next(); + if (("" + hd.bin).length() > maxValueLen) { + maxValueLen = ("" + hd.bin).length(); + } + } + + double psum = 0; + + for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) { + HistData<T> hd = iter.next(); + + psum += getPercentage(hd.bin); + + out.append(String.format(" %" + (maxValueLen + 1) + "s %,16d %6.2f%s %6.2f%s%n", hd.bin + "", hd.count, getPercentage(hd.bin), "%", psum, "%")); + } + out.append(String.format("%n %" + (maxValueLen + 1) + "s %,16d %n", "TOTAL", sum)); + } + + public void save(String file) throws IOException { + + FileOutputStream fos = new FileOutputStream(file); + BufferedOutputStream bos = new BufferedOutputStream(fos); + PrintStream ps = new PrintStream(bos, false, UTF_8.name()); + + TreeSet<HistData<T>> sortedCounts = new TreeSet<>(counts.values()); + for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) { + HistData<T> hd = iter.next(); + ps.println(" " + hd.bin + " " + hd.count); + } + + ps.close(); + } + + public Set<T> getKeys() { + return counts.keySet(); + } + + public void clear() { + counts.clear(); + sum = 0; + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/PrintScanTimeHistogram.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/accumulo/testing/core/continuous/PrintScanTimeHistogram.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/PrintScanTimeHistogram.java new file mode 100644 index 0000000..7172f3a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/PrintScanTimeHistogram.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PrintScanTimeHistogram { + + private static final Logger log = LoggerFactory.getLogger(PrintScanTimeHistogram.class); + + public static void main(String[] args) throws Exception { + Histogram<String> srqHist = new Histogram<>(); + Histogram<String> fsrHist = new Histogram<>(); + + processFile(System.in, srqHist, fsrHist); + + StringBuilder report = new StringBuilder(); + report.append(String.format("%n *** Single row queries histogram *** %n")); + srqHist.print(report); + log.info("{}", report); + + report = new StringBuilder(); + report.append(String.format("%n *** Find start rows histogram *** %n")); + fsrHist.print(report); + log.info("{}", report); + } + + private static void processFile(InputStream ins, Histogram<String> srqHist, Histogram<String> fsrHist) throws FileNotFoundException, IOException { + String line; + BufferedReader in = new BufferedReader(new InputStreamReader(ins, UTF_8)); + + while ((line = in.readLine()) != null) { + + try { + String[] tokens = line.split(" "); + + String type = tokens[0]; + if (type.equals("SRQ")) { + long delta = Long.parseLong(tokens[3]); + String point = generateHistPoint(delta); + srqHist.addPoint(point); + } else if (type.equals("FSR")) { + long delta = Long.parseLong(tokens[3]); + String point = generateHistPoint(delta); + fsrHist.addPoint(point); + } + } catch (Exception e) { + log.error("Failed to process line '" + line + "'.", e); + } + } + + in.close(); + } + + private static String generateHistPoint(long delta) { + String point; + + if (delta / 1000.0 < .1) { + point = String.format("%07.2f", delta / 1000.0); + if (point.equals("0000.10")) + point = "0000.1x"; + } else if (delta / 1000.0 < 1.0) { + point = String.format("%06.1fx", delta / 1000.0); + if (point.equals("0001.0x")) + point = "0001.xx"; + } else { + point = String.format("%04.0f.xx", delta / 1000.0); + } + return point; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/TimeBinner.java 
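Reading PrintScanTimeHistogram together with the walker output above: it consumes the walker's stdout, where the SRQ and FSR lines carry the scan latency in milliseconds as the fourth token (tokens[3]), and generateHistPoint collapses each latency into a coarse fixed-width bucket label, hundredths of a second below 0.1 s, tenths below 1 s, whole seconds above that. Each label is then fed to Histogram.addPoint, and Histogram.print reports the per-bucket count and percentages. The sketch below (class name and latencies are hypothetical) shows the bucketing only, without the "0000.10" and "0001.0x" boundary remaps:

  public class HistPointSketch {
    public static void main(String[] args) {
      long[] deltas = {42, 250, 12345};                      // hypothetical scan times in milliseconds
      for (long delta : deltas) {
        String point;
        if (delta / 1000.0 < .1)
          point = String.format("%07.2f", delta / 1000.0);    // 42 ms    -> "0000.04"
        else if (delta / 1000.0 < 1.0)
          point = String.format("%06.1fx", delta / 1000.0);   // 250 ms   -> "0000.3x"
        else
          point = String.format("%04.0f.xx", delta / 1000.0); // 12345 ms -> "0012.xx"
        System.out.println(delta + " ms -> " + point);
      }
    }
  }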
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/TimeBinner.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/TimeBinner.java new file mode 100644 index 0000000..d43e2e5 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/TimeBinner.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.accumulo.core.cli.ClientOpts.TimeConverter; +import org.apache.accumulo.core.cli.Help; + +import com.beust.jcommander.Parameter; + +public class TimeBinner { + + enum Operation { + AVG, SUM, MIN, MAX, COUNT, CUMULATIVE, AMM, // avg,min,max + AMM_HACK1 // special case + } + + private static class DoubleWrapper { + double d; + } + + private static DoubleWrapper get(long l, HashMap<Long,DoubleWrapper> m, double init) { + DoubleWrapper dw = m.get(l); + if (dw == null) { + dw = new DoubleWrapper(); + dw.d = init; + m.put(l, dw); + } + return dw; + } + + static class Opts extends Help { + @Parameter(names = "--period", description = "period", converter = TimeConverter.class, required = true) + long period = 0; + @Parameter(names = "--timeColumn", description = "time column", required = true) + int timeColumn = 0; + @Parameter(names = "--dataColumn", description = "data column", required = true) + int dataColumn = 0; + @Parameter(names = "--operation", description = "one of: AVG, SUM, MIN, MAX, COUNT", required = true) + String operation; + @Parameter(names = "--dateFormat", description = "a SimpleDataFormat string that describes the data format") + String dateFormat = "MM/dd/yy-HH:mm:ss"; + } + + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(TimeBinner.class.getName(), args); + + Operation operation = Operation.valueOf(opts.operation); + SimpleDateFormat sdf = new SimpleDateFormat(opts.dateFormat); + + BufferedReader in = new BufferedReader(new InputStreamReader(System.in, UTF_8)); + + String line = null; + + HashMap<Long,DoubleWrapper> aggregation1 = new HashMap<>(); + HashMap<Long,DoubleWrapper> aggregation2 = new HashMap<>(); + HashMap<Long,DoubleWrapper> aggregation3 = new HashMap<>(); + HashMap<Long,DoubleWrapper> aggregation4 = new HashMap<>(); + + while ((line = in.readLine()) != null) { + + try { + String tokens[] = line.split("\\s+"); + + long time = (long) 
Double.parseDouble(tokens[opts.timeColumn]); + double data = Double.parseDouble(tokens[opts.dataColumn]); + + time = (time / opts.period) * opts.period; + + switch (operation) { + case AMM_HACK1: { + if (opts.dataColumn < 2) { + throw new IllegalArgumentException("--dataColumn must be at least 2"); + } + double data_min = Double.parseDouble(tokens[opts.dataColumn - 2]); + double data_max = Double.parseDouble(tokens[opts.dataColumn - 1]); + + updateMin(time, aggregation3, data, data_min); + updateMax(time, aggregation4, data, data_max); + + increment(time, aggregation1, data); + increment(time, aggregation2, 1); + break; + } + case AMM: { + updateMin(time, aggregation3, data, data); + updateMax(time, aggregation4, data, data); + + increment(time, aggregation1, data); + increment(time, aggregation2, 1); + break; + } + case AVG: { + increment(time, aggregation1, data); + increment(time, aggregation2, 1); + break; + } + case MAX: { + updateMax(time, aggregation1, data, data); + break; + } + case MIN: { + updateMin(time, aggregation1, data, data); + break; + } + case COUNT: { + increment(time, aggregation1, 1); + break; + } + case SUM: + case CUMULATIVE: { + increment(time, aggregation1, data); + break; + } + } + + } catch (Exception e) { + System.err.println("Failed to process line : " + line + " " + e.getMessage()); + } + } + + TreeMap<Long,DoubleWrapper> sorted = new TreeMap<>(aggregation1); + + Set<Entry<Long,DoubleWrapper>> es = sorted.entrySet(); + + double cumulative = 0; + for (Entry<Long,DoubleWrapper> entry : es) { + String value; + + switch (operation) { + case AMM_HACK1: + case AMM: { + DoubleWrapper countdw = aggregation2.get(entry.getKey()); + value = "" + (entry.getValue().d / countdw.d) + " " + aggregation3.get(entry.getKey()).d + " " + aggregation4.get(entry.getKey()).d; + break; + } + case AVG: { + DoubleWrapper countdw = aggregation2.get(entry.getKey()); + value = "" + (entry.getValue().d / countdw.d); + break; + } + case CUMULATIVE: { + cumulative += entry.getValue().d; + value = "" + cumulative; + break; + } + default: + value = "" + entry.getValue().d; + } + + System.out.println(sdf.format(new Date(entry.getKey())) + " " + value); + } + + } + + private static void increment(long time, HashMap<Long,DoubleWrapper> aggregation, double amount) { + get(time, aggregation, 0).d += amount; + } + + private static void updateMax(long time, HashMap<Long,DoubleWrapper> aggregation, double data, double new_max) { + DoubleWrapper maxdw = get(time, aggregation, Double.NEGATIVE_INFINITY); + if (data > maxdw.d) + maxdw.d = new_max; + } + + private static void updateMin(long time, HashMap<Long,DoubleWrapper> aggregation, double data, double new_min) { + DoubleWrapper mindw = get(time, aggregation, Double.POSITIVE_INFINITY); + if (data < mindw.d) + mindw.d = new_min; + } +}
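TimeBinner is a stdin filter: it splits each line on whitespace, floors the value in --timeColumn down to a multiple of --period, and aggregates the value in --dataColumn per bin with the chosen operation (AVG keeps a sum and a count per bin, AMM additionally tracks min and max, CUMULATIVE keeps a running total across bins in the output pass). The sketch below is a minimal stand-in for the core arithmetic, assuming a 60 s period and the AVG operation; the class name, timestamps and data values are all hypothetical.

  public class TimeBinnerAvgSketch {
    public static void main(String[] args) {
      long period = 60_000L;                                              // assumed --period of 60 s
      long[]   times = {1487000123456L, 1487000150000L, 1487000190000L};  // hypothetical timestamps (ms)
      double[] data  = {1000.0, 3000.0, 500.0};                           // hypothetical data column
      java.util.TreeMap<Long,double[]> bins = new java.util.TreeMap<>();  // bin start -> {sum, count}
      for (int i = 0; i < times.length; i++) {
        long bin = (times[i] / period) * period;             // floor to the period, as TimeBinner does
        double[] agg = bins.computeIfAbsent(bin, k -> new double[2]);
        agg[0] += data[i];
        agg[1]++;
      }
      for (java.util.Map.Entry<Long,double[]> e : bins.entrySet())
        System.out.println(e.getKey() + " " + (e.getValue()[0] / e.getValue()[1]));
      // Two bins: 1487000100000 averages 2000.0, 1487000160000 averages 500.0.
    }
  }

Fed the walker's FSR lines, for instance, --timeColumn 1 and --dataColumn 3 would bin the scan latencies by wall-clock minute, with each bin label printed through the configured SimpleDateFormat.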