This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 80dde6105da995af5e9b4368d21194ceda9a66b2
Merge: 3bd262f ed1bb67
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Tue Jan 28 10:29:19 2020 -0500

    Merge branch '1.9'

    Conflicts:
        test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java
        test/src/main/java/org/apache/accumulo/test/ManySplitIT.java
        test/src/main/java/org/apache/accumulo/test/MetaGetsReadersIT.java
        test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
        test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
        test/src/main/java/org/apache/accumulo/test/functional/DurabilityIT.java
        test/src/main/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
        test/src/main/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java

 .../org/apache/accumulo/test/GarbageCollectWALIT.java    |  7 ++++++-
 .../java/org/apache/accumulo/test/LargeSplitRowIT.java   |  7 ++++++-
 .../org/apache/accumulo/test/MetaGetsReadersIT.java      |  7 +++++--
 .../java/org/apache/accumulo/test/MetaRecoveryIT.java    |  7 ++++++-
 .../org/apache/accumulo/test/MultiTableRecoveryIT.java   |  7 ++++++-
 .../org/apache/accumulo/test/ScanFlushWithTimeIT.java    |  7 ++++++-
 .../java/org/apache/accumulo/test/ShellServerIT.java     |  2 +-
 .../org/apache/accumulo/test/TabletServerGivesUpIT.java  |  7 ++++++-
 .../apache/accumulo/test/TabletServerHdfsRestartIT.java  |  7 ++++++-
 .../java/org/apache/accumulo/test/TotalQueuedIT.java     |  7 ++++++-
 .../main/java/org/apache/accumulo/test/UnusedWALIT.java  | 10 +++++-----
 .../apache/accumulo/test/VerifySerialRecoveryIT.java     |  7 ++++++-
 .../apache/accumulo/test/functional/DeleteRowsIT.java    |  4 ++--
 .../apache/accumulo/test/functional/DurabilityIT.java    | 17 +++++++++++------
 .../accumulo/test/functional/RegexGroupBalanceIT.java    |  7 ++++++-
 .../accumulo/test/functional/SessionDurabilityIT.java    | 13 +++++++++----
 .../GarbageCollectorCommunicatesWithTServersIT.java      |  2 +-
 .../test/replication/MultiInstanceReplicationIT.java     |  2 +-
 18 files changed, 95 insertions(+), 32 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/MetaGetsReadersIT.java
index c902129,43eed8b..0981dd1
--- a/test/src/main/java/org/apache/accumulo/test/MetaGetsReadersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MetaGetsReadersIT.java
@@@ -57,12 -53,19 +56,17 @@@ public class MetaGetsReadersIT extends
     cfg.setProperty(Property.TABLE_BLOCKCACHE_ENABLED, "false");
   }
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
-  private static Thread slowScan(final Connector c, final String tableName,
+  private static Thread slowScan(final AccumuloClient c, final String tableName,
      final AtomicBoolean stop) {
-    Thread thread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          while (stop.get() == false) {
-            Scanner s = c.createScanner(tableName, Authorizations.EMPTY);
+    return new Thread(() -> {
+      try {
+        while (!stop.get()) {
+          try (Scanner s = c.createScanner(tableName, Authorizations.EMPTY)) {
             IteratorSetting is = new IteratorSetting(50, SlowIterator.class);
             SlowIterator.setSleepTime(is, 10);
             s.addScanIterator(is);
@@@ -71,52 -74,48 +75,51 @@@
             iterator.next();
           }
         }
-      } catch (Exception ex) {
-        log.trace("{}", ex.getMessage(), ex);
-        stop.set(true);
       }
+    } catch (Exception ex) {
+      log.trace("{}", ex.getMessage(), ex);
+      stop.set(true);
     }
-    };
-    return thread;
+  });
 }
-  @Test(timeout = 2 * 60 * 1000)
  public void test() throws Exception {
    final String tableName = getUniqueNames(1)[0];
-    final Connector c = getConnector();
-    c.tableOperations().create(tableName);
-    Random random = new Random();
-
BatchWriter bw = c.createBatchWriter(tableName, null); - for (int i = 0; i < 50000; i++) { - byte[] row = new byte[100]; - random.nextBytes(row); - Mutation m = new Mutation(row); - m.put("", "", ""); - bw.addMutation(m); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + c.tableOperations().create(tableName); + Random random = new SecureRandom(); + try (BatchWriter bw = c.createBatchWriter(tableName)) { + for (int i = 0; i < 50000; i++) { + byte[] row = new byte[100]; + random.nextBytes(row); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + } + } + c.tableOperations().flush(tableName, null, null, true); + final AtomicBoolean stop = new AtomicBoolean(false); + Thread t1 = slowScan(c, tableName, stop); + t1.start(); + Thread t2 = slowScan(c, tableName, stop); + t2.start(); + sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + long now = System.currentTimeMillis(); + + try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + Iterators.size(s.iterator()); + } + + long delay = System.currentTimeMillis() - now; + System.out.println("Delay = " + delay); + assertTrue("metadata table scan was slow", delay < 1000); + assertFalse(stop.get()); + stop.set(true); + t1.interrupt(); + t2.interrupt(); + t1.join(); + t2.join(); } - bw.close(); - c.tableOperations().flush(tableName, null, null, true); - final AtomicBoolean stop = new AtomicBoolean(false); - Thread t1 = slowScan(c, tableName, stop); - t1.start(); - Thread t2 = slowScan(c, tableName, stop); - t2.start(); - sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - long now = System.currentTimeMillis(); - Scanner m = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - Iterators.size(m.iterator()); - long delay = System.currentTimeMillis() - now; - System.out.println("Delay = " + delay); - assertTrue("metadata table scan was slow", delay < 1000); - assertFalse(stop.get()); - stop.set(true); - t1.interrupt(); - t2.interrupt(); - t1.join(); - t2.join(); } } diff --cc test/src/main/java/org/apache/accumulo/test/MetaRecoveryIT.java index c67c8c3,552579c..4004452 --- a/test/src/main/java/org/apache/accumulo/test/MetaRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MetaRecoveryIT.java @@@ -55,43 -54,47 +55,48 @@@ public class MetaRecoveryIT extends Con cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1048576"); } - @Test(timeout = 4 * 60 * 1000) + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + @Test public void test() throws Exception { String[] tables = getUniqueNames(10); - Connector c = getConnector(); - int i = 0; - for (String table : tables) { - log.info("Creating table {}", i); - c.tableOperations().create(table); - BatchWriter bw = c.createBatchWriter(table, null); - for (int j = 0; j < 1000; j++) { - Mutation m = new Mutation("" + j); - m.put("cf", "cq", "value"); - bw.addMutation(m); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + int i = 0; + for (String table : tables) { + log.info("Creating table {}", i); + c.tableOperations().create(table); + try (BatchWriter bw = c.createBatchWriter(table)) { + for (int j = 0; j < 1000; j++) { + Mutation m = new Mutation("" + j); + m.put("cf", "cq", "value"); + bw.addMutation(m); + } + } + log.info("Data written to table {}", i); + i++; + } + c.tableOperations().flush(MetadataTable.NAME, null, null, true); + c.tableOperations().flush(RootTable.NAME, null, null, true); + SortedSet<Text> splits = new TreeSet<>(); + for (i = 1; i < 
tables.length; i++) { + splits.add(new Text("" + i)); + } + c.tableOperations().addSplits(MetadataTable.NAME, splits); + log.info("Added {} splits to {}", splits.size(), MetadataTable.NAME); + c.instanceOperations().waitForBalance(); + log.info("Restarting"); + getCluster().getClusterControl().kill(ServerType.TABLET_SERVER, "localhost"); + getCluster().start(); + log.info("Verifying"); + for (String table : tables) { + try (BatchScanner scanner = c.createBatchScanner(table)) { + scanner.setRanges(Collections.singletonList(new Range())); + assertEquals(1000, Iterators.size(scanner.iterator())); + } } - bw.close(); - log.info("Data written to table {}", i); - i++; - } - c.tableOperations().flush(MetadataTable.NAME, null, null, true); - c.tableOperations().flush(RootTable.NAME, null, null, true); - SortedSet<Text> splits = new TreeSet<>(); - for (i = 1; i < tables.length; i++) { - splits.add(new Text("" + i)); - } - c.tableOperations().addSplits(MetadataTable.NAME, splits); - log.info("Added {} splits to {}", splits.size(), MetadataTable.NAME); - c.instanceOperations().waitForBalance(); - log.info("Restarting"); - getCluster().getClusterControl().kill(ServerType.TABLET_SERVER, "localhost"); - getCluster().start(); - log.info("Verifying"); - for (String table : tables) { - BatchScanner scanner = c.createBatchScanner(table, Authorizations.EMPTY, 5); - scanner.setRanges(Collections.singletonList(new Range())); - assertEquals(1000, Iterators.size(scanner.iterator())); - scanner.close(); } } diff --cc test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java index c5e959d,8945915..8619280 --- a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java @@@ -59,62 -53,66 +59,67 @@@ public class MultiTableRecoveryIT exten hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } - @Test(timeout = 4 * 60 * 1000) + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + @Test public void testRecoveryOverMultipleTables() throws Exception { final int N = 3; - final Connector c = getConnector(); - final String[] tables = getUniqueNames(N); - final BatchWriter[] writers = new BatchWriter[N]; - final byte[][] values = new byte[N][]; - int i = 0; - System.out.println("Creating tables"); - for (String tableName : tables) { - c.tableOperations().create(tableName); - values[i] = Integer.toString(i).getBytes(); - writers[i] = c.createBatchWriter(tableName, null); - i++; - } - System.out.println("Creating agitator"); - final AtomicBoolean stop = new AtomicBoolean(false); - final Thread agitator = agitator(stop); - agitator.start(); - System.out.println("writing"); - final Random random = new Random(); - for (i = 0; i < 1_000_000; i++) { - // make non-negative avoiding Math.abs, because that can still be negative - long randomRow = random.nextLong() & Long.MAX_VALUE; - assertTrue(randomRow >= 0); - final int table = (int) (randomRow % N); - final Mutation m = new Mutation(Long.toHexString(randomRow)); - m.put(new byte[0], new byte[0], values[table]); - writers[table].addMutation(m); - if (i % 10_000 == 0) { - System.out.println("flushing"); - for (int w = 0; w < N; w++) { - writers[w].flush(); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + final String[] tables = getUniqueNames(N); + final BatchWriter[] writers = new BatchWriter[N]; + final byte[][] values = new byte[N][]; + int i = 0; + System.out.println("Creating tables"); + for (String 
tableName : tables) { + c.tableOperations().create(tableName); + values[i] = Integer.toString(i).getBytes(); + writers[i] = c.createBatchWriter(tableName, null); + i++; + } + System.out.println("Creating agitator"); + final AtomicBoolean stop = new AtomicBoolean(false); + final Thread agitator = agitator(stop); + agitator.start(); + System.out.println("writing"); + final Random random = new SecureRandom(); + for (i = 0; i < 1_000_000; i++) { + // make non-negative avoiding Math.abs, because that can still be negative + long randomRow = random.nextLong() & Long.MAX_VALUE; + assertTrue(randomRow >= 0); + final int table = (int) (randomRow % N); + final Mutation m = new Mutation(Long.toHexString(randomRow)); + m.put(new byte[0], new byte[0], values[table]); + writers[table].addMutation(m); + if (i % 10_000 == 0) { + System.out.println("flushing"); + for (int w = 0; w < N; w++) { + writers[w].flush(); + } } } - } - System.out.println("closing"); - for (int w = 0; w < N; w++) { - writers[w].close(); - } - System.out.println("stopping the agitator"); - stop.set(true); - agitator.join(); - System.out.println("checking the data"); - long count = 0; - for (int w = 0; w < N; w++) { - Scanner scanner = c.createScanner(tables[w], Authorizations.EMPTY); - for (Entry<Key,Value> entry : scanner) { - int value = Integer.parseInt(entry.getValue().toString()); - assertEquals(w, value); - count++; + System.out.println("closing"); + for (int w = 0; w < N; w++) { + writers[w].close(); } - scanner.close(); + System.out.println("stopping the agitator"); + stop.set(true); + agitator.join(); + System.out.println("checking the data"); + long count = 0; + for (int w = 0; w < N; w++) { + try (Scanner scanner = c.createScanner(tables[w], Authorizations.EMPTY)) { + for (Entry<Key,Value> entry : scanner) { + int value = Integer.parseInt(entry.getValue().toString()); + assertEquals(w, value); + count++; + } + } + } + assertEquals(1_000_000, count); } - assertEquals(1_000_000, count); } private Thread agitator(final AtomicBoolean stop) { diff --cc test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java index 4d1f3d2,a617469..6dad24d --- a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java @@@ -46,20 -44,25 +46,25 @@@ public class TabletServerGivesUpIT exte cfg.setProperty(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION, "0s"); } - @Test(timeout = 45 * 1000) + @Override + protected int defaultTimeoutSeconds() { + return 45; + } + + @Test public void test() throws Exception { - final Connector conn = this.getConnector(); - // Yes, there's a tabletserver - assertEquals(1, conn.instanceOperations().getTabletServers().size()); - final String tableName = getUniqueNames(1)[0]; - conn.tableOperations().create(tableName); - // Kill dfs - cluster.getMiniDfs().shutdown(); - // ask the tserver to do something - final AtomicReference<Exception> ex = new AtomicReference<>(); - Thread splitter = new Thread() { - @Override - public void run() { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + while (client.instanceOperations().getTabletServers().isEmpty()) { + // Wait until at least one tablet server is up + Thread.sleep(100); + } + final String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + // Kill dfs + cluster.getMiniDfs().shutdown(); + // ask the tserver to do something + final AtomicReference<Exception> ex = new AtomicReference<>(); + 
Thread splitter = new Thread(() -> { try { TreeSet<Text> splits = new TreeSet<>(); splits.add(new Text("X")); diff --cc test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java index e707fcb,0ab5397..df41ba5 --- a/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java @@@ -45,30 -42,31 +45,35 @@@ public class TabletServerHdfsRestartIT cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); } - @Test(timeout = 2 * 60 * 1000) + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test public void test() throws Exception { - final Connector conn = this.getConnector(); - // Yes, there's a tabletserver - assertEquals(1, conn.instanceOperations().getTabletServers().size()); - final String tableName = getUniqueNames(1)[0]; - conn.tableOperations().create(tableName); - BatchWriter bw = conn.createBatchWriter(tableName, null); - for (int i = 0; i < N; i++) { - Mutation m = new Mutation("" + i); - m.put("", "", ""); - bw.addMutation(m); - } - bw.close(); - conn.tableOperations().flush(tableName, null, null, true); + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + // wait until a tablet server is up + while (client.instanceOperations().getTabletServers().isEmpty()) { + Thread.sleep(50); + } + final String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < N; i++) { + Mutation m = new Mutation("" + i); + m.put("", "", ""); + bw.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); - // Kill dfs - cluster.getMiniDfs().restartNameNode(false); + // Kill dfs + cluster.getMiniDfs().restartNameNode(false); - assertEquals(N, Iterators.size(conn.createScanner(tableName, Authorizations.EMPTY).iterator())); + assertEquals(N, + Iterators.size(client.createScanner(tableName, Authorizations.EMPTY).iterator())); + } } } diff --cc test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java index 05d3652,0f904ac..10e155e --- a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java @@@ -52,81 -49,87 +52,86 @@@ public class TotalQueuedIT extends Conf cfg.useMiniDFS(); } + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + - int SMALL_QUEUE_SIZE = 100000; - int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10; - static final long N = 1000000; + private int SMALL_QUEUE_SIZE = 100000; + private int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10; + private static final long N = 1000000; - @Test(timeout = 4 * 60 * 1000) + @Test public void test() throws Exception { - Random random = new Random(); - Connector c = getConnector(); - c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), - "" + SMALL_QUEUE_SIZE); - String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999"); - c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999"); - sleepUninterruptibly(1, TimeUnit.SECONDS); - // get an idea of how fast the syncs occur - byte row[] = new byte[250]; - BatchWriterConfig cfg = new BatchWriterConfig(); - cfg.setMaxWriteThreads(10); - cfg.setMaxLatency(1, TimeUnit.SECONDS); - cfg.setMaxMemory(1024 * 1024); - long realSyncs = getSyncs(); - BatchWriter 
bw = c.createBatchWriter(tableName, cfg); - long now = System.currentTimeMillis(); - long bytesSent = 0; - for (int i = 0; i < N; i++) { - random.nextBytes(row); - Mutation m = new Mutation(row); - m.put("", "", ""); - bw.addMutation(m); - bytesSent += m.estimatedMemoryUsed(); - } - bw.close(); - long diff = System.currentTimeMillis() - now; - double secs = diff / 1000.; - double syncs = bytesSent / SMALL_QUEUE_SIZE; - double syncsPerSec = syncs / secs; - System.out - .println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", - bytesSent, secs, ((long) syncs), syncsPerSec)); - long update = getSyncs(); - System.out.println("Syncs " + (update - realSyncs)); - realSyncs = update; + Random random = new SecureRandom(); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), + "" + SMALL_QUEUE_SIZE); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999"); + c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999"); + sleepUninterruptibly(1, TimeUnit.SECONDS); + // get an idea of how fast the syncs occur + byte[] row = new byte[250]; + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setMaxWriteThreads(10); + cfg.setMaxLatency(1, TimeUnit.SECONDS); + cfg.setMaxMemory(1024 * 1024); + long realSyncs = getSyncs(c); + long now = System.currentTimeMillis(); + long bytesSent = 0; + try (BatchWriter bw = c.createBatchWriter(tableName, cfg)) { + for (int i = 0; i < N; i++) { + random.nextBytes(row); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bytesSent += m.estimatedMemoryUsed(); + } + } + long diff = System.currentTimeMillis() - now; + double secs = diff / 1000.; + double syncs = bytesSent / SMALL_QUEUE_SIZE; + double syncsPerSec = syncs / secs; + System.out.println( + String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", + bytesSent, secs, ((long) syncs), syncsPerSec)); + long update = getSyncs(c); + System.out.println("Syncs " + (update - realSyncs)); + realSyncs = update; - // Now with a much bigger total queue - c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), - "" + LARGE_QUEUE_SIZE); - c.tableOperations().flush(tableName, null, null, true); - sleepUninterruptibly(1, TimeUnit.SECONDS); - bw = c.createBatchWriter(tableName, cfg); - now = System.currentTimeMillis(); - bytesSent = 0; - for (int i = 0; i < N; i++) { - random.nextBytes(row); - Mutation m = new Mutation(row); - m.put("", "", ""); - bw.addMutation(m); - bytesSent += m.estimatedMemoryUsed(); + // Now with a much bigger total queue + c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), + "" + LARGE_QUEUE_SIZE); + c.tableOperations().flush(tableName, null, null, true); + sleepUninterruptibly(1, TimeUnit.SECONDS); + try (BatchWriter bw = c.createBatchWriter(tableName, cfg)) { + now = System.currentTimeMillis(); + bytesSent = 0; + for (int i = 0; i < N; i++) { + random.nextBytes(row); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bytesSent += m.estimatedMemoryUsed(); + } + } + diff = System.currentTimeMillis() - now; + secs = diff / 1000.; + syncs = bytesSent / LARGE_QUEUE_SIZE; + syncsPerSec = syncs / secs; + System.out.println( + String.format("Sent %d bytes in %f secs approximately 
%d syncs (%f syncs per sec)", + bytesSent, secs, ((long) syncs), syncsPerSec)); + update = getSyncs(c); + System.out.println("Syncs " + (update - realSyncs)); + assertTrue(update - realSyncs < realSyncs); } - bw.close(); - diff = System.currentTimeMillis() - now; - secs = diff / 1000.; - syncs = bytesSent / LARGE_QUEUE_SIZE; - syncsPerSec = syncs / secs; - System.out - .println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", - bytesSent, secs, ((long) syncs), syncsPerSec)); - update = getSyncs(); - System.out.println("Syncs " + (update - realSyncs)); - assertTrue(update - realSyncs < realSyncs); } - private long getSyncs() throws Exception { - Connector c = getConnector(); - ServerConfigurationFactory confFactory = new ServerConfigurationFactory(c.getInstance()); - AccumuloServerContext context = new AccumuloServerContext(confFactory); + private long getSyncs(AccumuloClient c) throws Exception { + ServerContext context = getServerContext(); for (String address : c.instanceOperations().getTabletServers()) { TabletClientService.Client client = ThriftUtil.getTServerClient(HostAndPort.fromString(address), context); diff --cc test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java index 72802e0,3575a91..90600c7 --- a/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java +++ b/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java @@@ -56,12 -52,9 +56,7 @@@ import com.google.common.collect.Iterat // It would be useful to have an IT that will test this situation. public class UnusedWALIT extends ConfigurableMacBase { - private ZooReaderWriter zk; - @Override - protected int defaultTimeoutSeconds() { - return 120; - } - - @Override protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { final long logSize = 1024 * 1024 * 10; cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); diff --cc test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java index b83b938,b89c223..cd25612 --- a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java @@@ -70,17 -67,15 +70,17 @@@ public class DeleteRowsIT extends Accum ROWS.add("{"); } - @Test(timeout = 5 * 60 * 1000) + @Test public void testDeleteAllRows() throws Exception { - Connector c = getConnector(); - String[] tableNames = this.getUniqueNames(20); - for (String tableName : tableNames) { - c.tableOperations().create(tableName); - c.tableOperations().deleteRows(tableName, null, null); - Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY); - assertEquals(0, Iterators.size(scanner.iterator())); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String[] tableNames = this.getUniqueNames(20); + for (String tableName : tableNames) { + c.tableOperations().create(tableName); + c.tableOperations().deleteRows(tableName, null, null); + try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) { + assertEquals(0, Iterators.size(scanner.iterator())); + } + } } } diff --cc test/src/main/java/org/apache/accumulo/test/functional/DurabilityIT.java index 6505d59,de423ff..9728a9b --- a/test/src/main/java/org/apache/accumulo/test/functional/DurabilityIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DurabilityIT.java @@@ -57,8 -59,13 +57,13 @@@ public class DurabilityIT extends Confi cfg.setNumTservers(1); } + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + @BeforeClass - static 
public void checkMR() { + public static void checkMR() { assumeFalse(IntegrationTestMapReduce.isMapReduce()); } @@@ -84,75 -93,91 +89,75 @@@ } } - private void createTable(String tableName) throws Exception { - TableOperations tableOps = getConnector().tableOperations(); - tableOps.create(tableName); - } - - @Test - public void testWriteSpeed() throws Exception { - TableOperations tableOps = getConnector().tableOperations(); - String tableNames[] = init(); - // write some gunk, delete the table to keep that table from messing with the performance - // numbers of successive calls - // sync - long t0 = writeSome(tableNames[0], N); - tableOps.delete(tableNames[0]); - // flush - long t1 = writeSome(tableNames[1], N); - tableOps.delete(tableNames[1]); - // log - long t2 = writeSome(tableNames[2], N); - tableOps.delete(tableNames[2]); - // none - long t3 = writeSome(tableNames[3], N); - tableOps.delete(tableNames[3]); - System.out.println(String.format("sync %d flush %d log %d none %d", t0, t1, t2, t3)); - assertTrue("flush should be faster than sync", t0 > t1); - assertTrue("log should be faster than flush", t1 > t2); - assertTrue("no durability should be faster than log", t2 > t3); + private void createTable(AccumuloClient c, String tableName) throws Exception { + c.tableOperations().create(tableName); } - @Test(timeout = 4 * 60 * 1000) + @Test public void testSync() throws Exception { - String tableNames[] = init(); - // sync table should lose nothing - writeSome(tableNames[0], N); - restartTServer(); - assertEquals(N, readSome(tableNames[0])); - cleanup(tableNames); + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + String[] tableNames = init(client); + // sync table should lose nothing + writeSome(client, tableNames[0], N); + restartTServer(); + assertEquals(N, readSome(client, tableNames[0])); + cleanup(client, tableNames); + } } - @Test(timeout = 4 * 60 * 1000) + @Test public void testFlush() throws Exception { - String tableNames[] = init(); - // flush table won't lose anything since we're not losing power/dfs - writeSome(tableNames[1], N); - restartTServer(); - assertEquals(N, readSome(tableNames[1])); - cleanup(tableNames); + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + String[] tableNames = init(client); + // flush table won't lose anything since we're not losing power/dfs + writeSome(client, tableNames[1], N); + restartTServer(); + assertEquals(N, readSome(client, tableNames[1])); + cleanup(client, tableNames); + } } - @Test(timeout = 4 * 60 * 1000) + @Test public void testLog() throws Exception { - String tableNames[] = init(); - // we're probably going to lose something the the log setting - writeSome(tableNames[2], N); - restartTServer(); - long numResults = readSome(tableNames[2]); - assertTrue("Expected " + N + " >= " + numResults, N >= numResults); - cleanup(tableNames); + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + String[] tableNames = init(client); + // we're probably going to lose something the the log setting + writeSome(client, tableNames[2], N); + restartTServer(); + long numResults = readSome(client, tableNames[2]); + assertTrue("Expected " + N + " >= " + numResults, numResults <= N); + cleanup(client, tableNames); + } } - @Test(timeout = 4 * 60 * 1000) + @Test public void testNone() throws Exception { - String tableNames[] = init(); - // probably won't get any data back without logging - writeSome(tableNames[3], N); - restartTServer(); 
- long numResults = readSome(tableNames[3]); - assertTrue("Expected " + N + " >= " + numResults, N >= numResults); - cleanup(tableNames); + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + String[] tableNames = init(client); + // probably won't get any data back without logging + writeSome(client, tableNames[3], N); + restartTServer(); + long numResults = readSome(client, tableNames[3]); + assertTrue("Expected " + N + " >= " + numResults, numResults <= N); + cleanup(client, tableNames); + } } - @Test(timeout = 4 * 60 * 1000) + @Test public void testIncreaseDurability() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); - writeSome(tableName, N); - restartTServer(); - long numResults = readSome(tableName); - assertTrue("Expected " + N + " >= " + numResults, N >= numResults); - c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); - writeSome(tableName, N); - restartTServer(); - assertTrue(N == readSome(tableName)); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); + writeSome(c, tableName, N); + restartTServer(); + long numResults = readSome(c, tableName); + assertTrue("Expected " + N + " >= " + numResults, numResults <= N); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); + writeSome(c, tableName, N); + restartTServer(); + assertEquals(N, readSome(c, tableName)); + } } private static Map<String,String> map(Iterable<Entry<String,String>> entries) { @@@ -163,23 -188,22 +168,23 @@@ return result; } - @Test(timeout = 4 * 60 * 1000) + @Test public void testMetaDurability() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - c.instanceOperations().setProperty(Property.TABLE_DURABILITY.getKey(), "none"); - Map<String,String> props = map(c.tableOperations().getProperties(MetadataTable.NAME)); - assertEquals("sync", props.get(Property.TABLE_DURABILITY.getKey())); - c.tableOperations().create(tableName); - props = map(c.tableOperations().getProperties(tableName)); - assertEquals("none", props.get(Property.TABLE_DURABILITY.getKey())); - restartTServer(); - assertTrue(c.tableOperations().exists(tableName)); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + String tableName = getUniqueNames(1)[0]; + c.instanceOperations().setProperty(Property.TABLE_DURABILITY.getKey(), "none"); + Map<String,String> props = map(c.tableOperations().getProperties(MetadataTable.NAME)); + assertEquals("sync", props.get(Property.TABLE_DURABILITY.getKey())); + c.tableOperations().create(tableName); + props = map(c.tableOperations().getProperties(tableName)); + assertEquals("none", props.get(Property.TABLE_DURABILITY.getKey())); + restartTServer(); + assertTrue(c.tableOperations().exists(tableName)); + } } - private long readSome(String table) throws Exception { - return Iterators.size(getConnector().createScanner(table, Authorizations.EMPTY).iterator()); + private long readSome(AccumuloClient client, String table) throws Exception { + return Iterators.size(client.createScanner(table, Authorizations.EMPTY).iterator()); } private void restartTServer() 
throws Exception { diff --cc test/src/main/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java index 53fa5a9,1db7cce..71458e2 --- a/test/src/main/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java @@@ -55,91 -50,95 +55,96 @@@ public class RegexGroupBalanceIT extend cfg.setNumTservers(4); } - @Test(timeout = 120000) + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test public void testBalancing() throws Exception { - Connector conn = getConnector(); - String tablename = getUniqueNames(1)[0]; - conn.tableOperations().create(tablename); - - SortedSet<Text> splits = new TreeSet<>(); - splits.add(new Text("01a")); - splits.add(new Text("01m")); - splits.add(new Text("01z")); - - splits.add(new Text("02a")); - splits.add(new Text("02f")); - splits.add(new Text("02r")); - splits.add(new Text("02z")); - - splits.add(new Text("03a")); - splits.add(new Text("03f")); - splits.add(new Text("03m")); - splits.add(new Text("03r")); - - conn.tableOperations().setProperty(tablename, RegexGroupBalancer.REGEX_PROPERTY, "(\\d\\d).*"); - conn.tableOperations().setProperty(tablename, RegexGroupBalancer.DEFAUT_GROUP_PROPERTY, "03"); - conn.tableOperations().setProperty(tablename, RegexGroupBalancer.WAIT_TIME_PROPERTY, "50ms"); - conn.tableOperations().setProperty(tablename, Property.TABLE_LOAD_BALANCER.getKey(), - RegexGroupBalancer.class.getName()); - - conn.tableOperations().addSplits(tablename, splits); - - while (true) { - Thread.sleep(250); - - Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename); - - boolean allGood = true; - allGood &= checkGroup(groupLocationCounts, "01", 1, 1, 3); - allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); - allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); - allGood &= checkTabletsPerTserver(groupLocationCounts, 3, 3, 4); - - if (allGood) { - break; + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + String tablename = getUniqueNames(1)[0]; + + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text("01a")); + splits.add(new Text("01m")); + splits.add(new Text("01z")); + + splits.add(new Text("02a")); + splits.add(new Text("02f")); + splits.add(new Text("02r")); + splits.add(new Text("02z")); + + splits.add(new Text("03a")); + splits.add(new Text("03f")); + splits.add(new Text("03m")); + splits.add(new Text("03r")); + + HashMap<String,String> props = new HashMap<>(); + props.put(RegexGroupBalancer.REGEX_PROPERTY, "(\\d\\d).*"); + props.put(RegexGroupBalancer.DEFAUT_GROUP_PROPERTY, "03"); + props.put(RegexGroupBalancer.WAIT_TIME_PROPERTY, "50ms"); + props.put(Property.TABLE_LOAD_BALANCER.getKey(), RegexGroupBalancer.class.getName()); + + client.tableOperations().create(tablename, + new NewTableConfiguration().setProperties(props).withSplits(splits)); + + while (true) { + Thread.sleep(250); + + Table<String,String,MutableInt> groupLocationCounts = getCounts(client, tablename); + + boolean allGood = true; + allGood &= checkGroup(groupLocationCounts, "01", 1, 1, 3); + allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); + allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); + allGood &= checkTabletsPerTserver(groupLocationCounts, 3, 3, 4); + + if (allGood) { + break; + } } - } - splits.clear(); - splits.add(new Text("01b")); - splits.add(new Text("01f")); - splits.add(new Text("01l")); - splits.add(new Text("01r")); - 
conn.tableOperations().addSplits(tablename, splits); + splits.clear(); + splits.add(new Text("01b")); + splits.add(new Text("01f")); + splits.add(new Text("01l")); + splits.add(new Text("01r")); + client.tableOperations().addSplits(tablename, splits); - while (true) { - Thread.sleep(250); + while (true) { + Thread.sleep(250); - Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename); + Table<String,String,MutableInt> groupLocationCounts = getCounts(client, tablename); - boolean allGood = true; - allGood &= checkGroup(groupLocationCounts, "01", 1, 2, 4); - allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); - allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); - allGood &= checkTabletsPerTserver(groupLocationCounts, 4, 4, 4); + boolean allGood = true; + allGood &= checkGroup(groupLocationCounts, "01", 1, 2, 4); + allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); + allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); + allGood &= checkTabletsPerTserver(groupLocationCounts, 4, 4, 4); - if (allGood) { - break; + if (allGood) { + break; + } } - } - // merge group 01 down to one tablet - conn.tableOperations().merge(tablename, null, new Text("01z")); + // merge group 01 down to one tablet + client.tableOperations().merge(tablename, null, new Text("01z")); - while (true) { - Thread.sleep(250); + while (true) { + Thread.sleep(250); - Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename); + Table<String,String,MutableInt> groupLocationCounts = getCounts(client, tablename); - boolean allGood = true; - allGood &= checkGroup(groupLocationCounts, "01", 1, 1, 1); - allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); - allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); - allGood &= checkTabletsPerTserver(groupLocationCounts, 2, 3, 4); + boolean allGood = true; + allGood &= checkGroup(groupLocationCounts, "01", 1, 1, 1); + allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); + allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); + allGood &= checkTabletsPerTserver(groupLocationCounts, 2, 3, 4); - if (allGood) { - break; + if (allGood) { + break; + } } } } diff --cc test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java index 645b833,a1a989d..2088ea2 --- a/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java @@@ -54,99 -49,102 +54,104 @@@ public class SessionDurabilityIT extend cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); } - @Test(timeout = 3 * 60 * 1000) + @Override + protected int defaultTimeoutSeconds() { + return 3 * 60; + } + + @Test public void nondurableTableHasDurableWrites() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - // table default has no durability - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); - // send durable writes - BatchWriterConfig cfg = new BatchWriterConfig(); - cfg.setDurability(Durability.SYNC); - writeSome(tableName, 10, cfg); - assertEquals(10, count(tableName)); - // verify writes servive restart - restartTServer(); - assertEquals(10, count(tableName)); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + String tableName = getUniqueNames(1)[0]; + // table default has no durability + c.tableOperations().create(tableName, new NewTableConfiguration() + 
.setProperties(singletonMap(Property.TABLE_DURABILITY.getKey(), "none"))); + // send durable writes + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setDurability(Durability.SYNC); + writeSome(c, tableName, 10, cfg); + assertEquals(10, count(c, tableName)); + // verify writes servive restart + restartTServer(); + assertEquals(10, count(c, tableName)); + } } - @Test(timeout = 3 * 60 * 1000) + @Test public void durableTableLosesNonDurableWrites() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - // table default is durable writes - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); - // write with no durability - BatchWriterConfig cfg = new BatchWriterConfig(); - cfg.setDurability(Durability.NONE); - writeSome(tableName, 10, cfg); - // verify writes are lost on restart - restartTServer(); - assertTrue(10 > count(tableName)); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName, new NewTableConfiguration() + .setProperties(singletonMap(Property.TABLE_DURABILITY.getKey(), "sync"))); + // write with no durability + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setDurability(Durability.NONE); + writeSome(c, tableName, 10, cfg); + // verify writes are lost on restart + restartTServer(); + assertTrue(count(c, tableName) < 10); + } } - private int count(String tableName) throws Exception { - return Iterators.size(getConnector().createScanner(tableName, Authorizations.EMPTY).iterator()); + private int count(AccumuloClient client, String tableName) throws Exception { + return Iterators.size(client.createScanner(tableName, Authorizations.EMPTY).iterator()); } - private void writeSome(String tableName, int n, BatchWriterConfig cfg) throws Exception { - Connector c = getConnector(); - BatchWriter bw = c.createBatchWriter(tableName, cfg); - for (int i = 0; i < n; i++) { - Mutation m = new Mutation(i + ""); - m.put("", "", ""); - bw.addMutation(m); + private void writeSome(AccumuloClient c, String tableName, int n, BatchWriterConfig cfg) + throws Exception { + try (BatchWriter bw = c.createBatchWriter(tableName, cfg)) { + for (int i = 0; i < n; i++) { + Mutation m = new Mutation(i + ""); + m.put("", "", ""); + bw.addMutation(m); + } } - bw.close(); } - @Test(timeout = 3 * 60 * 1000) + @Test public void testConditionDurability() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - // table default is durable writes - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); - // write without durability - ConditionalWriterConfig cfg = new ConditionalWriterConfig(); - cfg.setDurability(Durability.NONE); - conditionWriteSome(tableName, 10, cfg); - // everything in there? 
- assertEquals(10, count(tableName)); - // restart the server and verify the updates are lost - restartTServer(); - assertEquals(0, count(tableName)); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName, new NewTableConfiguration() + .setProperties(singletonMap(Property.TABLE_DURABILITY.getKey(), "sync"))); + // write without durability + ConditionalWriterConfig cfg = new ConditionalWriterConfig(); + cfg.setDurability(Durability.NONE); + conditionWriteSome(c, tableName, 10, cfg); + // everything in there? + assertEquals(10, count(c, tableName)); + // restart the server and verify the updates are lost + restartTServer(); + assertEquals(0, count(c, tableName)); + } } - @Test(timeout = 3 * 60 * 1000) + @Test public void testConditionDurability2() throws Exception { - Connector c = getConnector(); - String tableName = getUniqueNames(1)[0]; - // table default is durable writes - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); - // write with durability - ConditionalWriterConfig cfg = new ConditionalWriterConfig(); - cfg.setDurability(Durability.SYNC); - conditionWriteSome(tableName, 10, cfg); - // everything in there? - assertEquals(10, count(tableName)); - // restart the server and verify the updates are still there - restartTServer(); - assertEquals(10, count(tableName)); + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName, new NewTableConfiguration() + .setProperties(singletonMap(Property.TABLE_DURABILITY.getKey(), "none"))); + // write with durability + ConditionalWriterConfig cfg = new ConditionalWriterConfig(); + cfg.setDurability(Durability.SYNC); + conditionWriteSome(c, tableName, 10, cfg); + // everything in there? + assertEquals(10, count(c, tableName)); + // restart the server and verify the updates are still there + restartTServer(); + assertEquals(10, count(c, tableName)); + } } - private void conditionWriteSome(String tableName, int n, ConditionalWriterConfig cfg) - throws Exception { - Connector c = getConnector(); + private void conditionWriteSome(AccumuloClient c, String tableName, int n, + ConditionalWriterConfig cfg) throws Exception { ConditionalWriter cw = c.createConditionalWriter(tableName, cfg); for (int i = 0; i < n; i++) { - ConditionalMutation m = - new ConditionalMutation((CharSequence) (i + ""), new Condition("", "")); + ConditionalMutation m = new ConditionalMutation(i + "", new Condition("", "")); m.put("", "", "X"); assertEquals(Status.ACCEPTED, cw.write(m).getStatus()); }
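For readers skimming this merge, the recurring change across these ITs is the move from the deprecated Connector API to an AccumuloClient built from client properties and managed with try-with-resources, with Scanner and BatchWriter closed the same way. The following standalone sketch only illustrates that pattern under stated assumptions: the properties file path and the table name are placeholders, not values taken from this commit (the tests themselves obtain their connection details from getClientProperties()).

import java.util.Map.Entry;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

public class ClientPatternSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder properties file; in the ITs this comes from getClientProperties().
    try (AccumuloClient client =
        Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
      String table = "example_table"; // hypothetical table name
      client.tableOperations().create(table);
      // The BatchWriter is flushed and closed when the try block exits.
      try (BatchWriter bw = client.createBatchWriter(table)) {
        Mutation m = new Mutation("row1");
        m.put("cf", "cq", "value");
        bw.addMutation(m);
      }
      // The Scanner is likewise released via try-with-resources.
      try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
        for (Entry<Key,Value> entry : scanner) {
          System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
      }
    }
  }
}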