http://git-wip-us.apache.org/repos/asf/accumulo/blob/3579b674/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
new file mode 100644
index 0000000..d592716
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Integration test that starts a second ("peer") mini Accumulo cluster, configures a table on the
+ * primary cluster to replicate to a table on that peer, writes data, and prints the entries of the
+ * replication table. It only asserts that both tables received a table id; the replication entries
+ * themselves are printed for inspection, not verified.
+ */
+public class ReplicationIT extends ConfigurableMacIT {
+
+  /**
+   * Configures the primary cluster: one tablet server, {@code TSERV_WALOG_MAX_SIZE} of 1M,
+   * {@code GC_CYCLE_START} and {@code GC_CYCLE_DELAY} of 1s, and {@link RawLocalFileSystem} as the
+   * implementation behind {@code fs.file.impl}.
+   *
+   * @param cfg
+   *          configuration of the primary mini cluster
+   * @param hadoopCoreSite
+   *          Hadoop configuration that receives the {@code fs.file.impl} setting
+   */
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  /**
+   * Sets up replication from a table on the primary cluster to a table on a freshly started peer
+   * cluster, writes data to the source table, then prints the replication table ten times at one
+   * second intervals. The peer cluster is stopped in a finally block so that a failure in any step
+   * does not leave it running.
+   * <p>
+   * NOTE(review): the JUnit timeout is commented out and the wait for the replication table has no
+   * upper bound, so this test can block indefinitely if that table is never created.
+   *
+   * @throws Exception
+   *           if any cluster, table or write operation fails
+   */
+  @Test//(timeout = 60 * 1000)
+  public void test() throws Exception {
+    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(
+        createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), ROOT_PASSWORD);
+    peerCfg.setNumTservers(1);
+    peerCfg.setInstanceName("peer");
+    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10002");
+    MiniAccumuloClusterImpl peerCluster = peerCfg.build();
+
+    peerCluster.start();
+    try {
+      Connector connMaster = getConnector();
+      Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
+
+      String peerClusterName = "peer";
+
+      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + peerClusterName,
+          AccumuloReplicaSystem.class.getName() + "," + peerCluster.getInstanceName() + "," + peerCluster.getZooKeepers());
+
+      String masterTable = "master", peerTable = "peer";
+
+      connMaster.tableOperations().create(masterTable);
+      String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
+      Assert.assertNotNull(masterTableId);
+
+      connPeer.tableOperations().create(peerTable);
+      String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
+      Assert.assertNotNull(peerTableId);
+
+      // Replicate this table to the peerClusterName in a table with the peerTableId table id
+      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
+      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
+
+      // Write some data to the source table
+      writeTestData(connMaster, masterTable);
+
+      // Poll until the replication table exists before trying to scan it
+      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
+        Thread.sleep(500);
+      }
+
+      for (int i = 0; i < 10; i++) {
+        printReplicationTable(connMaster);
+        Thread.sleep(1000);
+      }
+    } finally {
+      // Always shut the peer cluster down, even when an earlier step threw
+      peerCluster.stop();
+    }
+  }
+
+  /**
+   * Writes 250 rows of 500 columns each to the given table; every column family and value is the
+   * column index as a string and the qualifier is empty. The writer is closed even if a write fails.
+   */
+  private static void writeTestData(Connector conn, String table) throws Exception {
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    try {
+      for (int rows = 0; rows < 250; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+    } finally {
+      bw.close();
+    }
+  }
+
+  /**
+   * Prints every entry of the replication table as "key status", followed by two blank lines.
+   */
+  private static void printReplicationTable(Connector conn) throws Exception {
+    Scanner s = ReplicationTable.getScanner(conn);
+    for (Entry<Key,Value> e : s) {
+      System.out.println(e.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
+    }
+
+    System.out.println();
+    System.out.println();
+  }
+
+}

Reply via email to