Merge branch '1.6' into 1.7

Conflicts:
	test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
	test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
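Every hunk in this merge makes the same kind of adjustment to the mini-cluster test configuration: the ZooKeeper session timeout (Property.INSTANCE_ZK_TIMEOUT) is raised to 15s, and several tests replace useMiniDFS(true) with the raw local file system so walog sync and flush calls work. As a rough sketch only (the test class name below is hypothetical; the property names, config classes, and calls are the ones that appear in the diff), the per-test callback ends up looking like this:

  // Hypothetical example class, shown only to illustrate the recurring pattern in the hunks below.
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.RawLocalFileSystem;

  public class ExampleZkTimeoutIT {
    public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
      // single tablet server, as most of the hunks below configure
      cfg.setNumTservers(1);
      // longer ZooKeeper session timeout than the old 3s-5s values
      cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
      // raw local file system so walog sync and flush will work (replaces useMiniDFS(true))
      hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }
  }

The remaining per-test settings (GC cycle timing, major compaction delay, walog sizes, and so on) keep their values; a few hunks only move them from setSiteConfig maps to direct cfg.setProperty calls.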
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/94f4a19c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/94f4a19c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/94f4a19c

Branch: refs/heads/master
Commit: 94f4a19c0d776f3a57e855863c34e9e3164b615b
Parents: 6562828 29c6052
Author: Josh Elser <els...@apache.org>
Authored: Tue Jan 12 12:25:34 2016 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Tue Jan 12 12:25:34 2016 -0500

----------------------------------------------------------------------
 test/src/test/java/org/apache/accumulo/test/CleanWalIT.java | 2 +-
 .../java/org/apache/accumulo/test/DetectDeadTabletServersIT.java | 2 +-
 test/src/test/java/org/apache/accumulo/test/ExistingMacIT.java | 2 +-
 .../org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java | 2 +-
 .../test/java/org/apache/accumulo/test/MultiTableRecoveryIT.java | 2 +-
 .../org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java | 2 +-
 .../test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java | 2 +-
 .../test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java | 2 +-
 .../java/org/apache/accumulo/test/functional/BinaryStressIT.java | 2 +-
 .../test/java/org/apache/accumulo/test/functional/CleanTmpIT.java | 2 +-
 .../java/org/apache/accumulo/test/functional/CompactionIT.java | 2 +-
 .../java/org/apache/accumulo/test/functional/DurabilityIT.java | 2 +-
 .../org/apache/accumulo/test/functional/GarbageCollectorIT.java | 2 +-
 .../org/apache/accumulo/test/functional/KerberosRenewalIT.java | 2 +-
 .../java/org/apache/accumulo/test/functional/MasterFailoverIT.java | 2 +-
 .../test/java/org/apache/accumulo/test/functional/RestartIT.java | 2 +-
 .../java/org/apache/accumulo/test/functional/RestartStressIT.java | 2 +-
 .../org/apache/accumulo/test/functional/SessionDurabilityIT.java | 2 +-
 .../java/org/apache/accumulo/test/functional/WriteAheadLogIT.java | 2 +-
 .../org/apache/accumulo/test/functional/ZookeeperRestartIT.java | 2 +-
 .../accumulo/test/replication/MultiInstanceReplicationIT.java | 2 +-
 .../test/replication/UnorderedWorkAssignerReplicationIT.java | 2 +-
 22 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
index d754a14,b2298f7..08e3c09
--- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@@ -57,10 -54,9 +57,10 @@@ public class CleanWalIT extends Accumul
   @Override
   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
--    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
      cfg.setNumTservers(1);
-     cfg.useMiniDFS(true);
+     // use raw local file system so walogs sync and flush will work
+     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
index 04c781b,1e65601..f7ee089
--- 
a/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java +++ b/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java @@@ -42,7 -41,7 +42,7 @@@ public class DetectDeadTabletServersIT @Override protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { -- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); } @Test http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/ExistingMacIT.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/MultiTableRecoveryIT.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java index 1a3c92f,0000000..57c2c34 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java +++ b/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java @@@ -1,101 -1,0 +1,101 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test; + +import java.util.Map.Entry; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterIT; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +// Accumulo3010 +public class RecoveryCompactionsAreFlushesIT extends AccumuloClusterIT { + + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + // file system supports recovery + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Test + public void test() throws Exception { + // create a table + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100"); + c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "3"); + // create 3 flush files + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", new Value("v".getBytes())); + for (int i = 0; i < 3; i++) { + bw.addMutation(m); + bw.flush(); + c.tableOperations().flush(tableName, null, null, true); + } + // create an unsaved mutation + bw.addMutation(m); + bw.close(); + + ClusterControl control = cluster.getClusterControl(); + + // kill the tablet servers + control.stopAllServers(ServerType.TABLET_SERVER); + + // recover + control.startAllServers(ServerType.TABLET_SERVER); + + // ensure the table is readable + Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator()); + + // ensure that the recovery was not a merging minor compaction + Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + for (Entry<Key,Value> entry : s) { + String filename = entry.getKey().getColumnQualifier().toString(); + String parts[] = filename.split("/"); + Assert.assertFalse(parts[parts.length - 1].startsWith("M")); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java index 081ee85,0000000..68bd07b mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java +++ 
b/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java @@@ -1,75 -1,0 +1,75 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; + +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +// ACCUMULO-2480 +public class TabletServerGivesUpIT extends ConfigurableMacIT { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.useMiniDFS(true); + cfg.setNumTservers(1); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES, "15"); + cfg.setProperty(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION, "0s"); + } + + @Test(timeout = 30 * 1000) + public void test() throws Exception { + final Connector conn = this.getConnector(); + // Yes, there's a tabletserver + assertEquals(1, conn.instanceOperations().getTabletServers().size()); + final String tableName = getUniqueNames(1)[0]; + conn.tableOperations().create(tableName); + // Kill dfs + cluster.getMiniDfs().shutdown(); + // ask the tserver to do something + final AtomicReference<Exception> ex = new AtomicReference<>(); + Thread splitter = new Thread() { + @Override + public void run() { + try { + TreeSet<Text> splits = new TreeSet<>(); + splits.add(new Text("X")); + conn.tableOperations().addSplits(tableName, splits); + } catch (Exception e) { + ex.set(e); + } + } + }; + splitter.start(); + // wait for the tserver to give up on writing to the WAL + while (conn.instanceOperations().getTabletServers().size() == 1) { + UtilWaitThread.sleep(1000); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java index 8338021,0000000..c318075 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java +++ b/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java @@@ -1,107 -1,0 +1,107 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.accumulo.test.functional.FunctionalTestUtils; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +public class VerifySerialRecoveryIT extends ConfigurableMacIT { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "20"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Test(timeout = 4 * 60 * 1000) + public void testSerializedRecovery() throws Exception { + // make a table with many splits + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + c.tableOperations().create(tableName); + SortedSet<Text> splits = new TreeSet<Text>(); + for (int i = 0; i < 200; i++) { + splits.add(new Text(AssignmentThreadsIT.randomHex(8))); + } + c.tableOperations().addSplits(tableName, splits); + // load data to give the recovery something to do + BatchWriter bw = c.createBatchWriter(tableName, null); + for (int i = 0; i < 50000; i++) { + Mutation m = new Mutation(AssignmentThreadsIT.randomHex(8)); + m.put("", "", ""); + bw.addMutation(m); + } + bw.close(); + // kill the tserver + for (ProcessReference ref : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) + getCluster().killProcess(ServerType.TABLET_SERVER, ref); + final Process ts = cluster.exec(TabletServer.class); + + // wait for recovery + Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator()); + assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor()); + ts.waitFor(); + String result = FunctionalTestUtils.readAll(cluster, TabletServer.class, ts); + for (String line : result.split("\n")) { + System.out.println(line); + } + // walk through the output, 
verifying that only a single normal recovery was running at one time + boolean started = false; + int recoveries = 0; + for (String line : result.split("\n")) { + // ignore metadata tables + if (line.contains("!0") || line.contains("+r")) + continue; + if (line.contains("Starting Write-Ahead Log")) { + assertFalse(started); + started = true; + recoveries++; + } + if (line.contains("Write-Ahead Log recovery complete")) { + assertTrue(started); + started = false; + } + } + assertFalse(started); + assertTrue(recoveries > 0); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java index 62d8738,fbe504e..f1bca1b --- a/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java @@@ -50,9 -51,10 +50,9 @@@ public class BinaryStressIT extends Acc @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); - Map<String,String> siteConfig = new HashMap<String,String>(); - siteConfig.put(Property.TSERV_MAXMEM.getKey(), "50K"); - siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0"); - cfg.setSiteConfig(siteConfig); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_MAXMEM, "50K"); + cfg.setProperty(Property.TSERV_MAJC_DELAY, "0"); } private String majcDelay, maxMem; http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java index 921d661,65422c9..d03007e --- a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java @@@ -51,11 -53,12 +51,11 @@@ public class CleanTmpIT extends Configu private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class); @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - Map<String,String> props = new HashMap<String,String>(); - props.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s"); - cfg.setSiteConfig(props); + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); cfg.setNumTservers(1); - cfg.useMiniDFS(true); + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java index 2fe5470,907d17d..818dbc4 --- a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java @@@ -58,12 -51,11 +58,12 @@@ public class CompactionIT extends Accum @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - Map<String,String> map = new HashMap<String,String>(); - map.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4"); - map.put(Property.TSERV_MAJC_DELAY.getKey(), "1"); - map.put(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1"); - cfg.setSiteConfig(map); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN, "4"); + cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); + cfg.setProperty(Property.TSERV_MAJC_MAXCONCURRENT, "1"); + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java index 9a262a1,0000000..819347e mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java @@@ -1,222 -1,0 +1,222 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +public class DurabilityIT extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(DurabilityIT.class); + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setNumTservers(1); + } + + static final long N = 100000; + + private String[] init() throws Exception { + String[] tableNames = getUniqueNames(4); + Connector c = getConnector(); + TableOperations tableOps = c.tableOperations(); + createTable(tableNames[0]); + createTable(tableNames[1]); + createTable(tableNames[2]); + createTable(tableNames[3]); + // default is sync + tableOps.setProperty(tableNames[1], Property.TABLE_DURABILITY.getKey(), "flush"); + tableOps.setProperty(tableNames[2], Property.TABLE_DURABILITY.getKey(), "log"); + tableOps.setProperty(tableNames[3], Property.TABLE_DURABILITY.getKey(), "none"); + return tableNames; + } + + private void cleanup(String[] tableNames) throws Exception { + Connector c = getConnector(); + for (String tableName : tableNames) { + c.tableOperations().delete(tableName); + } + } + + private void createTable(String tableName) throws Exception { + TableOperations tableOps = getConnector().tableOperations(); + tableOps.create(tableName); + } + + @Test(timeout = 2 * 60 * 1000) + public void testWriteSpeed() throws Exception { + TableOperations tableOps = getConnector().tableOperations(); + String tableNames[] = init(); + // write some gunk, delete the table to keep that table from messing with the performance numbers of successive calls + // sync + long t0 = writeSome(tableNames[0], N); + tableOps.delete(tableNames[0]); + // flush + long t1 = writeSome(tableNames[1], N); + tableOps.delete(tableNames[1]); + // log + long t2 = writeSome(tableNames[2], N); + tableOps.delete(tableNames[2]); + // none + long t3 = writeSome(tableNames[3], N); + tableOps.delete(tableNames[3]); + System.out.println(String.format("sync %d flush %d log %d none %d", t0, t1, t2, t3)); + assertTrue("flush should be faster than sync", t0 > t1); + assertTrue("log should be faster than flush", t1 > t2); + assertTrue("no durability should be faster than log", t2 > t3); + } + + @Test(timeout = 4 * 60 * 1000) + public void testSync() throws Exception { + String tableNames[] = init(); + // sync table should lose nothing + writeSome(tableNames[0], N); + 
restartTServer(); + assertEquals(N, readSome(tableNames[0])); + cleanup(tableNames); + } + + @Test(timeout = 4 * 60 * 1000) + public void testFlush() throws Exception { + String tableNames[] = init(); + // flush table won't lose anything since we're not losing power/dfs + writeSome(tableNames[1], N); + restartTServer(); + assertEquals(N, readSome(tableNames[1])); + cleanup(tableNames); + } + + @Test(timeout = 4 * 60 * 1000) + public void testLog() throws Exception { + String tableNames[] = init(); + // we're probably going to lose something the the log setting + writeSome(tableNames[2], N); + restartTServer(); + long numResults = readSome(tableNames[2]); + assertTrue("Expected " + N + " >= " + numResults, N >= numResults); + cleanup(tableNames); + } + + @Test(timeout = 4 * 60 * 1000) + public void testNone() throws Exception { + String tableNames[] = init(); + // probably won't get any data back without logging + writeSome(tableNames[3], N); + restartTServer(); + long numResults = readSome(tableNames[3]); + assertTrue("Expected " + N + " >= " + numResults, N >= numResults); + cleanup(tableNames); + } + + @Test(timeout = 4 * 60 * 1000) + public void testIncreaseDurability() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); + writeSome(tableName, N); + restartTServer(); + long numResults = readSome(tableName); + assertTrue("Expected " + N + " >= " + numResults, N >= numResults); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); + writeSome(tableName, N); + restartTServer(); + assertTrue(N == readSome(tableName)); + } + + private static Map<String,String> map(Iterable<Entry<String,String>> entries) { + Map<String,String> result = new HashMap<String,String>(); + for (Entry<String,String> entry : entries) { + result.put(entry.getKey(), entry.getValue()); + } + return result; + } + + @Test(timeout = 4 * 60 * 1000) + public void testMetaDurability() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.instanceOperations().setProperty(Property.TABLE_DURABILITY.getKey(), "none"); + Map<String,String> props = map(c.tableOperations().getProperties(MetadataTable.NAME)); + assertEquals("sync", props.get(Property.TABLE_DURABILITY.getKey())); + c.tableOperations().create(tableName); + props = map(c.tableOperations().getProperties(tableName)); + assertEquals("none", props.get(Property.TABLE_DURABILITY.getKey())); + restartTServer(); + assertTrue(c.tableOperations().exists(tableName)); + } + + private long readSome(String table) throws Exception { + return Iterators.size(getConnector().createScanner(table, Authorizations.EMPTY).iterator()); + } + + private void restartTServer() throws Exception { + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.start(); + } + + private long writeSome(String table, long count) throws Exception { + int iterations = 5; + long[] attempts = new long[iterations]; + for (int attempt = 0; attempt < iterations; attempt++) { + long now = System.currentTimeMillis(); + Connector c = getConnector(); + BatchWriter bw = c.createBatchWriter(table, null); + for (int i = 1; i < count + 1; i++) { + Mutation m = new Mutation("" + i); + m.put("", "", ""); + bw.addMutation(m); + if (i % (Math.max(1, count / 100)) == 0) { + bw.flush(); 
+ } + } + bw.close(); + attempts[attempt] = System.currentTimeMillis() - now; + } + Arrays.sort(attempts); + log.info("Attempt durations: {}", Arrays.toString(attempts)); + // Return the median duration + return attempts[2]; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java index 132cbcc,d5b92cf..6ab0541 --- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java @@@ -80,13 -80,14 +80,13 @@@ public class GarbageCollectorIT extend @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - Map<String,String> settings = new HashMap<String,String>(); - settings.put(Property.INSTANCE_SECRET.getKey(), OUR_SECRET); - settings.put(Property.GC_CYCLE_START.getKey(), "1"); - settings.put(Property.GC_CYCLE_DELAY.getKey(), "1"); - settings.put(Property.GC_PORT.getKey(), "0"); - settings.put(Property.TSERV_MAXMEM.getKey(), "5K"); - settings.put(Property.TSERV_MAJC_DELAY.getKey(), "1"); - cfg.setSiteConfig(settings); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET); + cfg.setProperty(Property.GC_CYCLE_START, "1"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); + cfg.setProperty(Property.GC_PORT, "0"); + cfg.setProperty(Property.TSERV_MAXMEM, "5K"); + cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); // use raw local file system so walogs sync and flush will work hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java index 19908f6,0000000..28c1dfc mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java @@@ -1,188 -1,0 +1,188 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloIT; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.TestingKdc; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + +/** + * MAC test which uses {@link MiniKdc} to simulate ta secure environment. Can be used as a sanity check for Kerberos/SASL testing. 
+ */ +public class KerberosRenewalIT extends AccumuloIT { + private static final Logger log = LoggerFactory.getLogger(KerberosRenewalIT.class); + + private static TestingKdc kdc; + private static String krbEnabledForITs = null; + private static ClusterUser rootUser; + + private static final long TICKET_LIFETIME = 6 * 60 * 1000; // Anything less seems to fail when generating the ticket + private static final long TICKET_TEST_LIFETIME = 8 * 60 * 1000; // Run a test for 8 mins + private static final long TEST_DURATION = 9 * 60 * 1000; // The test should finish within 9 mins + + @BeforeClass + public static void startKdc() throws Exception { + // 30s renewal time window + kdc = new TestingKdc(TestingKdc.computeKdcDir(), TestingKdc.computeKeytabDir(), TICKET_LIFETIME); + kdc.start(); + krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION); + if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true"); + } + rootUser = kdc.getRootUser(); + } + + @AfterClass + public static void stopKdc() throws Exception { + if (null != kdc) { + kdc.stop(); + } + if (null != krbEnabledForITs) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs); + } + } + + @Override + public int defaultTimeoutSeconds() { + return (int) TEST_DURATION / 1000; + } + + private MiniAccumuloClusterImpl mac; + + @Before + public void startMac() throws Exception { + MiniClusterHarness harness = new MiniClusterHarness(); + mac = harness.create(this, new PasswordToken("unused"), kdc, new MiniClusterConfigurationCallback() { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + Map<String,String> site = cfg.getSiteConfig(); - site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); ++ site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); + // Reduce the period just to make sure we trigger renewal fast + site.put(Property.GENERAL_KERBEROS_RENEWAL_PERIOD.getKey(), "5s"); + cfg.setSiteConfig(site); + } + + }); + + mac.getConfig().setNumTservers(1); + mac.start(); + // Enabled kerberos auth + Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + @After + public void stopMac() throws Exception { + if (null != mac) { + mac.stop(); + } + } + + // Intentially setting the Test annotation timeout. We do not want to scale the timeout. + @Test(timeout = TEST_DURATION) + public void testReadAndWriteThroughTicketLifetime() throws Exception { + // Attempt to use Accumulo for a duration of time that exceeds the Kerberos ticket lifetime. + // This is a functional test to verify that Accumulo services renew their ticket. + // If the test doesn't finish on its own, this signifies that Accumulo services failed + // and the test should fail. If Accumulo services renew their ticket, the test case + // should exit gracefully on its own. 
+ + // Login as the "root" user + UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + long duration = 0; + long last = System.currentTimeMillis(); + // Make sure we have a couple renewals happen + while (duration < TICKET_TEST_LIFETIME) { + // Create a table, write a record, compact, read the record, drop the table. + createReadWriteDrop(conn); + // Wait a bit after + Thread.sleep(5000); + + // Update the duration + long now = System.currentTimeMillis(); + duration += now - last; + last = now; + } + } + + /** + * Creates a table, adds a record to it, and then compacts the table. A simple way to make sure that the system user exists (since the master does an RPC to + * the tserver which will create the system user if it doesn't already exist). + */ + private void createReadWriteDrop(Connector conn) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, TableExistsException { + final String table = testName.getMethodName() + "_table"; + conn.tableOperations().create(table); + BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", "d"); + bw.addMutation(m); + bw.close(); + conn.tableOperations().compact(table, new CompactionConfig().setFlush(true).setWait(true)); + Scanner s = conn.createScanner(table, Authorizations.EMPTY); + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + assertEquals("Did not find the expected key", 0, new Key("a", "b", "c").compareTo(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL)); + assertEquals("d", entry.getValue().toString()); + conn.tableOperations().delete(table); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java index dd83574,0c2631f..49160aa --- a/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java @@@ -37,9 -35,7 +37,9 @@@ public class MasterFailoverIT extends A @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setSiteConfig(Collections.singletonMap(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s")); + Map<String,String> siteConfig = cfg.getSiteConfig(); - siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s"); ++ siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); + cfg.setSiteConfig(siteConfig); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java index 2ba6d31,b498412..d1fb9f9 --- a/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java @@@ -70,10 -66,12 +70,10 @@@ public class RestartIT extends Accumulo @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - Map<String,String> props = new HashMap<String,String>(); - props.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); - props.put(Property.GC_CYCLE_DELAY.getKey(), "1s"); - props.put(Property.GC_CYCLE_START.getKey(), "1s"); - cfg.setSiteConfig(props); - cfg.useMiniDFS(true); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } private static final ScannerOpts SOPTS = new ScannerOpts(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java index 3f7d67d,c4b3afd..68448eb --- a/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java @@@ -58,10 -55,10 +58,10 @@@ public class RestartStressIT extends Ac opts.put(Property.TSERV_MAXMEM.getKey(), "100K"); opts.put(Property.TSERV_MAJC_DELAY.getKey(), "100ms"); opts.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1M"); - opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s"); + opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); opts.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s"); cfg.setSiteConfig(opts); - cfg.useMiniDFS(true); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java index aec6bae,0000000..ca45382 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java @@@ -1,153 -1,0 +1,153 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriter.Status; +import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Durability; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +public class SessionDurabilityIT extends ConfigurableMacIT { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + } + + @Test(timeout = 3 * 60 * 1000) + public void nondurableTableHasDurableWrites() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + // table default has no durability + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); + // send durable writes + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setDurability(Durability.SYNC); + writeSome(tableName, 10, cfg); + assertEquals(10, count(tableName)); + // verify writes servive restart + restartTServer(); + assertEquals(10, count(tableName)); + } + + @Test(timeout = 3 * 60 * 1000) + public void durableTableLosesNonDurableWrites() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); + // write with no durability + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setDurability(Durability.NONE); + writeSome(tableName, 10, cfg); + // verify writes are lost on restart + restartTServer(); + assertTrue(10 > count(tableName)); + } + + private int count(String tableName) throws Exception { + return Iterators.size(getConnector().createScanner(tableName, Authorizations.EMPTY).iterator()); + } + + private void writeSome(String tableName, int n, BatchWriterConfig cfg) throws Exception { + Connector c = getConnector(); + BatchWriter bw = c.createBatchWriter(tableName, cfg); + for (int i = 0; i < n; i++) { + Mutation m = new Mutation(i + ""); + m.put("", "", ""); + bw.addMutation(m); + } + bw.close(); + } + + @Test(timeout = 3 * 60 * 1000) + public void testConditionDurability() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, 
Property.TABLE_DURABILITY.getKey(), "sync"); + // write without durability + ConditionalWriterConfig cfg = new ConditionalWriterConfig(); + cfg.setDurability(Durability.NONE); + conditionWriteSome(tableName, 10, cfg); + // everything in there? + assertEquals(10, count(tableName)); + // restart the server and verify the updates are lost + restartTServer(); + assertEquals(0, count(tableName)); + } + + @Test(timeout = 3 * 60 * 1000) + public void testConditionDurability2() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + // table default is durable writes + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); + // write with durability + ConditionalWriterConfig cfg = new ConditionalWriterConfig(); + cfg.setDurability(Durability.SYNC); + conditionWriteSome(tableName, 10, cfg); + // everything in there? + assertEquals(10, count(tableName)); + // restart the server and verify the updates are still there + restartTServer(); + assertEquals(10, count(tableName)); + } + + private void conditionWriteSome(String tableName, int n, ConditionalWriterConfig cfg) throws Exception { + Connector c = getConnector(); + ConditionalWriter cw = c.createConditionalWriter(tableName, cfg); + for (int i = 0; i < n; i++) { + ConditionalMutation m = new ConditionalMutation((CharSequence) (i + ""), new Condition("", "")); + m.put("", "", "X"); + assertEquals(Status.ACCEPTED, cw.write(m).getStatus()); + } + } + + private void restartTServer() throws Exception { + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.start(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java index 44473b0,bfca75b..bc36257 --- a/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java @@@ -35,13 -35,15 +35,13 @@@ public class WriteAheadLogIT extends Ac @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - Map<String,String> siteConfig = new HashMap<String,String>(); - siteConfig.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "2M"); - siteConfig.put(Property.GC_CYCLE_DELAY.getKey(), "1"); - siteConfig.put(Property.GC_CYCLE_START.getKey(), "1"); - siteConfig.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s"); - siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "1"); - siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "4s"); - cfg.setSiteConfig(siteConfig); - cfg.useMiniDFS(true); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); + cfg.setProperty(Property.GC_CYCLE_START, "1"); + cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s"); + cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "4s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java 
---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java index f852901,f852901..fefb9a6 --- a/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java @@@ -45,7 -45,7 +45,7 @@@ public class ZookeeperRestartIT extend @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { Map<String,String> siteConfig = new HashMap<String,String>(); -- siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s"); ++ siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); cfg.setSiteConfig(siteConfig); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java index bd10f90,0000000..35bc0fe mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java @@@ -1,733 -1,0 +1,733 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.master.replication.SequentialWorkAssigner; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.replication.ReplicaSystemFactory; +import org.apache.accumulo.server.replication.StatusUtil; +import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +/** + * Replication tests which start at least two MAC instances and replicate data between them + */ +public class MultiInstanceReplicationIT extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(MultiInstanceReplicationIT.class); + + private ExecutorService executor; + + @Override + public int defaultTimeoutSeconds() { + return 10 * 60; + } + + @Before + public void createExecutor() { + executor = Executors.newSingleThreadExecutor(); + } + + @After + public void stopExecutor() { + if (null != executor) { + executor.shutdownNow(); + } + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + 
cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M"); + cfg.setProperty(Property.REPLICATION_NAME, "master"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName()); + cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + /** + * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication + */ + private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { + // Set the same SSL information from the primary when present + Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); + if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + Map<String,String> peerSiteConfig = new HashMap<String,String>(); + peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); + Assert.assertNotNull("Keystore Path was null", keystorePath); + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); + String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); + Assert.assertNotNull("Truststore Path was null", truststorePath); + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); + + // Passwords might be stored in CredentialProvider + String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); + if (null != keystorePassword) { + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); + } + String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); + if (null != truststorePassword) { + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + } + + System.out.println("Setting site configuration for peer " + peerSiteConfig); + peerCfg.setSiteConfig(peerSiteConfig); + } + + // Use the CredentialProvider if the primary also uses one + String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProvider) { + Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); + peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); + peerCfg.setSiteConfig(peerSiteConfig); + } + } + + @Test + public void dataWasReplicatedToThePeer() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + + MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg); + + peerCluster.start(); + + try { + final Connector connMaster = getConnector(); + final Connector connPeer = peerCluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)); + + ReplicationTable.setOnline(connMaster); + + String peerUserName = "peer", peerPassword = "foo"; + + String peerClusterName = "peer"; + + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + 
connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); + + final String masterTable = "master", peerTable = "peer"; + + connMaster.tableOperations().create(masterTable); + String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); + Assert.assertNotNull(masterTableId); + + connPeer.tableOperations().create(peerTable); + String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); + Assert.assertNotNull(peerTableId); + + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable); + + log.info("Files to replicate: " + filesNeedingReplication); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.exec(TabletServer.class); + + log.info("TabletServer restarted"); + Iterators.size(ReplicationTable.getScanner(connMaster).iterator()); + log.info("TabletServer is online"); + + while (!ReplicationTable.isOnline(connMaster)) { + log.info("Replication table still offline, waiting"); + Thread.sleep(5000); + } + + log.info(""); + log.info("Fetching metadata records:"); + for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } else { + log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); + } + } + + log.info(""); + log.info("Fetching replication records:"); + for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + Future<Boolean> future = executor.submit(new Callable<Boolean>() { + + @Override + public Boolean call() throws Exception { + long then = System.currentTimeMillis(); + connMaster.replicationOperations().drain(masterTable, filesNeedingReplication); + long now = System.currentTimeMillis(); + log.info("Drain completed in " + (now - then) + "ms"); + return true; + } + + 
}); + + try { + future.get(60, TimeUnit.SECONDS); + } catch (TimeoutException e) { + future.cancel(true); + Assert.fail("Drain did not finish within 60 seconds"); + } finally { + executor.shutdownNow(); + } + + log.info("drain completed"); + + log.info(""); + log.info("Fetching metadata records:"); + for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } else { + log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); + } + } + + log.info(""); + log.info("Fetching replication records:"); + for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); + Entry<Key,Value> masterEntry = null, peerEntry = null; + while (masterIter.hasNext() && peerIter.hasNext()) { + masterEntry = masterIter.next(); + peerEntry = peerIter.next(); + Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, + masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); + Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); + } + + log.info("Last master entry: " + masterEntry); + log.info("Last peer entry: " + peerEntry); + + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); + Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); + } finally { + peerCluster.stop(); + } + } + + @Test + public void dataReplicatedToCorrectTable() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + + MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg); + + peer1Cluster.start(); + + try { + Connector connMaster = getConnector(); + Connector connPeer = peer1Cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)); + + String peerClusterName = "peer"; + String peerUserName = "peer", peerPassword = "foo"; + + // Create local user + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); + + String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2"; + + // Create tables + 
connMaster.tableOperations().create(masterTable1); + String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1); + Assert.assertNotNull(masterTableId1); + + connMaster.tableOperations().create(masterTable2); + String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2); + Assert.assertNotNull(masterTableId2); + + connPeer.tableOperations().create(peerTable1); + String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); + Assert.assertNotNull(peerTableId1); + + connPeer.tableOperations().create(peerTable2); + String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); + Assert.assertNotNull(peerTableId2); + + // Grant write permission + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1); + + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); + long masterTable1Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable1 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + masterTable1Records++; + } + bw.addMutation(m); + } + + bw.close(); + + // Write some data to table2 + bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); + long masterTable2Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable2 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + masterTable2Records++; + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles( + masterTable2); + + log.info("Files to replicate for table1: " + filesFor1); + log.info("Files to replicate for table2: " + filesFor2); + + // Restart the tserver to force a close on the WAL + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.exec(TabletServer.class); + + log.info("Restarted the tserver"); + + // Read the data -- the tserver is back up and running + Iterators.size(connMaster.createScanner(masterTable1, Authorizations.EMPTY).iterator()); + + while (!ReplicationTable.isOnline(connMaster)) { + log.info("Replication table still offline, waiting"); + Thread.sleep(5000); + } + + // Wait for both tables to be replicated + log.info("Waiting for {} for {}", filesFor1, masterTable1); + connMaster.replicationOperations().drain(masterTable1, filesFor1); + + log.info("Waiting for {} for {}", filesFor2, masterTable2); + connMaster.replicationOperations().drain(masterTable2, 
filesFor2); + + long countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable1)); + } + + log.info("Found {} records in {}", countTable, peerTable1); + Assert.assertEquals(masterTable1Records, countTable); + + countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable2)); + } + + log.info("Found {} records in {}", countTable, peerTable2); + Assert.assertEquals(masterTable2Records, countTable); + + } finally { + peer1Cluster.stop(); + } + } + + @Test + public void dataWasReplicatedToThePeerWithoutDrain() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + + MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg); + + peerCluster.start(); + + Connector connMaster = getConnector(); + Connector connPeer = peerCluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)); + + String peerUserName = "repl"; + String peerPassword = "passwd"; + + // Create a user on the peer for replication to use + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + String peerClusterName = "peer"; + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + String masterTable = "master", peerTable = "peer"; + + connMaster.tableOperations().create(masterTable); + String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); + Assert.assertNotNull(masterTableId); + + connPeer.tableOperations().create(peerTable); + String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); + Assert.assertNotNull(peerTableId); + + // Give our replication user the ability to write to the table + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable, new 
BatchWriterConfig()); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable); + + log.info("Files to replicate:" + files); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + cluster.exec(TabletServer.class); + + while (!ReplicationTable.isOnline(connMaster)) { + log.info("Replication table still offline, waiting"); + Thread.sleep(5000); + } + + Iterators.size(connMaster.createScanner(masterTable, Authorizations.EMPTY).iterator()); + + for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { + log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + connMaster.replicationOperations().drain(masterTable, files); + + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); + while (masterIter.hasNext() && peerIter.hasNext()) { + Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next(); + Assert.assertEquals(peerEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, + masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); + Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); + } + + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); + Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); + + peerCluster.stop(); + } + + @Test + public void dataReplicatedToCorrectTableWithoutDrain() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + + MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg); + + peer1Cluster.start(); + + try { + Connector connMaster = getConnector(); + Connector connPeer = peer1Cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)); + + String peerClusterName = "peer"; + + String peerUserName = "repl"; + String peerPassword = "passwd"; + + // Create a user on the peer for replication to use + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + 
AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); + + String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2"; + + connMaster.tableOperations().create(masterTable1); + String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1); + Assert.assertNotNull(masterTableId1); + + connMaster.tableOperations().create(masterTable2); + String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2); + Assert.assertNotNull(masterTableId2); + + connPeer.tableOperations().create(peerTable1); + String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); + Assert.assertNotNull(peerTableId1); + + connPeer.tableOperations().create(peerTable2); + String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); + Assert.assertNotNull(peerTableId2); + + // Give our replication user the ability to write to the tables + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1); + + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable1 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + // Write some data to table2 + bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable2 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + cluster.exec(TabletServer.class); + + while (!ReplicationTable.isOnline(connMaster)) { + log.info("Replication table still offline, waiting"); + Thread.sleep(5000); + } + + // Wait until we fully replicated something + boolean fullyReplicated = false; + for (int i = 0; i < 10 && !fullyReplicated; i++) { + UtilWaitThread.sleep(2000); + + Scanner s = ReplicationTable.getScanner(connMaster); + WorkSection.limit(s); + for (Entry<Key,Value> entry : s) { + Status status = Status.parseFrom(entry.getValue().get()); + if (StatusUtil.isFullyReplicated(status)) { + fullyReplicated |= true; + } + } + } + + Assert.assertNotEquals(0, fullyReplicated); + + // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it + // Be cautious in how quickly we assert that the data is present on the 
peer + long countTable = 0l; + for (int i = 0; i < 10; i++) { + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable1)); + } + + log.info("Found {} records in {}", countTable, peerTable1); + + if (0l == countTable) { + Thread.sleep(5000); + } else { + break; + } + } + + Assert.assertTrue("Found no records in " + peerTable1 + " in the peer cluster", countTable > 0); + + // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it + // Be cautious in how quickly we assert that the data is present on the peer + for (int i = 0; i < 10; i++) { + countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable2)); + } + + log.info("Found {} records in {}", countTable, peerTable2); + + if (0l == countTable) { + Thread.sleep(5000); + } else { + break; + } + } + + Assert.assertTrue("Found no records in " + peerTable2 + " in the peer cluster", countTable > 0); + + } finally { + peer1Cluster.stop(); + } + } +}
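For readers skimming the new MultiInstanceReplicationIT, the replication wiring each test repeats boils down to a handful of property and permission calls on the primary and the peer, followed by a drain on the files referenced so far. The sketch below distills that pattern outside of MiniAccumuloCluster; it is only a condensed illustration of the calls already present in the diff above, and the connector variables, "peerInstance", and "zoo1:2181" are placeholders, not values from the commit.

import java.util.Set;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.server.replication.ReplicaSystemFactory;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;

public class ReplicationSetupSketch {

  // connPrimary and connPeer are assumed to be Connectors to two separate instances;
  // "peerInstance" and "zoo1:2181" stand in for the peer's instance name and ZooKeeper quorum.
  public static void configure(Connector connPrimary, Connector connPeer) throws Exception {
    String peerName = "peer", peerUser = "repl", peerPassword = "passwd";
    String primaryTable = "master", peerTable = "peer";

    // Peer side: a local user the primary authenticates as, with write access to the target table
    connPeer.securityOperations().createLocalUser(peerUser, new PasswordToken(peerPassword));
    connPeer.tableOperations().create(peerTable);
    connPeer.securityOperations().grantTablePermission(peerUser, peerTable, TablePermission.WRITE);
    String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);

    // Primary side: define the peer (ReplicaSystem implementation plus instance/ZK) and its credentials
    connPrimary.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + peerName,
        ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
            AccumuloReplicaSystem.buildConfiguration("peerInstance", "zoo1:2181")));
    connPrimary.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerName, peerUser);
    connPrimary.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerName, peerPassword);

    // Mark the local table for replication and point it at the peer's table id
    connPrimary.tableOperations().create(primaryTable);
    connPrimary.tableOperations().setProperty(primaryTable, Property.TABLE_REPLICATION.getKey(), "true");
    connPrimary.tableOperations().setProperty(primaryTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerName, peerTableId);

    // After writing data, block until everything referenced so far has been replicated
    Set<String> files = connPrimary.replicationOperations().referencedFiles(primaryTable);
    connPrimary.replicationOperations().drain(primaryTable, files);
  }
}

In the tests themselves the drain call is wrapped in a single-thread ExecutorService with a 60 second timeout, since drain() blocks until the referenced WAL files are fully replicated and a hang would otherwise consume the whole test timeout.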