http://git-wip-us.apache.org/repos/asf/accumulo/blob/74e7103f/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java index 59bddc3,0000000..002ac97 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java @@@ -1,330 -1,0 +1,330 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.replication.ReplicaSystemFactory; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.LongCombiner.Type; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.minicluster.impl.ZooKeeperBindException; +import org.apache.accumulo.test.functional.AbstractMacIT; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + +/** + * + */ +public class CyclicReplicationIT { + private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class); + + @Rule + public Timeout 
getTimeout() { + int scalingFactor = 1; + try { + scalingFactor = Integer.parseInt(System.getProperty("timeout.factor")); + } catch (NumberFormatException exception) { + log.warn("Could not parse timeout.factor, not scaling timeout"); + } + + return new Timeout(scalingFactor * 5 * 60 * 1000); + } + + @Rule + public TestName testName = new TestName(); + + private File createTestDir(String name) { + File baseDir = new File(System.getProperty("user.dir") + "/target/mini-tests"); + baseDir.mkdirs(); + File testDir = new File(baseDir, this.getClass().getName() + "_" + testName.getMethodName() + "_" + name); + FileUtils.deleteQuietly(testDir); + testDir.mkdir(); + return testDir; + } + + private void setCoreSite(MiniAccumuloClusterImpl cluster) throws Exception { + File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml"); + if (csFile.exists()) + throw new RuntimeException(csFile + " already exist"); + + Configuration coreSite = new Configuration(false); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + OutputStream out = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "core-site.xml"))); + coreSite.writeXml(out); + out.close(); + } + + /** + * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication + */ + private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { + // Set the same SSL information from the primary when present + Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); + if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + Map<String,String> peerSiteConfig = new HashMap<String,String>(); + peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); + Assert.assertNotNull("Keystore Path was null", keystorePath); + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); + String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); + Assert.assertNotNull("Truststore Path was null", truststorePath); + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); + + // Passwords might be stored in CredentialProvider + String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); + if (null != keystorePassword) { + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); + } + String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); + if (null != truststorePassword) { + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + } + + System.out.println("Setting site configuration for peer " + peerSiteConfig); + peerCfg.setSiteConfig(peerSiteConfig); + } + + // Use the CredentialProvider if the primary also uses one + String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProvider) { + Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); + peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); + peerCfg.setSiteConfig(peerSiteConfig); + } + } + + @Test + public void dataIsNotOverReplicated() throws Exception { + File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2"); + String password = "password"; + + 
MiniAccumuloConfigImpl master1Cfg; + MiniAccumuloClusterImpl master1Cluster; + while (true) { + master1Cfg= new MiniAccumuloConfigImpl(master1Dir, password); + master1Cfg.setNumTservers(1); + master1Cfg.setInstanceName("master1"); + + // Set up SSL if needed - AbstractMacIT.configureForEnvironment(master1Cfg, AbstractMacIT.createSharedTestDir(this.getClass().getName() + "-ssl")); ++ AbstractMacIT.configureForEnvironment(master1Cfg, this.getClass(), AbstractMacIT.createSharedTestDir(this.getClass().getName() + "-ssl")); + + master1Cfg.setProperty(Property.REPLICATION_NAME, master1Cfg.getInstanceName()); + master1Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + master1Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); + master1Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + master1Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); - master1Cluster = master1Cfg.build(); ++ master1Cluster = new MiniAccumuloClusterImpl(master1Cfg); + setCoreSite(master1Cluster); + + try { + master1Cluster.start(); + break; + } catch (ZooKeeperBindException e) { + log.warn("Failed to start ZooKeeper on " + master1Cfg.getZooKeeperPort() + ", will retry"); + } + } + + MiniAccumuloConfigImpl master2Cfg; + MiniAccumuloClusterImpl master2Cluster; + while (true) { + master2Cfg = new MiniAccumuloConfigImpl(master2Dir, password); + master2Cfg.setNumTservers(1); + master2Cfg.setInstanceName("master2"); + + // Set up SSL if needed. Need to share the same SSL truststore as master1 + this.updatePeerConfigFromPrimary(master1Cfg, master2Cfg); + + master2Cfg.setProperty(Property.REPLICATION_NAME, master2Cfg.getInstanceName()); + master2Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + master2Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); + master2Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + master2Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); - master2Cluster = master2Cfg.build(); ++ master2Cluster = new MiniAccumuloClusterImpl(master2Cfg); + setCoreSite(master2Cluster); + + try { + master2Cluster.start(); + break; + } catch (ZooKeeperBindException e) { + log.warn("Failed to start ZooKeeper on " + master2Cfg.getZooKeeperPort() + ", will retry"); + } + } + + try { + Connector connMaster1 = master1Cluster.getConnector("root", password), connMaster2 = master2Cluster.getConnector("root", password); + + String master1UserName = "master1", master1Password = "foo"; + String master2UserName = "master2", master2Password = "bar"; + String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName(); + + connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password)); + connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password)); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master2Cluster.getInstanceName(), master2UserName); + connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master2Cluster.getInstanceName(), master2Password); + + connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master1Cluster.getInstanceName(), master1UserName); + connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master1Cluster.getInstanceName(), master1Password); + + 
connMaster1.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + master2Cluster.getInstanceName(), + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(master2Cluster.getInstanceName(), master2Cluster.getZooKeepers()))); + + connMaster2.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + master1Cluster.getInstanceName(), + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers()))); + + connMaster1.tableOperations().create(master1Table, false); + String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table); + Assert.assertNotNull(master1TableId); + + connMaster2.tableOperations().create(master2Table, false); + String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table); + Assert.assertNotNull(master2TableId); + + // Replicate master1 in the master1 cluster to master2 in the master2 cluster + connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster1.tableOperations().setProperty(master1Table, + Property.TABLE_REPLICATION_TARGET.getKey() + master2Cluster.getInstanceName(), master2TableId); + + // Replicate master2 in the master2 cluster to master1 in the master2 cluster + connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster2.tableOperations().setProperty(master2Table, + Property.TABLE_REPLICATION_TARGET.getKey() + master1Cluster.getInstanceName(), master1TableId); + + // Give our replication user the ability to write to the respective table + connMaster1.securityOperations().grantTablePermission(master1UserName, master1Table, TablePermission.WRITE); + connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, TablePermission.WRITE); + + IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class); + SummingCombiner.setEncodingType(summingCombiner, Type.STRING); + SummingCombiner.setCombineAllColumns(summingCombiner, true); + + // Set a combiner on both instances that will sum multiple values + // We can use this to verify that the mutation was not sent multiple times + connMaster1.tableOperations().attachIterator(master1Table, summingCombiner); + connMaster2.tableOperations().attachIterator(master2Table, summingCombiner); + + // Write a single entry + BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig()); + Mutation m = new Mutation("row"); + m.put("count", "", "1"); + bw.addMutation(m); + bw.close(); + + Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table); + + log.info("Found {} that need replication from master1", files); + + // Kill and restart the tserver to close the WAL on master1 + for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + master1Cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + master1Cluster.exec(TabletServer.class); + + log.info("Restarted tserver on master1"); + + // Try to avoid ACCUMULO-2964 + Thread.sleep(1000); + + // Sanity check that the element is there on master1 + Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + // Wait for this table to replicate + 
connMaster1.replicationOperations().drain(master1Table, files); + + Thread.sleep(5000); + + // Check that the element made it to master2 only once + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + // Wait for master2 to finish replicating it back + files = connMaster2.replicationOperations().referencedFiles(master2Table); + + // Kill and restart the tserver to close the WAL on master2 + for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + master2Cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + master2Cluster.exec(TabletServer.class); + + // Try to avoid ACCUMULO-2964 + Thread.sleep(1000); + + // Check that the element made it to master2 only once + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + connMaster2.replicationOperations().drain(master2Table, files); + + Thread.sleep(5000); + + // Verify that the entry wasn't sent back to master1 + s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + } finally { + master1Cluster.stop(); + master2Cluster.stop(); + } + } + +}
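The replication ITs in this commit all repeat the same peer-wiring boilerplate. As a reading aid only, here is a condensed sketch of that pattern, assuming a Connector to the primary (connPrimary), a Connector to the peer (connPeer), and the peer's instance name and ZooKeeper quorum; the helper method and its parameter names are illustrative and are not part of the commit.

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;

public class PeerWiringSketch {
  // Wires one table on the primary to replicate into one table on the peer,
  // mirroring the calls repeated in the ITs in this commit. Error handling omitted.
  static void wirePeer(Connector connPrimary, Connector connPeer, String peerName,
      String peerInstance, String peerZooKeepers, String peerUser, String peerPassword,
      String primaryTable, String peerTable) throws Exception {
    // Tell the primary how to reach the peer: ReplicaSystem implementation plus instance/ZooKeeper info
    connPrimary.instanceOperations().setProperty(
        Property.REPLICATION_PEERS.getKey() + peerName,
        ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
            AccumuloReplicaSystem.buildConfiguration(peerInstance, peerZooKeepers)));

    // Credentials the primary uses to authenticate itself to the peer for replication
    connPrimary.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerName, peerUser);
    connPrimary.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerName, peerPassword);

    // Enable replication on the source table and point it at the peer table's id
    String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
    connPrimary.tableOperations().setProperty(primaryTable, Property.TABLE_REPLICATION.getKey(), "true");
    connPrimary.tableOperations().setProperty(primaryTable,
        Property.TABLE_REPLICATION_TARGET.getKey() + peerName, peerTableId);

    // The replication user must be able to write to the target table on the peer
    connPeer.securityOperations().grantTablePermission(peerUser, peerTable, TablePermission.WRITE);
  }
}

CyclicReplicationIT above applies this wiring in both directions (master1 to master2 and back) and attaches a SummingCombiner to each table, so a mutation that were replicated more than once would surface as a summed value greater than "1".
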
http://git-wip-us.apache.org/repos/asf/accumulo/blob/74e7103f/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java index 6ce5959,0000000..e78ba13 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java @@@ -1,704 -1,0 +1,704 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.replication.ReplicaSystemFactory; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.StatusUtil; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.master.replication.SequentialWorkAssigner; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.accumulo.tserver.TabletServer; +import 
org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Replication tests which start at least two MAC instances and replicate data between them + */ +public class MultiInstanceReplicationIT extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(MultiInstanceReplicationIT.class); + + private ExecutorService executor; + + @Override + public int defaultTimeoutSeconds() { + return 10 * 60; + } + + @Before + public void createExecutor() { + executor = Executors.newSingleThreadExecutor(); + } + + @After + public void stopExecutor() { + if (null != executor) { + executor.shutdownNow(); + } + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M"); + cfg.setProperty(Property.REPLICATION_NAME, "master"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName()); + cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + /** + * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication + */ + private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { + // Set the same SSL information from the primary when present + Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); + if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + Map<String,String> peerSiteConfig = new HashMap<String,String>(); + peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); + Assert.assertNotNull("Keystore Path was null", keystorePath); + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); + String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); + Assert.assertNotNull("Truststore Path was null", truststorePath); + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); + + // Passwords might be stored in CredentialProvider + String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); + if (null != keystorePassword) { + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); + } + String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); + if (null != truststorePassword) { + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + } + + System.out.println("Setting site configuration for peer " + peerSiteConfig); + peerCfg.setSiteConfig(peerSiteConfig); + } + + // Use the CredentialProvider if the primary also uses one + String credProvider = 
primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProvider) { + Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); + peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); + peerCfg.setSiteConfig(peerSiteConfig); + } + } + + @Test + public void dataWasReplicatedToThePeer() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + - MiniAccumuloClusterImpl peerCluster = peerCfg.build(); ++ MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg); + + peerCluster.start(); + + try { + final Connector connMaster = getConnector(); + final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); + + ReplicationTable.setOnline(connMaster); + + String peerUserName = "peer", peerPassword = "foo"; + + String peerClusterName = "peer"; + + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); + + final String masterTable = "master", peerTable = "peer"; + + connMaster.tableOperations().create(masterTable); + String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); + Assert.assertNotNull(masterTableId); + + connPeer.tableOperations().create(peerTable); + String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); + Assert.assertNotNull(peerTableId); + + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.exec(TabletServer.class); + + log.info("TabletServer restarted"); + for (@SuppressWarnings("unused") + Entry<Key,Value> e : 
ReplicationTable.getScanner(connMaster)) {} + log.info("TabletServer is online"); + + log.info(""); + log.info("Fetching metadata records:"); + for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } else { + log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); + } + } + + log.info(""); + log.info("Fetching replication records:"); + for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + Future<Boolean> future = executor.submit(new Callable<Boolean>() { + + @Override + public Boolean call() throws Exception { + connMaster.replicationOperations().drain(masterTable, filesNeedingReplication); + log.info("Drain completed"); + return true; + } + + }); + + try { + future.get(60, TimeUnit.SECONDS); + } catch (TimeoutException e) { + future.cancel(true); + Assert.fail("Drain did not finish within 60 seconds"); + } finally { + executor.shutdownNow(); + } + + log.info("drain completed"); + + log.info(""); + log.info("Fetching metadata records:"); + for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } else { + log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); + } + } + + log.info(""); + log.info("Fetching replication records:"); + for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); + Entry<Key,Value> masterEntry = null, peerEntry = null; + while (masterIter.hasNext() && peerIter.hasNext()) { + masterEntry = masterIter.next(); + peerEntry = peerIter.next(); + Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, + masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); + Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); + } + + log.info("Last master entry: " + masterEntry); + log.info("Last peer entry: " + peerEntry); + + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); + Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); + } finally { + peerCluster.stop(); + } + } + + @Test + public void dataReplicatedToCorrectTable() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + - MiniAccumuloClusterImpl peer1Cluster = peerCfg.build(); ++ MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg); + + peer1Cluster.start(); + + try { + 
Connector connMaster = getConnector(); + Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD); + + String peerClusterName = "peer"; + String peerUserName = "peer", peerPassword = "foo"; + + // Create local user + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); + + String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2"; + + // Create tables + connMaster.tableOperations().create(masterTable1); + String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1); + Assert.assertNotNull(masterTableId1); + + connMaster.tableOperations().create(masterTable2); + String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2); + Assert.assertNotNull(masterTableId2); + + connPeer.tableOperations().create(peerTable1); + String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); + Assert.assertNotNull(peerTableId1); + + connPeer.tableOperations().create(peerTable2); + String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); + Assert.assertNotNull(peerTableId2); + + // Grant write permission + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1); + + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); + long masterTable1Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable1 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + masterTable1Records++; + } + bw.addMutation(m); + } + + bw.close(); + + // Write some data to table2 + bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); + long masterTable2Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable2 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + masterTable2Records++; + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), 
filesFor2 = connMaster.replicationOperations().referencedFiles( + masterTable2); + + // Restart the tserver to force a close on the WAL + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.exec(TabletServer.class); + + log.info("Restarted the tserver"); + + // Read the data -- the tserver is back up and running + for (@SuppressWarnings("unused") + Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {} + + // Wait for both tables to be replicated + log.info("Waiting for {} for {}", filesFor1, masterTable1); + connMaster.replicationOperations().drain(masterTable1, filesFor1); + + log.info("Waiting for {} for {}", filesFor2, masterTable2); + connMaster.replicationOperations().drain(masterTable2, filesFor2); + + long countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable1)); + } + + log.info("Found {} records in {}", countTable, peerTable1); + Assert.assertEquals(masterTable1Records, countTable); + + countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable2)); + } + + log.info("Found {} records in {}", countTable, peerTable2); + Assert.assertEquals(masterTable2Records, countTable); + + } finally { + peer1Cluster.stop(); + } + } + + @Test + public void dataWasReplicatedToThePeerWithoutDrain() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + - MiniAccumuloClusterImpl peerCluster = peerCfg.build(); ++ MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg); + + peerCluster.start(); + + Connector connMaster = getConnector(); + Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); + + String peerUserName = "repl"; + String peerPassword = "passwd"; + + // Create a user on the peer for replication to use + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + String peerClusterName = "peer"; + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + String masterTable = "master", peerTable = "peer"; + + connMaster.tableOperations().create(masterTable); + 
String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); + Assert.assertNotNull(masterTableId); + + connPeer.tableOperations().create(peerTable); + String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); + Assert.assertNotNull(peerTableId); + + // Give our replication user the ability to write to the table + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + cluster.exec(TabletServer.class); + + for (@SuppressWarnings("unused") + Entry<Key,Value> kv : connMaster.createScanner(masterTable, Authorizations.EMPTY)) {} + + for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { + log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + connMaster.replicationOperations().drain(masterTable, files); + + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); + while (masterIter.hasNext() && peerIter.hasNext()) { + Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next(); + Assert.assertEquals(peerEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, + masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); + Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); + } + + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); + Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); + + peerCluster.stop(); + } + + @Test + public void dataReplicatedToCorrectTableWithoutDrain() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + - MiniAccumuloClusterImpl peer1Cluster = peerCfg.build(); ++ MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg); + + peer1Cluster.start(); + + try { + Connector connMaster = getConnector(); + Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD); + + String peerClusterName = "peer"; + + String peerUserName = "repl"; + String peerPassword = "passwd"; + + // Create a user on 
the peer for replication to use + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); + + String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2"; + + connMaster.tableOperations().create(masterTable1); + String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1); + Assert.assertNotNull(masterTableId1); + + connMaster.tableOperations().create(masterTable2); + String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2); + Assert.assertNotNull(masterTableId2); + + connPeer.tableOperations().create(peerTable1); + String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); + Assert.assertNotNull(peerTableId1); + + connPeer.tableOperations().create(peerTable2); + String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); + Assert.assertNotNull(peerTableId2); + + // Give our replication user the ability to write to the tables + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1); + + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable1 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + // Write some data to table2 + bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable2 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + cluster.exec(TabletServer.class); + + // Wait until we fully replicated something + boolean fullyReplicated = false; + for (int i = 0; i < 10 
&& !fullyReplicated; i++) { + UtilWaitThread.sleep(2000); + + Scanner s = ReplicationTable.getScanner(connMaster); + WorkSection.limit(s); + for (Entry<Key,Value> entry : s) { + Status status = Status.parseFrom(entry.getValue().get()); + if (StatusUtil.isFullyReplicated(status)) { + fullyReplicated |= true; + } + } + } + + Assert.assertNotEquals(0, fullyReplicated); + + // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it + // Be cautious in how quickly we assert that the data is present on the peer + long countTable = 0l; + for (int i = 0; i < 10; i++) { + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable1)); + } + + log.info("Found {} records in {}", countTable, peerTable1); + + if (0l == countTable) { + Thread.sleep(5000); + } else { + break; + } + } + + Assert.assertTrue("Found no records in " + peerTable1 + " in the peer cluster", countTable > 0); + + // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it + // Be cautious in how quickly we assert that the data is present on the peer + for (int i = 0; i < 10; i++) { + countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable2)); + } + + log.info("Found {} records in {}", countTable, peerTable2); + + if (0l == countTable) { + Thread.sleep(5000); + } else { + break; + } + } + + Assert.assertTrue("Found no records in " + peerTable2 + " in the peer cluster", countTable > 0); + + } finally { + peer1Cluster.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/74e7103f/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java index ad9c9fc,0000000..0e7de30 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java @@@ -1,726 -1,0 +1,726 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.replication.ReplicaSystemFactory; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.StatusUtil; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.master.replication.UnorderedWorkAssigner; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(UnorderedWorkAssignerReplicationIT.class); + + private ExecutorService executor; + private int timeoutFactor = 1; + + @Before + public void createExecutor() { + executor = Executors.newSingleThreadExecutor(); + + try { + timeoutFactor = Integer.parseInt(System.getProperty("timeout.factor")); + } catch (NumberFormatException exception) { + log.warn("Could not parse timeout.factor, not increasing timeout."); + } + + Assert.assertTrue("The timeout factor must be a positive, non-zero value", timeoutFactor > 0); + } + + @After + public void stopExecutor() { + if (null != executor) { + executor.shutdownNow(); + } + } + + @Override + public int defaultTimeoutSeconds() { + return 60 * 5; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); + 
cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M"); + cfg.setProperty(Property.REPLICATION_NAME, "master"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, UnorderedWorkAssigner.class.getName()); + cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + /** + * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication + */ + private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { + // Set the same SSL information from the primary when present + Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); + if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + Map<String,String> peerSiteConfig = new HashMap<String,String>(); + peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); + Assert.assertNotNull("Keystore Path was null", keystorePath); + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); + String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); + Assert.assertNotNull("Truststore Path was null", truststorePath); + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); + + // Passwords might be stored in CredentialProvider + String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); + if (null != keystorePassword) { + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); + } + String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); + if (null != truststorePassword) { + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + } + + System.out.println("Setting site configuration for peer " + peerSiteConfig); + peerCfg.setSiteConfig(peerSiteConfig); + } + + // Use the CredentialProvider if the primary also uses one + String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProvider) { + Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); + peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); + peerCfg.setSiteConfig(peerSiteConfig); + } + } + + @Test + public void dataWasReplicatedToThePeer() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - MiniAccumuloClusterImpl peerCluster = peerCfg.build(); ++ MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg); + + peerCluster.start(); + + try { + final Connector connMaster = getConnector(); + final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); + + ReplicationTable.setOnline(connMaster); + + String peerUserName = "peer", peerPassword = "foo"; + + String peerClusterName = "peer"; + + connPeer.securityOperations().createLocalUser(peerUserName, new 
PasswordToken(peerPassword)); + + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); + + final String masterTable = "master", peerTable = "peer"; + + connMaster.tableOperations().create(masterTable); + String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); + Assert.assertNotNull(masterTableId); + + connPeer.tableOperations().create(peerTable); + String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); + Assert.assertNotNull(peerTableId); + + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); + + // Wait for zookeeper updates (configuration) to propagate + UtilWaitThread.sleep(3 * 1000); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.exec(TabletServer.class); + + log.info("TabletServer restarted"); + for (@SuppressWarnings("unused") + Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {} + log.info("TabletServer is online"); + + log.info(""); + log.info("Fetching metadata records:"); + for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } else { + log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); + } + } + + log.info(""); + log.info("Fetching replication records:"); + for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + Future<Boolean> future = executor.submit(new Callable<Boolean>() { + + @Override + public Boolean call() throws Exception { + connMaster.replicationOperations().drain(masterTable, filesNeedingReplication); + log.info("Drain completed"); + return true; + } + + }); + + long timeoutSeconds = timeoutFactor * 30; + try { + future.get(timeoutSeconds, TimeUnit.SECONDS); + } catch (TimeoutException e) { + 
future.cancel(true); + Assert.fail("Drain did not finish within " + timeoutSeconds + " seconds"); + } + + log.info("drain completed"); + + log.info(""); + log.info("Fetching metadata records:"); + for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } else { + log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); + } + } + + log.info(""); + log.info("Fetching replication records:"); + for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { + log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); + Entry<Key,Value> masterEntry = null, peerEntry = null; + while (masterIter.hasNext() && peerIter.hasNext()) { + masterEntry = masterIter.next(); + peerEntry = peerIter.next(); + Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, + masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); + Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); + } + + log.info("Last master entry: " + masterEntry); + log.info("Last peer entry: " + peerEntry); + + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); + Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); + } finally { + peerCluster.stop(); + } + } + + @Test + public void dataReplicatedToCorrectTable() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - MiniAccumuloClusterImpl peer1Cluster = peerCfg.build(); ++ MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg); + + peer1Cluster.start(); + + try { + Connector connMaster = getConnector(); + Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD); + + String peerClusterName = "peer"; + String peerUserName = "peer", peerPassword = "foo"; + + // Create local user + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); + + String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2"; + + // Create tables + connMaster.tableOperations().create(masterTable1); + String masterTableId1 = 
connMaster.tableOperations().tableIdMap().get(masterTable1); + Assert.assertNotNull(masterTableId1); + + connMaster.tableOperations().create(masterTable2); + String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2); + Assert.assertNotNull(masterTableId2); + + connPeer.tableOperations().create(peerTable1); + String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); + Assert.assertNotNull(peerTableId1); + + connPeer.tableOperations().create(peerTable2); + String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); + Assert.assertNotNull(peerTableId2); + + // Grant write permission + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1); + + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2); + + // Wait for zookeeper updates (configuration) to propogate + UtilWaitThread.sleep(3 * 1000); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); + long masterTable1Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable1 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + masterTable1Records++; + } + bw.addMutation(m); + } + + bw.close(); + + // Write some data to table2 + bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); + long masterTable2Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable2 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + masterTable2Records++; + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles( + masterTable2); + + while (!ReplicationTable.isOnline(connMaster)) { + Thread.sleep(500); + } + + // Restart the tserver to force a close on the WAL + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.exec(TabletServer.class); + + log.info("Restarted the tserver"); + + // Read the data -- the tserver is back up and running + for (@SuppressWarnings("unused") + Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {} + + // Wait for both tables to be replicated + log.info("Waiting for {} for {}", filesFor1, masterTable1); + connMaster.replicationOperations().drain(masterTable1, filesFor1); + + log.info("Waiting for {} for {}", filesFor2, masterTable2); + connMaster.replicationOperations().drain(masterTable2, filesFor2); + + long countTable = 0l; + for (int i = 0; i < 5; i++) { + countTable = 0l; + for (Entry<Key,Value> entry : 
connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable1)); + } + + log.info("Found {} records in {}", countTable, peerTable1); + + if (masterTable1Records != countTable) { + log.warn("Did not find {} expected records in {}, only found {}", masterTable1Records, peerTable1, countTable); + } + } + + Assert.assertEquals(masterTable1Records, countTable); + + for (int i = 0; i < 5; i++) { + countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable2)); + } + + log.info("Found {} records in {}", countTable, peerTable2); + + if (masterTable2Records != countTable) { + log.warn("Did not find {} expected records in {}, only found {}", masterTable2Records, peerTable2, countTable); + } + } + + Assert.assertEquals(masterTable2Records, countTable); + + } finally { + peer1Cluster.stop(); + } + } + + @Test + public void dataWasReplicatedToThePeerWithoutDrain() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - MiniAccumuloClusterImpl peerCluster = peerCfg.build(); ++ MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg); + + peerCluster.start(); + + Connector connMaster = getConnector(); + Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); + + String peerUserName = "repl"; + String peerPassword = "passwd"; + + // Create a user on the peer for replication to use + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + String peerClusterName = "peer"; + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + String masterTable = "master", peerTable = "peer"; + + connMaster.tableOperations().create(masterTable); + String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); + Assert.assertNotNull(masterTableId); + + connPeer.tableOperations().create(peerTable); + String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); + Assert.assertNotNull(peerTableId); + + // Give our replication user the ability to write to the table + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table 
id + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable); + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + cluster.exec(TabletServer.class); + + for (@SuppressWarnings("unused") + Entry<Key,Value> kv : connMaster.createScanner(masterTable, Authorizations.EMPTY)) {} + + for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { + log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); + } + + connMaster.replicationOperations().drain(masterTable, files); + + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); + while (masterIter.hasNext() && peerIter.hasNext()) { + Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next(); + Assert.assertEquals(peerEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, + masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); + Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); + } + + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); + Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); + + peerCluster.stop(); + } + + @Test + public void dataReplicatedToCorrectTableWithoutDrain() throws Exception { + MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), + ROOT_PASSWORD); + peerCfg.setNumTservers(1); + peerCfg.setInstanceName("peer"); + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - MiniAccumuloClusterImpl peer1Cluster = peerCfg.build(); ++ MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg); + + peer1Cluster.start(); + + try { + Connector connMaster = getConnector(); + Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD); + + String peerClusterName = "peer"; + + String peerUserName = "repl"; + String peerPassword = "passwd"; + + // Create a user on the peer for replication to use + connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); + + // ...peer = 
AccumuloReplicaSystem,instanceName,zookeepers + connMaster.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + peerClusterName, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); + + String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2"; + + connMaster.tableOperations().create(masterTable1); + String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1); + Assert.assertNotNull(masterTableId1); + + connMaster.tableOperations().create(masterTable2); + String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2); + Assert.assertNotNull(masterTableId2); + + connPeer.tableOperations().create(peerTable1); + String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); + Assert.assertNotNull(peerTableId1); + + connPeer.tableOperations().create(peerTable2); + String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); + Assert.assertNotNull(peerTableId2); + + // Give our replication user the ability to write to the tables + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); + connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1); + + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2); + + // Wait for zookeeper updates (configuration) to propagate + UtilWaitThread.sleep(3 * 1000); + + // Write some data to table1 + BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable1 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + // Write some data to table2 + bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(masterTable2 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to master cluster"); + + while (!ReplicationTable.isOnline(connMaster)) { + Thread.sleep(500); + } + + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + cluster.exec(TabletServer.class); + + // Wait until we fully replicated something + boolean fullyReplicated = false; + for (int i = 0; i < 10 && !fullyReplicated; i++) { + UtilWaitThread.sleep(timeoutFactor * 2000); + + Scanner s = ReplicationTable.getScanner(connMaster); + WorkSection.limit(s); + for (Entry<Key,Value> entry : s) { + Status status = Status.parseFrom(entry.getValue().get()); + if (StatusUtil.isFullyReplicated(status)) { + fullyReplicated 
|= true; + } + } + } + + Assert.assertTrue("Did not find any fully replicated work", fullyReplicated); + + long countTable = 0l; + + // Check a few times + for (int i = 0; i < 10; i++) { + countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value " + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable1)); + } + log.info("Found {} records in {}", countTable, peerTable1); + if (0 < countTable) { + break; + } + Thread.sleep(2000); + } + + Assert.assertTrue("Did not find any records in " + peerTable1 + " on peer", countTable > 0); + + for (int i = 0; i < 10; i++) { + countTable = 0l; + for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value " + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(masterTable2)); + } + + log.info("Found {} records in {}", countTable, peerTable2); + if (0 < countTable) { + break; + } + Thread.sleep(2000); + } + Assert.assertTrue("Did not find any records in " + peerTable2 + " on peer", countTable > 0); + + } finally { + peer1Cluster.stop(); + } + } +}
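For readers skimming the diff: each of the new tests repeats the same peer-configuration steps before writing data. The condensed sketch below is a hypothetical helper (configureReplicationToPeer is illustrative and not part of this commit); it assumes live Connectors to the master and peer, a running peer MiniAccumuloClusterImpl, and the imports already present in the test file.

    // Minimal sketch of the replication setup the tests repeat (illustrative only).
    // Assumes imports of Property, ReplicaSystemFactory, AccumuloReplicaSystem,
    // PasswordToken, TablePermission, Connector, MiniAccumuloClusterImpl as in the diff.
    private void configureReplicationToPeer(Connector connMaster, Connector connPeer,
        MiniAccumuloClusterImpl peerCluster, String masterTable, String peerTable) throws Exception {
      String peerClusterName = "peer", peerUser = "repl", peerPassword = "passwd";

      // Create the user the master authenticates as on the peer
      connPeer.securityOperations().createLocalUser(peerUser, new PasswordToken(peerPassword));

      // Tell the master how to reach the peer: replica system class plus "instanceName,zookeepers"
      connMaster.instanceOperations().setProperty(
          Property.REPLICATION_PEERS.getKey() + peerClusterName,
          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
              AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUser);
      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);

      // Map the master table to the peer table id, grant write on the peer, and enable replication
      String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
      connPeer.securityOperations().grantTablePermission(peerUser, peerTable, TablePermission.WRITE);
      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
    }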

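The dataWasReplicatedToThePeer test bounds the drain call with a Future so a stalled replication cannot hang the build indefinitely. A minimal sketch of that pattern follows; connMaster, masterTable, and timeoutFactor are assumed to come from the test's setup as in the diff, and the executor here is created locally for illustration.

    // Sketch of the bounded drain used in dataWasReplicatedToThePeer (illustrative).
    // Assumes java.util.concurrent imports: ExecutorService, Executors, Callable,
    // Future, TimeUnit, TimeoutException.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    final Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable);

    Future<Boolean> drain = executor.submit(new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        // Blocks until every referenced WAL has been fully replicated to the peer
        connMaster.replicationOperations().drain(masterTable, files);
        return true;
      }
    });

    long timeoutSeconds = timeoutFactor * 30;
    try {
      drain.get(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      drain.cancel(true);
      Assert.fail("Drain did not finish within " + timeoutSeconds + " seconds");
    }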