Merge branch 'javadoc-jdk8-1.7'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8ff2ca81
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8ff2ca81
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8ff2ca81

Branch: refs/heads/master
Commit: 8ff2ca81cd6b2e7ddc76197bd60cfea64eac465f
Parents: c252d1a 0ccba14
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Fri Jan 8 22:35:43 2016 -0500
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Fri Jan 8 22:35:43 2016 -0500

----------------------------------------------------------------------
 .../core/bloomfilter/DynamicBloomFilter.java       |  4 +--
 .../accumulo/core/client/BatchWriterConfig.java    | 10 +++---
 .../core/client/ConditionalWriterConfig.java       |  4 +--
 .../accumulo/core/client/ScannerBase.java          |  2 --
 .../client/mapred/AccumuloFileOutputFormat.java    |  4 +--
 .../mapreduce/AccumuloFileOutputFormat.java        |  4 +--
 .../lib/impl/FileOutputConfigurator.java           |  4 +--
 .../lib/util/FileOutputConfigurator.java           |  4 +--
 .../security/tokens/AuthenticationToken.java       |  2 +-
 .../core/constraints/VisibilityConstraint.java     |  1 -
 .../java/org/apache/accumulo/core/data/Key.java    |  2 +-
 .../org/apache/accumulo/core/data/Range.java       |  6 ++--
 .../file/blockfile/cache/CachedBlockQueue.java     |  2 +-
 .../core/file/blockfile/cache/ClassSize.java       |  4 +--
 .../accumulo/core/file/rfile/bcfile/Utils.java     | 35 +++++++++++---------
 .../core/iterators/IteratorEnvironment.java        |  2 --
 .../user/WholeColumnFamilyIterator.java            |  4 +--
 .../core/metadata/ServicerForMetadataTable.java    |  2 +-
 .../core/metadata/ServicerForRootTable.java        |  2 +-
 .../core/metadata/ServicerForUserTables.java       |  2 +-
 .../core/metadata/schema/MetadataSchema.java       |  2 +-
 .../core/replication/ReplicationSchema.java        |  6 ++--
 .../accumulo/core/sample/RowColumnSampler.java     |  4 +--
 .../core/security/ColumnVisibility.java            |  8 ++---
 .../security/crypto/CryptoModuleParameters.java    |  7 +---
 .../org/apache/accumulo/core/util/OpTimer.java     |  7 ++--
 .../accumulo/core/conf/config-header.html          | 12 +++---
 .../examples/simple/filedata/ChunkCombiner.java    | 18 +++++-----
 pom.xml                                            | 23 +++++++++++++
 .../apache/accumulo/server/ServerConstants.java    |  2 +-
 .../server/master/balancer/GroupBalancer.java      |  4 +--
 .../master/balancer/RegexGroupBalancer.java        |  6 ++--
 .../server/security/SecurityOperation.java         |  6 ++--
 .../server/security/UserImpersonation.java         |  2 +-
 .../server/security/SystemCredentialsTest.java     |  2 +-
 .../replication/SequentialWorkAssigner.java        |  2 +-
 .../monitor/servlets/DefaultServlet.java           |  2 +-
 .../monitor/servlets/ReplicationServlet.java       |  2 +-
 .../monitor/servlets/TablesServlet.java            |  4 +--
 .../tserver/compaction/CompactionStrategy.java     |  6 ++--
 .../accumulo/test/functional/ScanIdIT.java         | 11 +++---
 .../test/replication/merkle/package-info.java      |  9 ++---
 .../replication/merkle/skvi/DigestIterator.java    |  2 +-
 43 files changed, 135 insertions(+), 112 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------

diff --cc core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index aed67bc,b5692d2..51f6fae
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@@ -175,98 -174,4 +175,96 @@@ public interface ScannerBase extends It
   * @return The authorizations set on the scanner instance
   */
  Authorizations getAuthorizations();
+
+ /**
+  * Setting this will cause the scanner to read sample data, as long as that sample data was generated with the given configuration. By default this is not
+  * set and all data is read.
+  *
+  * <p>
+  * One way to use this method is as follows, where the sampler configuration is obtained from the table configuration. Sample data can be generated in many
+  * different ways, so it's important to verify that the sample data configuration meets expectations.
+  *
- * <p>
- *
+  * <pre>
+  * <code>
+  * // could cache this if creating many scanners, to avoid RPCs.
+  * SamplerConfiguration samplerConfig = connector.tableOperations().getSamplerConfiguration(table);
+  * // verify the table's sample data is generated in an expected way before using it
+  * userCode.verifySamplerConfig(samplerConfig);
+  * scanner.setSamplerConfiguration(samplerConfig);
+  * </code>
+  * </pre>
+  *
+  * <p>
+  * Of course this is not the only way to obtain a {@link SamplerConfiguration}; it could come from a constant, a configuration file, etc.
+  *
+  * <p>
+  * If sample data is not present, or sample data was generated with a different configuration, then the scanner iterator will throw a
+  * {@link SampleNotPresentException}. Also, if a table's sampler configuration is changed while a scanner is iterating over a table, a
+  * {@link SampleNotPresentException} may be thrown.
+  *
+  * @since 1.8.0
+  */
+ void setSamplerConfiguration(SamplerConfiguration samplerConfig);
+
+ /**
+  * @return the currently set sampler configuration, or null if no sampler configuration is set.
+  * @since 1.8.0
+  */
+ SamplerConfiguration getSamplerConfiguration();
+
+ /**
+  * Clears the sampler configuration, making the scanner read all data. After calling this, {@link #getSamplerConfiguration()} should return null.
+  *
+  * @since 1.8.0
+  */
+ void clearSamplerConfiguration();
+
+ /**
+  * This setting determines how long a scanner will wait to fill the returned batch. By default, a scanner waits until the batch is full.
+  *
+  * <p>
+  * Setting the timeout to zero (with any time unit) or {@link Long#MAX_VALUE} (with {@link TimeUnit#MILLISECONDS}) means no timeout.
+  *
+  * @param timeOut
+  *          the length of the timeout
+  * @param timeUnit
+  *          the units of the timeout
+  * @since 1.8.0
+  */
+ void setBatchTimeout(long timeOut, TimeUnit timeUnit);
+
+ /**
+  * Returns the timeout to fill a batch in the given TimeUnit.
+  *
+  * @return the batch timeout configured for this scanner
+  * @since 1.8.0
+  */
+ long getBatchTimeout(TimeUnit timeUnit);
+
+ /**
+  * Sets the name of the classloader context on this scanner. See the administration chapter of the user manual for details on how to configure and use
+  * classloader contexts.
+  *
+  * @param classLoaderContext
+  *          name of the classloader context
+  * @throws NullPointerException
+  *           if context is null
+  * @since 1.8.0
+  */
+ void setClassLoaderContext(String classLoaderContext);
+
+ /**
+  * Clears the current classloader context set on this scanner.
+  *
+  * @since 1.8.0
+  */
+ void clearClassLoaderContext();
+
+ /**
+  * Returns the name of the current classloader context set on this scanner.
+  *
+  * @return name of the current context
+  * @since 1.8.0
+  */
+ String getClassLoaderContext();
}
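For context, a minimal client-side sketch of how the new ScannerBase methods above might be combined. The Connector setup, the table name "table", and the context name "myContext" are assumptions for illustration; only the Scanner calls come from the interface itself.

  // sketch: assumes an existing Connector "connector" and created table "table"
  Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);

  // read sample data only, after checking how the table generates its sample
  SamplerConfiguration samplerConfig = connector.tableOperations().getSamplerConfiguration(table);
  scanner.setSamplerConfiguration(samplerConfig);

  // return a partially filled batch after 500 ms instead of waiting for a full one
  scanner.setBatchTimeout(500, TimeUnit.MILLISECONDS);

  // load scan-time iterators from an admin-configured classloader context (name assumed)
  scanner.setClassLoaderContext("myContext");

  for (Map.Entry<Key,Value> entry : scanner) {
    // process sampled entries; the iterator throws SampleNotPresentException
    // if no sample data matching samplerConfig exists
  }

  scanner.clearSamplerConfiguration(); // later scans through this scanner see all data again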
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
----------------------------------------------------------------------

diff --cc core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
index 5dbafa6,5a53e93..5c265e2
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
@@@ -39,52 -37,4 +39,50 @@@ public interface IteratorEnvironment
  void registerSideChannel(SortedKeyValueIterator<Key,Value> iter);

  Authorizations getAuthorizations();
+
+ /**
+  * Returns a new iterator environment object that can be used to create deep copies over sample data. The new object created will use the current sampling
+  * configuration for the table. The existing iterator environment object will not be modified.
+  *
+  * <p>
+  * Since sample data could be created in many different ways, a good practice for an iterator is to verify that the sampling configuration is as expected.
+  *
- * <p>
- *
+  * <pre>
+  * <code>
+  * class MyIter implements SortedKeyValueIterator<Key,Value> {
+  *   SortedKeyValueIterator<Key,Value> source;
+  *   SortedKeyValueIterator<Key,Value> sampleIter;
+  *   @Override
+  *   void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
+  *     IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
+  *     // do some sanity checks on the sampling config
+  *     validateSamplingConfiguration(sampleEnv.getSamplerConfiguration());
+  *     sampleIter = source.deepCopy(sampleEnv);
+  *     this.source = source;
+  *   }
+  * }
+  * </code>
+  * </pre>
+  *
+  * @throws SampleNotPresentException
+  *           when sampling is not configured for the table.
+  * @since 1.8.0
+  */
+ IteratorEnvironment cloneWithSamplingEnabled();
+
+ /**
+  * There are at least two conditions under which sampling will be enabled for an environment. One is when sampling is enabled for the scan that starts
+  * everything. Another is a deep copy created with an environment obtained by calling {@link #cloneWithSamplingEnabled()}.
+  *
+  * @return true if sampling is enabled for this environment.
+  * @since 1.8.0
+  */
+ boolean isSamplingEnabled();
+
+ /**
+  * @return the sampling configuration if sampling is enabled for this environment, otherwise null.
+  * @since 1.8.0
+  */
+ SamplerConfiguration getSamplerConfiguration();
}
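As a companion to the javadoc example above, a hedged sketch of an init method that branches on isSamplingEnabled() rather than always cloning. The fields "source" and "sampleIter" and the validateSamplingConfiguration helper are hypothetical, not part of this commit.

  // sketch only: field names and the validate helper are illustrative assumptions
  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
    this.source = source;
    if (env.isSamplingEnabled()) {
      // the scan is already over sample data; getSamplerConfiguration() is non-null here
      validateSamplingConfiguration(env.getSamplerConfiguration());
    } else {
      // throws SampleNotPresentException if sampling is not configured for the table
      sampleIter = source.deepCopy(env.cloneWithSamplingEnabled());
    }
  }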
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
----------------------------------------------------------------------

diff --cc core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
index ad68cf6,0000000..c3464ab
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
+++ b/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
@@@ -1,124 -1,0 +1,124 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+
+/**
+ * This sampler can hash any subset of a Key's fields. The fields that are hashed for the sample are determined by the configuration options passed in
+ * {@link #init(SamplerConfiguration)}. The following key values are valid options.
+ *
- * <UL>
++ * <ul>
+ * <li>row=true|false
+ * <li>family=true|false
+ * <li>qualifier=true|false
+ * <li>visibility=true|false
- * </UL>
++ * </ul>
+ *
+ * <p>
+ * If not specified in the options, fields default to false.
+ *
+ * <p>
+ * To determine what options are valid for hashing, see {@link AbstractHashSampler}.
+ *
+ * <p>
+ * To configure Accumulo to generate sample data on one thousandth of the column qualifiers, the following SamplerConfiguration could be created and used to
+ * configure a table.
+ *
+ * <p>
+ * {@code new SamplerConfiguration(RowColumnSampler.class.getName()).setOptions(ImmutableMap.of("hasher","murmur3_32","modulus","1009","qualifier","true"))}
+ *
+ * <p>
+ * With this configuration, if a column qualifier is selected, then all key values containing that column qualifier will end up in the sample data.
+ *
+ * @since 1.8.0
+ */
+
+public class RowColumnSampler extends AbstractHashSampler {
+
+  private boolean row = true;
+  private boolean family = true;
+  private boolean qualifier = true;
+  private boolean visibility = true;
+
+  private static final Set<String> VALID_OPTIONS = ImmutableSet.of("row", "family", "qualifier", "visibility");
+
+  private boolean hashField(SamplerConfiguration config, String field) {
+    String optValue = config.getOptions().get(field);
+    if (optValue != null) {
+      return Boolean.parseBoolean(optValue);
+    }
+
+    return false;
+  }
+
+  @Override
+  protected boolean isValidOption(String option) {
+    return super.isValidOption(option) || VALID_OPTIONS.contains(option);
+  }
+
+  @Override
+  public void init(SamplerConfiguration config) {
+    super.init(config);
+
+    row = hashField(config, "row");
+    family = hashField(config, "family");
+    qualifier = hashField(config, "qualifier");
+    visibility = hashField(config, "visibility");
+
+    if (!row && !family && !qualifier && !visibility) {
+      throw new IllegalStateException("Must hash at least one key field");
+    }
+  }
+
+  private void putByteSequence(ByteSequence data, Hasher hasher) {
+    hasher.putBytes(data.getBackingArray(), data.offset(), data.length());
+  }
+
+  @Override
+  protected HashCode hash(HashFunction hashFunction, Key k) {
+    Hasher hasher = hashFunction.newHasher();
+
+    if (row) {
+      putByteSequence(k.getRowData(), hasher);
+    }
+
+    if (family) {
+      putByteSequence(k.getColumnFamilyData(), hasher);
+    }
+
+    if (qualifier) {
+      putByteSequence(k.getColumnQualifierData(), hasher);
+    }
+
+    if (visibility) {
+      putByteSequence(k.getColumnVisibilityData(), hasher);
+    }
+
+    return hasher.hash();
+  }
+}
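To make the class javadoc concrete, a hedged sketch of enabling this sampler on a table and then scanning only the sample. The Connector "connector" and table name "table" are assumed to exist.

  // sketch: configure RowColumnSampler to sample roughly 1/1009th of column qualifiers
  SamplerConfiguration sc = new SamplerConfiguration(RowColumnSampler.class.getName());
  sc.setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "1009", "qualifier", "true"));
  connector.tableOperations().setSamplerConfiguration(table, sc);

  // a scanner requesting the same configuration reads only the sample data
  Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
  scanner.setSamplerConfiguration(sc);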
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java
----------------------------------------------------------------------

diff --cc core/src/main/java/org/apache/accumulo/core/util/OpTimer.java
index 0fb8cc0,564a824..33ece1a
--- a/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java
@@@ -57,75 -41,12 +57,78 @@@ public class OpTimer
    return this;
  }

- public void stop(String msg) {
-   if (log.isEnabledFor(level)) {
-     long t2 = System.currentTimeMillis();
-     String duration = String.format("%.3f secs", (t2 - t1) / 1000.0);
-     msg = msg.replace("%DURATION%", duration);
-     log.log(level, "tid=" + Thread.currentThread().getId() + " oid=" + opid + " " + msg);
+ /**
+  * Stop the timer instance.
+  *
+  * @return this instance, for fluent chaining.
+  * @throws IllegalStateException
+  *           if stop is called on an instance that is not running.
+  */
+ public OpTimer stop() throws IllegalStateException {
+   if (!isStarted) {
+     throw new IllegalStateException("OpTimer is already stopped");
    }
+   long now = System.nanoTime();
+   isStarted = false;
+   currentElapsedNanos += now - startNanos;
+   return this;
  }
+
+ /**
+  * Stops the timer instance and resets the current elapsed time to 0.
+  *
+  * @return this instance, for fluent chaining
+  */
+ public OpTimer reset() {
+   currentElapsedNanos = 0;
+   isStarted = false;
+   return this;
+ }
+
+ /**
+  * Converts the current timer value to a specific unit. The conversion to coarser granularities truncates with loss of precision.
+  *
+  * @param timeUnit
+  *          the time unit that the value will be converted to.
+  * @return truncated time in units of the specified time unit.
+  */
+ public long now(TimeUnit timeUnit) {
+   return timeUnit.convert(now(), TimeUnit.NANOSECONDS);
+ }
+
+ /**
+  * Returns the current elapsed time scaled to the provided time unit. This method does not truncate like {@link #now(TimeUnit)} but returns the value as a
-  * double. </p> Note: this method is not included in the hadoop 2.7 org.apache.hadoop.util.StopWatch class. If that class is adopted, then provisions will be
-  * required to replace this method.
++  * double.
++  *
++  * <p>
++  * Note: this method is not included in the hadoop 2.7 org.apache.hadoop.util.StopWatch class. If that class is adopted, then provisions will be required to
++  * replace this method.
+  *
+  * @param timeUnit
+  *          the time unit to scale the elapsed time to.
+  * @return the elapsed time of this instance scaled to the provided time unit.
+  */
+ public double scale(TimeUnit timeUnit) {
+   return (double) now() / TimeUnit.NANOSECONDS.convert(1L, timeUnit);
+ }
+
+ /**
+  * Returns the current timer elapsed time as nanoseconds.
+  *
+  * @return elapsed time in nanoseconds.
+  */
+ public long now() {
+   return isStarted ? System.nanoTime() - startNanos + currentElapsedNanos : currentElapsedNanos;
+ }
+
+ /**
+  * Returns the current elapsed time in nanoseconds as a string.
+  *
+  * @return timer elapsed time as nanoseconds.
+  */
+ @Override
+ public String toString() {
+   return String.valueOf(now());
+ }
}
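Since the fluent API above replaces the old log-based stop(String), a short usage sketch may help; it assumes a no-arg constructor, and the timed operation is a placeholder.

  // sketch: fluent timing with the reworked OpTimer
  OpTimer timer = new OpTimer().start();

  doWork(); // placeholder for the operation being timed

  timer.stop();
  long nanos = timer.now();                        // raw elapsed nanoseconds
  long millis = timer.now(TimeUnit.MILLISECONDS);  // truncated to the coarser unit
  double seconds = timer.scale(TimeUnit.SECONDS);  // scaled, keeps the fractional part

  timer.reset(); // stopped, elapsed time back to 0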
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/pom.xml
----------------------------------------------------------------------

diff --cc pom.xml
index 0f8ec29,644f506..4149d7a
--- a/pom.xml
+++ b/pom.xml
@@@ -1399,31 -1405,27 +1399,54 @@@
        </properties>
      </profile>
      <profile>
+      <id>jdk8</id>
+      <activation>
+        <jdk>[1.8,1.9)</jdk>
+      </activation>
+      <build>
+        <pluginManagement>
+          <plugins>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-javadoc-plugin</artifactId>
+              <configuration>
+                <encoding>${project.reporting.outputEncoding}</encoding>
+                <quiet>true</quiet>
+                <javadocVersion>1.8</javadocVersion>
+                <additionalJOption>-J-Xmx512m</additionalJOption>
+                <additionalparam>-Xdoclint:all,-Xdoclint:-missing</additionalparam>
+              </configuration>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+      </build>
+    </profile>
++    <profile>
+      <id>performanceTests</id>
+      <build>
+        <pluginManagement>
+          <plugins>
+            <!-- Add an additional execution for performance tests -->
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-failsafe-plugin</artifactId>
+              <executions>
+                <execution>
+                  <!-- Run only the performance tests -->
+                  <id>run-performance-tests</id>
+                  <goals>
+                    <goal>integration-test</goal>
+                    <goal>verify</goal>
+                  </goals>
+                  <configuration>
+                    <groups>${accumulo.performanceTests}</groups>
+                  </configuration>
+                </execution>
+              </executions>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+      </build>
+    </profile>
  </profiles>
</project>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
----------------------------------------------------------------------

diff --cc server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
index 57c68c4,274ec76..a29e3dc
--- a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
@@@ -58,16 -55,9 +58,16 @@@ public class SystemCredentialsTest
    }
  }

+ @Before
+ public void setupInstance() {
+   inst = EasyMock.createMock(Instance.class);
+   EasyMock.expect(inst.getInstanceID()).andReturn(UUID.nameUUIDFromBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0}).toString()).anyTimes();
+   EasyMock.replay(inst);
+ }
+
  /**
   * This is a test to ensure the string literal in {@link ConnectorImpl#ConnectorImpl(org.apache.accumulo.core.client.impl.ClientContext)} is kept up-to-date
-  * if we move the {@link SystemToken}<br/>
+  * if we move the {@link SystemToken}<br>
   * This check will not be needed after ACCUMULO-1578
   */
  @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------

diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index 7830939,0000000..4f78b77
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@@ -1,387 -1,0 +1,390 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/**
+ * ACCUMULO-2641 integration test. ACCUMULO-2641 adds a scan id to the thrift protocol so that {@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()}
+ * returns a unique scan id.
++ *
+ * <p>
- * <p/>
+ * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator} to create multiple scan sessions. The test exercises multiple
+ * tablet servers with splits and multiple ranges to force the scans to occur across multiple tablet servers for completeness.
- * <p/>
++ *
++ * <p>
+ * This patch modified thrift; the TraceRepoDeserializationTest test seems to fail unless the following is added:
- * <p/>
++ *
++ * <p>
+ * private static final long serialVersionUID = -4659975753252858243l;
- * <p/>
++ *
++ * <p>
+ * back into org.apache.accumulo.trace.thrift.TInfo, until that test signature is regenerated.
+ */
+public class ScanIdIT extends AccumuloClusterHarness {
+
+  private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
+
+  private static final int NUM_SCANNERS = 8;
+
+  private static final int NUM_DATA_ROWS = 100;
+
+  private static final Random random = new Random();
+
+  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
+
+  private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
+
+  private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>();
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  /**
+   * @throws Exception
+   *           any exception is a test failure.
+   */
+  @Test
+  public void testScanId() throws Exception {
+
+    final String tableName = getUniqueNames(1)[0];
+    Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+
+    addSplits(conn, tableName);
+
+    log.info("Splits added");
+
+    generateSampleData(conn, tableName);
+
+    log.info("Generated data for {}", tableName);
+
+    attachSlowIterator(conn, tableName);
+
+    CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
+
+    for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+      ScannerThread st = new ScannerThread(conn, scannerIndex, tableName, latch);
+      pool.submit(st);
+    }
+
+    // wait for scanners to report a result.
+    while (testInProgress.get()) {
+
+      if (resultsByWorker.size() < NUM_SCANNERS) {
+        log.trace("Results reported {}", resultsByWorker.size());
+        sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+      } else {
+        // each worker has reported at least one result.
+        testInProgress.set(false);
+
+        log.debug("Final result count {}", resultsByWorker.size());
+
+        // delay to allow scanners to react to the end of the test and cleanly close.
+        sleepUninterruptibly(1, TimeUnit.SECONDS);
+      }
+
+    }
+
+    // all scanners have reported at least one result, so check for unique scan ids.
+    Set<Long> scanIds = new HashSet<Long>();
+
+    List<String> tservers = conn.instanceOperations().getTabletServers();
+
+    log.debug("tablet servers {}", tservers.toString());
+
+    for (String tserver : tservers) {
+
+      List<ActiveScan> activeScans = null;
+      for (int i = 0; i < 10; i++) {
+        try {
+          activeScans = conn.instanceOperations().getActiveScans(tserver);
+          break;
+        } catch (AccumuloException e) {
+          if (e.getCause() instanceof TableNotFoundException) {
+            log.debug("Got TableNotFoundException, will retry");
+            Thread.sleep(200);
+            continue;
+          }
+          throw e;
+        }
+      }
+
+      assertNotNull("Repeatedly got exception trying to fetch active scans", activeScans);
+
+      log.debug("TServer {} has {} active scans", tserver, activeScans.size());
+
+      for (ActiveScan scan : activeScans) {
+        log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
+        scanIds.add(scan.getScanid());
+      }
+    }
+
+    assertTrue("Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size(), NUM_SCANNERS <= scanIds.size());
+
+  }
+
+  /**
+   * Runs a scanner in a separate thread to allow multiple scanners to execute in parallel.
+   *
+   * <p>
+   * The thread run method is terminated when the testInProgress flag is set to false.
+   */
+  private static class ScannerThread implements Runnable {
+
+    private final Connector connector;
+    private Scanner scanner = null;
+    private final int workerIndex;
+    private final String tablename;
+    private final CountDownLatch latch;
+
+    public ScannerThread(final Connector connector, final int workerIndex, final String tablename, final CountDownLatch latch) {
+      this.connector = connector;
+      this.workerIndex = workerIndex;
+      this.tablename = tablename;
+      this.latch = latch;
+    }
+
+    /**
+     * Execute the scan across the sample data and put scan results into the result map until the testInProgress flag is set to false.
+     */
+    @Override
+    public void run() {
+
+      latch.countDown();
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        log.error("Thread interrupted with id {}", workerIndex);
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      log.debug("Creating scanner in worker thread {}", workerIndex);
+
+      try {
+
+        scanner = connector.createScanner(tablename, new Authorizations());
+
+        // Never start readahead
+        scanner.setReadaheadThreshold(Long.MAX_VALUE);
+        scanner.setBatchSize(1);
+
+        // create different ranges to try to hit more than one tablet.
+        scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
+
+      } catch (TableNotFoundException e) {
+        throw new IllegalStateException("Initialization failure. Could not create scanner", e);
+      }
+
+      scanner.fetchColumnFamily(new Text("fam1"));
+
+      for (Map.Entry<Key,Value> entry : scanner) {
+
+        // exit when the success condition is met.
+        if (!testInProgress.get()) {
+          scanner.clearScanIterators();
+          scanner.close();
+
+          return;
+        }
+
+        Text row = entry.getKey().getRow();
+
+        log.debug("worker {}, row {}", workerIndex, row.toString());
+
+        if (entry.getValue() != null) {
+
+          Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
+
+          // value should always be increasing
+          if (prevValue != null) {
+
+            log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue()));
+
+            assertTrue(prevValue.compareTo(entry.getValue()) > 0);
+          }
+        } else {
+          log.info("Scanner returned null");
+          fail("Scanner returned unexpected null value");
+        }
+
+      }
+
+      log.debug("Scanner ran out of data. (info only, not an error)");
+
+    }
+  }
+
+  /**
+   * Create splits on the table, and force migration by taking the table offline and then bringing it back online for the test.
+   *
+   * @param conn
+   *          Accumulo connector to the test cluster or MAC instance.
+   */
+  private void addSplits(final Connector conn, final String tableName) {
+
+    SortedSet<Text> splits = createSplits();
+
+    try {
+
+      conn.tableOperations().addSplits(tableName, splits);
+
+      conn.tableOperations().offline(tableName, true);
+
+      sleepUninterruptibly(2, TimeUnit.SECONDS);
+      conn.tableOperations().online(tableName, true);
+
+      for (Text split : conn.tableOperations().listSplits(tableName)) {
+        log.trace("Split {}", split);
+      }
+
+    } catch (AccumuloSecurityException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    } catch (AccumuloException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    }
+
+  }
+
+  /**
+   * Create splits to distribute data across multiple tservers.
+   *
+   * @return splits in a sorted set for addSplits.
+   */
+  private SortedSet<Text> createSplits() {
+
+    SortedSet<Text> splits = new TreeSet<Text>();
+
+    for (int split = 0; split < 10; split++) {
+      splits.add(new Text(Integer.toString(split)));
+    }
+
+    return splits;
+  }
+
+  /**
+   * Generate some sample data using a random row id to distribute across splits.
+   *
+   * <p>
+   * The primary goal is to determine that each scanner is assigned a unique scan id. This test does check that the count value for fam1 increases if a scanner
+   * reads multiple values, but this is a secondary consideration for this test, included for completeness.
+   *
+   * @param connector
+   *          Accumulo connector to the test cluster or MAC instance.
+   */
+  private void generateSampleData(Connector connector, final String tablename) {
+
+    try {
+
+      BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
+
+      ColumnVisibility vis = new ColumnVisibility("public");
+
+      for (int i = 0; i < NUM_DATA_ROWS; i++) {
+
+        Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i)));
+
+        Mutation m = new Mutation(rowId);
+        m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
+        m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8)));
+        m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8)));
+
+        log.trace("Added row {}", rowId);
+
+        bw.addMutation(m);
+      }
+
+      bw.close();
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+    } catch (MutationsRejectedException ex) {
+      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+    }
+  }
+
+  /**
+   * Attach the test slow iterator so that we have time to read the scan id without creating a large dataset. Uses fairly large sleep and delay times because
+   * we are not concerned with how much data is read, and we do not read all of the data - the test stops once each scanner reports a scan id.
+   *
+   * @param connector
+   *          Accumulo connector to the test cluster or MAC instance.
+   */
+  private void attachSlowIterator(Connector connector, final String tablename) {
+    try {
+
+      IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
+      slowIter.addOption("sleepTime", "200");
+      slowIter.addOption("seekSleepTime", "200");
+
+      connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
+
+    } catch (AccumuloException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    } catch (AccumuloSecurityException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    }
+  }
+
+}
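Outside the test harness, the scan id surfaced by ACCUMULO-2641 can be read through the public admin API. A hedged sketch follows; the Connector "connector" is assumed to exist, and the checked exceptions thrown by getActiveScans are left to the enclosing method.

  // sketch: collect scan ids across all tablet servers, as the test above does in miniature
  Set<Long> scanIds = new HashSet<Long>();
  for (String tserver : connector.instanceOperations().getTabletServers()) {
    for (ActiveScan scan : connector.instanceOperations().getActiveScans(tserver)) {
      scanIds.add(scan.getScanid()); // the accessor exercised by this test
    }
  }
  log.info("distinct scan ids observed: {}", scanIds.size());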