http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TBinaryProxyIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TBinaryProxyIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/TBinaryProxyIT.java
new file mode 100644
index 0000000..6359d1e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TBinaryProxyIT.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.BeforeClass;
+
+/**
+ *
+ */
+public class TBinaryProxyIT extends SimpleProxyBase {
+
+  @BeforeClass
+  public static void setProtocol() throws Exception {
+    SimpleProxyBase.factory = new TBinaryProtocol.Factory();
+    setUpProxy();
+  }
+
+}
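
The four protocol ITs added here (TBinaryProxyIT, TCompactProxyIT, TJsonProtocolProxyIT, TTupleProxyIT) are identical apart from the Thrift protocol factory they install, so the whole SimpleProxyBase suite runs once per wire protocol. SimpleProxyBase itself is not part of this diff; a minimal sketch of the contract the subclasses rely on, in which only the static factory field and setUpProxy() are taken from the code above and everything else is assumed:

    // Illustrative sketch only; SimpleProxyBase is not shown in this commit.
    public abstract class SimpleProxyBase {
      // Installed by each subclass in @BeforeClass before the proxy starts.
      static org.apache.thrift.protocol.TProtocolFactory factory;

      static void setUpProxy() throws Exception {
        // Assumed behavior: start the proxy server speaking the chosen
        // protocol and connect a test client with the same factory.
      }
    }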

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java
new file mode 100644
index 0000000..a92414a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.junit.BeforeClass;
+
+/**
+ *
+ */
+public class TCompactProxyIT extends SimpleProxyBase {
+
+  @BeforeClass
+  public static void setProtocol() throws Exception {
+    SimpleProxyBase.factory = new TCompactProtocol.Factory();
+    setUpProxy();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TJsonProtocolProxyIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TJsonProtocolProxyIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/TJsonProtocolProxyIT.java
new file mode 100644
index 0000000..5fcbf53
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TJsonProtocolProxyIT.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.junit.BeforeClass;
+
+/**
+ *
+ */
+public class TJsonProtocolProxyIT extends SimpleProxyBase {
+
+  @BeforeClass
+  public static void setProtocol() throws Exception {
+    SimpleProxyBase.factory = new TJSONProtocol.Factory();
+    setUpProxy();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TTupleProxyIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TTupleProxyIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/TTupleProxyIT.java
new file mode 100644
index 0000000..cdecf2c
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TTupleProxyIT.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.junit.BeforeClass;
+
+/**
+ *
+ */
+public class TTupleProxyIT extends SimpleProxyBase {
+
+  @BeforeClass
+  public static void setProtocol() throws Exception {
+    SimpleProxyBase.factory = new TTupleProtocol.Factory();
+    setUpProxy();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyClient.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyClient.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyClient.java
new file mode 100644
index 0000000..ff92795
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyClient.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslException;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.rpc.UGIAssumingTransport;
+import org.apache.accumulo.proxy.Util;
+import org.apache.accumulo.proxy.thrift.AccumuloProxy;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.Key;
+import org.apache.accumulo.proxy.thrift.ScanResult;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class TestProxyClient {
+
+  protected AccumuloProxy.Client proxy;
+  protected TTransport transport;
+
+  public TestProxyClient(String host, int port) throws TTransportException {
+    this(host, port, new TCompactProtocol.Factory());
+  }
+
+  public TestProxyClient(String host, int port, TProtocolFactory protoFactory) throws TTransportException {
+    final TSocket socket = new TSocket(host, port);
+    socket.setTimeout(600000);
+    transport = new TFramedTransport(socket);
+    final TProtocol protocol = protoFactory.getProtocol(transport);
+    proxy = new AccumuloProxy.Client(protocol);
+    transport.open();
+  }
+
+  public TestProxyClient(String host, int port, TProtocolFactory protoFactory, String proxyPrimary, UserGroupInformation ugi) throws SaslException,
+      TTransportException {
+    TSocket socket = new TSocket(host, port);
+    TSaslClientTransport saslTransport = new TSaslClientTransport("GSSAPI", null, proxyPrimary, host, Collections.singletonMap("javax.security.sasl.qop",
+        "auth"), null, socket);
+
+    transport = new UGIAssumingTransport(saslTransport, ugi);
+
+    // UGI transport will perform the doAs for us
+    transport.open();
+
+    AccumuloProxy.Client.Factory factory = new AccumuloProxy.Client.Factory();
+    final TProtocol protocol = protoFactory.getProtocol(transport);
+    proxy = factory.getClient(protocol);
+  }
+
+  public synchronized void close() {
+    if (null != transport) {
+      transport.close();
+      transport = null;
+    }
+  }
+
+  public AccumuloProxy.Client proxy() {
+    return proxy;
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    TestProxyClient tpc = new TestProxyClient("localhost", 42424);
+    String principal = "root";
+    Map<String,String> props = new TreeMap<String,String>();
+    props.put("password", "secret");
+
+    System.out.println("Logging in");
+    ByteBuffer login = tpc.proxy.login(principal, props);
+
+    System.out.println("Creating user: ");
+    if (!tpc.proxy().listLocalUsers(login).contains("testuser")) {
+      tpc.proxy().createLocalUser(login, "testuser", ByteBuffer.wrap("testpass".getBytes(UTF_8)));
+    }
+    System.out.println("UserList: " + tpc.proxy().listLocalUsers(login));
+
+    System.out.println("Listing: " + tpc.proxy().listTables(login));
+
+    System.out.println("Deleting: ");
+    String testTable = "testtableOMGOMGOMG";
+
+    System.out.println("Creating: ");
+
+    if (tpc.proxy().tableExists(login, testTable))
+      tpc.proxy().deleteTable(login, testTable);
+
+    tpc.proxy().createTable(login, testTable, true, TimeType.MILLIS);
+
+    System.out.println("Listing: " + tpc.proxy().listTables(login));
+
+    System.out.println("Writing: ");
+    Date start = new Date();
+    Date then = new Date();
+    int maxInserts = 1000000;
+    String format = "%1$05d";
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    for (int i = 0; i < maxInserts; i++) {
+      String result = String.format(format, i);
+      ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(("cf" + i).getBytes(UTF_8)), ByteBuffer.wrap(("cq" + i).getBytes(UTF_8)));
+      update.setValue(Util.randStringBuffer(10));
+      mutations.put(ByteBuffer.wrap(result.getBytes(UTF_8)), Collections.singletonList(update));
+
+      if (i % 1000 == 0) {
+        tpc.proxy().updateAndFlush(login, testTable, mutations);
+        mutations.clear();
+      }
+    }
+    tpc.proxy().updateAndFlush(login, testTable, mutations);
+    Date end = new Date();
+    System.out.println(" End of writing: " + (end.getTime() - start.getTime()));
+
+    tpc.proxy().deleteTable(login, testTable);
+    tpc.proxy().createTable(login, testTable, true, TimeType.MILLIS);
+
+    // Thread.sleep(1000);
+
+    System.out.println("Writing async: ");
+    start = new Date();
+    then = new Date();
+    mutations.clear();
+    String writer = tpc.proxy().createWriter(login, testTable, null);
+    for (int i = 0; i < maxInserts; i++) {
+      String result = String.format(format, i);
+      Key pkey = new Key();
+      pkey.setRow(result.getBytes(UTF_8));
+      ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(("cf" + i).getBytes(UTF_8)), ByteBuffer.wrap(("cq" + i).getBytes(UTF_8)));
+      update.setValue(Util.randStringBuffer(10));
+      mutations.put(ByteBuffer.wrap(result.getBytes(UTF_8)), Collections.singletonList(update));
+      tpc.proxy().update(writer, mutations);
+      mutations.clear();
+    }
+
+    end = new Date();
+    System.out.println(" End of writing: " + (end.getTime() - start.getTime()));
+    start = end;
+    System.out.println("Closing...");
+    tpc.proxy().closeWriter(writer);
+    end = new Date();
+    System.out.println(" End of closing: " + (end.getTime() - start.getTime()));
+
+    System.out.println("Reading: ");
+
+    String regex = "cf1.*";
+
+    IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
+    RegExFilter.setRegexs(is, null, regex, null, null, false);
+
+    String cookie = tpc.proxy().createScanner(login, testTable, null);
+
+    int i = 0;
+    start = new Date();
+    then = new Date();
+    boolean hasNext = true;
+
+    int k = 1000;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+
+      Date now = new Date();
+      System.out.println(i + " " + (now.getTime() - then.getTime()));
+      then = now;
+
+      i += kvList.getResultsSize();
+      // for (TKeyValue kv:kvList.getResults()) System.out.println(new Key(kv.getKey()));
+      hasNext = kvList.isMore();
+    }
+    end = new Date();
+    System.out.println("Total entries: " + i + " total time " + (end.getTime() - start.getTime()));
+  }
+}
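
The third TestProxyClient constructor above layers TSaslClientTransport (GSSAPI, qop=auth) under UGIAssumingTransport so that transport.open() runs the Kerberos handshake inside ugi.doAs(). A hedged usage sketch; the principal, keytab path, host, and the proxy primary "accumulo" are illustrative assumptions, not values from this commit:

    // Log in from a keytab (principal and path are assumed for illustration).
    UserGroupInformation.loginUserFromKeytab("client@EXAMPLE.COM", "/etc/security/client.keytab");
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

    // "accumulo" is the assumed first component of the proxy's Kerberos principal.
    TestProxyClient client = new TestProxyClient("proxy.example.com", 42424,
        new TCompactProtocol.Factory(), "accumulo", ugi);
    try {
      // Use client.proxy() as in main() above.
    } finally {
      client.close();
    }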

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java
new file mode 100644
index 0000000..ff94dd4
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
+
+public class TestProxyInstanceOperations {
+  private static final Logger log = LoggerFactory.getLogger(TestProxyInstanceOperations.class);
+
+  protected static TServer proxy;
+  protected static TestProxyClient tpc;
+  protected static ByteBuffer userpass;
+  protected static final int port = 10197;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    Properties prop = new Properties();
+    prop.setProperty("useMockInstance", "true");
+    prop.put("tokenClass", PasswordToken.class.getName());
+
+    proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
+    log.info("Waiting for proxy to start");
+    while (!proxy.isServing()) {
+      Thread.sleep(500);
+    }
+    log.info("Proxy started");
+    tpc = new TestProxyClient("localhost", port);
+    userpass = tpc.proxy.login("root", Collections.singletonMap("password", ""));
+  }
+
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    proxy.stop();
+  }
+
+  @Test
+  public void properties() throws TException {
+    tpc.proxy().setProperty(userpass, "test.systemprop", "whistletips");
+
+    assertEquals(tpc.proxy().getSystemConfiguration(userpass).get("test.systemprop"), "whistletips");
+    tpc.proxy().removeProperty(userpass, "test.systemprop");
+    assertNull(tpc.proxy().getSystemConfiguration(userpass).get("test.systemprop"));
+
+  }
+
+  @Test
+  public void testClassLoad() throws TException {
+    assertTrue(tpc.proxy().testClassLoad(userpass, "org.apache.accumulo.core.iterators.user.RegExFilter", "org.apache.accumulo.core.iterators.Filter"));
+  }
+
+}
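
TestProxyInstanceOperations above, and TestProxySecurityOperations and TestProxyTableOperations later in this commit, all boot an in-process proxy against a mock instance on a hard-coded port, poll isServing() before connecting, and log in as root with an empty password. (TestProxyReadWrite below skips the isServing() poll and connects immediately.) The shared pattern, distilled from the code above with port 10197 as the example and imports as in those classes:

    Properties prop = new Properties();
    prop.setProperty("useMockInstance", "true");
    prop.setProperty("tokenClass", PasswordToken.class.getName());

    TServer server = Proxy.createProxyServer(HostAndPort.fromParts("localhost", 10197),
        new TCompactProtocol.Factory(), prop).server;
    while (!server.isServing()) {
      Thread.sleep(500); // wait for the Thrift event loop before connecting a client
    }
    TestProxyClient client = new TestProxyClient("localhost", 10197);
    ByteBuffer login = client.proxy().login("root", Collections.singletonMap("password", ""));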

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java
new file mode 100644
index 0000000..1a75fea
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.Util;
+import org.apache.accumulo.proxy.thrift.BatchScanOptions;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.IteratorSetting;
+import org.apache.accumulo.proxy.thrift.Key;
+import org.apache.accumulo.proxy.thrift.KeyValue;
+import org.apache.accumulo.proxy.thrift.Range;
+import org.apache.accumulo.proxy.thrift.ScanColumn;
+import org.apache.accumulo.proxy.thrift.ScanOptions;
+import org.apache.accumulo.proxy.thrift.ScanResult;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class TestProxyReadWrite {
+  protected static TServer proxy;
+  protected static TestProxyClient tpc;
+  protected static ByteBuffer userpass;
+  protected static final int port = 10194;
+  protected static final String testtable = "testtable";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    Properties prop = new Properties();
+    prop.setProperty("useMockInstance", "true");
+    prop.put("tokenClass", PasswordToken.class.getName());
+
+    proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
+    tpc = new TestProxyClient("localhost", port);
+    userpass = tpc.proxy().login("root", Collections.singletonMap("password", ""));
+  }
+
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    proxy.stop();
+  }
+
+  @Before
+  public void makeTestTable() throws Exception {
+    tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
+  }
+
+  @After
+  public void deleteTestTable() throws Exception {
+    tpc.proxy().deleteTable(userpass, testtable);
+  }
+
+  private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) {
+    ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
+    update.setValue(value.getBytes());
+    mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
+  }
+
+  private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String vis, String value) {
+    ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
+    update.setValue(value.getBytes());
+    update.setColVisibility(vis.getBytes());
+    mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
+  }
+
+  /**
+   * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a range so only the entries between -Inf...5 come back (there should be
+   * 50,000)
+   */
+  @Test
+  public void readWriteBatchOneShotWithRange() throws Exception {
+    int maxInserts = 100000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$05d";
+    for (int i = 0; i < maxInserts; i++) {
+      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+        mutations.clear();
+      }
+    }
+
+    Key stop = new Key();
+    stop.setRow("5".getBytes());
+    BatchScanOptions options = new BatchScanOptions();
+    options.ranges = Collections.singletonList(new Range(null, false, stop, false));
+    String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      i += kvList.getResultsSize();
+      hasNext = kvList.isMore();
+    }
+    assertEquals(i, 50000);
+  }
+
+  /**
+   * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily so only the entries with specified column family come back
+   * (there should be 50,000)
+   */
+  @Test
+  public void readWriteBatchOneShotWithColumnFamilyOnly() throws Exception {
+    int maxInserts = 100000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$05d";
+    for (int i = 0; i < maxInserts; i++) {
+
+      addMutation(mutations, String.format(format, i), "cf" + (i % 2), "cq" + (i % 2), Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+        mutations.clear();
+      }
+    }
+
+    BatchScanOptions options = new BatchScanOptions();
+
+    ScanColumn sc = new ScanColumn();
+    sc.colFamily = ByteBuffer.wrap("cf0".getBytes());
+
+    options.columns = Collections.singletonList(sc);
+    String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      i += kvList.getResultsSize();
+      hasNext = kvList.isMore();
+    }
+    assertEquals(i, 50000);
+  }
+
+  /**
+   * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily + columnQualifier so only the entries with specified column
+   * come back (there should be 50,000)
+   */
+  @Test
+  public void readWriteBatchOneShotWithFullColumn() throws Exception {
+    int maxInserts = 100000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$05d";
+    for (int i = 0; i < maxInserts; i++) {
+
+      addMutation(mutations, String.format(format, i), "cf" + (i % 2), "cq" + (i % 2), Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+        mutations.clear();
+      }
+    }
+
+    BatchScanOptions options = new BatchScanOptions();
+
+    ScanColumn sc = new ScanColumn();
+    sc.colFamily = ByteBuffer.wrap("cf0".getBytes());
+    sc.colQualifier = ByteBuffer.wrap("cq0".getBytes());
+
+    options.columns = Collections.singletonList(sc);
+    String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      i += kvList.getResultsSize();
+      hasNext = kvList.isMore();
+    }
+    assertEquals(i, 50000);
+  }
+
+  /**
+   * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Filter the results so only the even numbers come back.
+   */
+  @Test
+  public void readWriteBatchOneShotWithFilterIterator() throws Exception {
+    int maxInserts = 10000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$05d";
+    for (int i = 0; i < maxInserts; i++) {
+      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+        mutations.clear();
+      }
+
+    }
+
+    String regex = ".*[02468]";
+
+    org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+    RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+    IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+    ScanOptions opts = new ScanOptions();
+    opts.iterators = Collections.singletonList(pis);
+    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      for (KeyValue kv : kvList.getResults()) {
+        assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+
+        i += 2;
+      }
+      hasNext = kvList.isMore();
+    }
+  }
+
+  @Test
+  public void readWriteOneShotWithRange() throws Exception {
+    int maxInserts = 100000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$05d";
+    for (int i = 0; i < maxInserts; i++) {
+      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+        mutations.clear();
+      }
+    }
+
+    Key stop = new Key();
+    stop.setRow("5".getBytes());
+    ScanOptions opts = new ScanOptions();
+    opts.range = new Range(null, false, stop, false);
+    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      i += kvList.getResultsSize();
+      hasNext = kvList.isMore();
+    }
+    assertEquals(i, 50000);
+  }
+
+  /**
+   * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Filter the results so only the even numbers come back.
+   */
+  @Test
+  public void readWriteOneShotWithFilterIterator() throws Exception {
+    int maxInserts = 10000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$05d";
+    for (int i = 0; i < maxInserts; i++) {
+      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+
+        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+        mutations.clear();
+
+      }
+
+    }
+
+    String regex = ".*[02468]";
+
+    org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+    RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+    IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+    ScanOptions opts = new ScanOptions();
+    opts.iterators = Collections.singletonList(pis);
+    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      for (KeyValue kv : kvList.getResults()) {
+        assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+
+        i += 2;
+      }
+      hasNext = kvList.isMore();
+    }
+  }
+
+  // @Test
+  // This test takes kind of a long time. Enable it if you think you may have memory issues.
+  public void manyWritesAndReads() throws Exception {
+    int maxInserts = 1000000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$06d";
+    String writer = tpc.proxy().createWriter(userpass, testtable, null);
+    for (int i = 0; i < maxInserts; i++) {
+      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+
+        tpc.proxy().update(writer, mutations);
+        mutations.clear();
+
+      }
+
+    }
+
+    tpc.proxy().flush(writer);
+    tpc.proxy().closeWriter(writer);
+
+    String cookie = tpc.proxy().createScanner(userpass, testtable, null);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      for (KeyValue kv : kvList.getResults()) {
+        assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+        i++;
+      }
+      hasNext = kvList.isMore();
+      if (hasNext)
+        assertEquals(k, kvList.getResults().size());
+    }
+    assertEquals(maxInserts, i);
+  }
+
+  @Test
+  public void asynchReadWrite() throws Exception {
+    int maxInserts = 10000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$05d";
+    String writer = tpc.proxy().createWriter(userpass, testtable, null);
+    for (int i = 0; i < maxInserts; i++) {
+      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+        tpc.proxy().update(writer, mutations);
+        mutations.clear();
+      }
+    }
+
+    tpc.proxy().flush(writer);
+    tpc.proxy().closeWriter(writer);
+
+    String regex = ".*[02468]";
+
+    org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+    RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+    IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+    ScanOptions opts = new ScanOptions();
+    opts.iterators = Collections.singletonList(pis);
+    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    int numRead = 0;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      for (KeyValue kv : kvList.getResults()) {
+        assertEquals(i, Integer.parseInt(new String(kv.getKey().getRow())));
+        numRead++;
+        i += 2;
+      }
+      hasNext = kvList.isMore();
+    }
+    assertEquals(maxInserts / 2, numRead);
+  }
+
+  @Test
+  public void testVisibility() throws Exception {
+
+    Set<ByteBuffer> auths = new HashSet<ByteBuffer>();
+    auths.add(ByteBuffer.wrap("even".getBytes()));
+    tpc.proxy().changeUserAuthorizations(userpass, "root", auths);
+
+    int maxInserts = 10000;
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    String format = "%1$05d";
+    String writer = tpc.proxy().createWriter(userpass, testtable, null);
+    for (int i = 0; i < maxInserts; i++) {
+      if (i % 2 == 0)
+        addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "even", Util.randString(10));
+      else
+        addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "odd", Util.randString(10));
+
+      if (i % 1000 == 0 || i == maxInserts - 1) {
+        tpc.proxy().update(writer, mutations);
+        mutations.clear();
+      }
+    }
+
+    tpc.proxy().flush(writer);
+    tpc.proxy().closeWriter(writer);
+    ScanOptions opts = new ScanOptions();
+    opts.authorizations = auths;
+    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+    int i = 0;
+    boolean hasNext = true;
+
+    int k = 1000;
+    int numRead = 0;
+    while (hasNext) {
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
+      for (KeyValue kv : kvList.getResults()) {
+        assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+        i += 2;
+        numRead++;
+      }
+      hasNext = kvList.isMore();
+
+    }
+    assertEquals(maxInserts / 2, numRead);
+  }
+
+}
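
Every scan in TestProxyReadWrite drains results through the same nextK paging idiom, distilled here for reference (tpc and cookie are the names used in the tests above; the page size of 1000 is the tests' choice, not a protocol requirement):

    int total = 0;
    boolean more = true;
    while (more) {
      ScanResult batch = tpc.proxy().nextK(cookie, 1000); // fetch up to k entries per round trip
      total += batch.getResultsSize();
      more = batch.isMore(); // the server reports whether another page remains
    }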

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java
new file mode 100644
index 0000000..eda38e5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.thrift.SystemPermission;
+import org.apache.accumulo.proxy.thrift.TablePermission;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class TestProxySecurityOperations {
+  protected static TServer proxy;
+  protected static TestProxyClient tpc;
+  protected static ByteBuffer userpass;
+  protected static final int port = 10196;
+  protected static final String testtable = "testtable";
+  protected static final String testuser = "VonJines";
+  protected static final ByteBuffer testpw = ByteBuffer.wrap("fiveones".getBytes());
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    Properties prop = new Properties();
+    prop.setProperty("useMockInstance", "true");
+    prop.put("tokenClass", PasswordToken.class.getName());
+
+    proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
+    while (!proxy.isServing()) {
+      Thread.sleep(500);
+    }
+    tpc = new TestProxyClient("localhost", port);
+    userpass = tpc.proxy().login("root", Collections.singletonMap("password", ""));
+  }
+
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    proxy.stop();
+  }
+
+  @Before
+  public void makeTestTableAndUser() throws Exception {
+    tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
+    tpc.proxy().createLocalUser(userpass, testuser, testpw);
+  }
+
+  @After
+  public void deleteTestTable() throws Exception {
+    tpc.proxy().deleteTable(userpass, testtable);
+    tpc.proxy().dropLocalUser(userpass, testuser);
+  }
+
+  @Test
+  public void create() throws TException {
+    tpc.proxy().createLocalUser(userpass, testuser + "2", testpw);
+    assertTrue(tpc.proxy().listLocalUsers(userpass).contains(testuser + "2"));
+    tpc.proxy().dropLocalUser(userpass, testuser + "2");
+    assertTrue(!tpc.proxy().listLocalUsers(userpass).contains(testuser + "2"));
+  }
+
+  @Test
+  public void authenticate() throws TException {
+    assertTrue(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(testpw)));
+    assertFalse(tpc.proxy().authenticateUser(userpass, "EvilUser", bb2pp(testpw)));
+
+    tpc.proxy().changeLocalUserPassword(userpass, testuser, ByteBuffer.wrap("newpass".getBytes()));
+    assertFalse(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(testpw)));
+    assertTrue(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(ByteBuffer.wrap("newpass".getBytes()))));
+
+  }
+
+  @Test
+  public void tablePermissions() throws TException {
+    tpc.proxy().grantTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE);
+    assertTrue(tpc.proxy().hasTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE));
+
+    tpc.proxy().revokeTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE);
+    assertFalse(tpc.proxy().hasTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE));
+
+  }
+
+  @Test
+  public void systemPermissions() throws TException {
+    tpc.proxy().grantSystemPermission(userpass, testuser, SystemPermission.ALTER_USER);
+    assertTrue(tpc.proxy().hasSystemPermission(userpass, testuser, SystemPermission.ALTER_USER));
+
+    tpc.proxy().revokeSystemPermission(userpass, testuser, SystemPermission.ALTER_USER);
+    assertFalse(tpc.proxy().hasSystemPermission(userpass, testuser, SystemPermission.ALTER_USER));
+
+  }
+
+  @Test
+  public void auths() throws TException {
+    HashSet<ByteBuffer> newauths = new HashSet<ByteBuffer>();
+    newauths.add(ByteBuffer.wrap("BBR".getBytes()));
+    newauths.add(ByteBuffer.wrap("Barney".getBytes()));
+    tpc.proxy().changeUserAuthorizations(userpass, testuser, newauths);
+    List<ByteBuffer> actualauths = tpc.proxy().getUserAuthorizations(userpass, testuser);
+    assertEquals(actualauths.size(), newauths.size());
+
+    for (ByteBuffer auth : actualauths) {
+      assertTrue(newauths.contains(auth));
+    }
+  }
+
+  private Map<String,String> bb2pp(ByteBuffer cf) {
+    Map<String,String> toRet = new TreeMap<String,String>();
+    toRet.put("password", ByteBufferUtil.toString(cf));
+    return toRet;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java
new file mode 100644
index 0000000..e8d7b1e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class TestProxyTableOperations {
+
+  protected static TServer proxy;
+  protected static TestProxyClient tpc;
+  protected static ByteBuffer userpass;
+  protected static final int port = 10195;
+  protected static final String testtable = "testtable";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    Properties prop = new Properties();
+    prop.setProperty("useMockInstance", "true");
+    prop.put("tokenClass", PasswordToken.class.getName());
+
+    proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
+    while (!proxy.isServing()) {
+      Thread.sleep(500);
+    }
+    tpc = new TestProxyClient("localhost", port);
+    userpass = tpc.proxy().login("root", Collections.singletonMap("password", ""));
+  }
+
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    proxy.stop();
+  }
+
+  @Before
+  public void makeTestTable() throws Exception {
+    tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
+  }
+
+  @After
+  public void deleteTestTable() throws Exception {
+    tpc.proxy().deleteTable(userpass, testtable);
+  }
+
+  @Test
+  public void createExistsDelete() throws TException {
+    assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
+    tpc.proxy().createTable(userpass, "testtable2", true, TimeType.MILLIS);
+    assertTrue(tpc.proxy().tableExists(userpass, "testtable2"));
+    tpc.proxy().deleteTable(userpass, "testtable2");
+    assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
+  }
+
+  @Test
+  public void listRename() throws TException {
+    assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
+    tpc.proxy().renameTable(userpass, testtable, "testtable2");
+    assertTrue(tpc.proxy().tableExists(userpass, "testtable2"));
+    tpc.proxy().renameTable(userpass, "testtable2", testtable);
+    assertTrue(tpc.proxy().listTables(userpass).contains("testtable"));
+
+  }
+
+  // This test does not yet function because the backing Mock instance does not yet support merging
+  @Test
+  public void merge() throws TException {
+    Set<ByteBuffer> splits = new HashSet<ByteBuffer>();
+    splits.add(ByteBuffer.wrap("a".getBytes()));
+    splits.add(ByteBuffer.wrap("c".getBytes()));
+    splits.add(ByteBuffer.wrap("z".getBytes()));
+    tpc.proxy().addSplits(userpass, testtable, splits);
+
+    tpc.proxy().mergeTablets(userpass, testtable, ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap("d".getBytes()));
+
+    splits.remove(ByteBuffer.wrap("c".getBytes()));
+
+    List<ByteBuffer> tableSplits = tpc.proxy().listSplits(userpass, testtable, 10);
+
+    for (ByteBuffer split : tableSplits)
+      assertTrue(splits.contains(split));
+    assertTrue(tableSplits.size() == splits.size());
+
+  }
+
+  @Test
+  public void splits() throws TException {
+    Set<ByteBuffer> splits = new HashSet<ByteBuffer>();
+    splits.add(ByteBuffer.wrap("a".getBytes()));
+    splits.add(ByteBuffer.wrap("b".getBytes()));
+    splits.add(ByteBuffer.wrap("z".getBytes()));
+    tpc.proxy().addSplits(userpass, testtable, splits);
+
+    List<ByteBuffer> tableSplits = tpc.proxy().listSplits(userpass, testtable, 10);
+
+    for (ByteBuffer split : tableSplits)
+      assertTrue(splits.contains(split));
+    assertTrue(tableSplits.size() == splits.size());
+  }
+
+  @Test
+  public void constraints() throws TException {
+    int cid = tpc.proxy().addConstraint(userpass, testtable, "org.apache.accumulo.TestConstraint");
+    Map<String,Integer> constraints = tpc.proxy().listConstraints(userpass, testtable);
+    assertEquals((int) constraints.get("org.apache.accumulo.TestConstraint"), cid);
+    tpc.proxy().removeConstraint(userpass, testtable, cid);
+    constraints = tpc.proxy().listConstraints(userpass, testtable);
+    assertNull(constraints.get("org.apache.accumulo.TestConstraint"));
+  }
+
+  @Test
+  public void localityGroups() throws TException {
+    Map<String,Set<String>> groups = new HashMap<String,Set<String>>();
+    Set<String> group1 = new HashSet<String>();
+    group1.add("cf1");
+    groups.put("group1", group1);
+    Set<String> group2 = new HashSet<String>();
+    group2.add("cf2");
+    group2.add("cf3");
+    groups.put("group2", group2);
+    tpc.proxy().setLocalityGroups(userpass, testtable, groups);
+
+    Map<String,Set<String>> actualGroups = tpc.proxy().getLocalityGroups(userpass, testtable);
+
+    assertEquals(groups.size(), actualGroups.size());
+    for (String groupName : groups.keySet()) {
+      assertTrue(actualGroups.containsKey(groupName));
+      assertEquals(groups.get(groupName).size(), actualGroups.get(groupName).size());
+      for (String cf : groups.get(groupName)) {
+        assertTrue(actualGroups.get(groupName).contains(cf));
+      }
+    }
+  }
+
+  @Test
+  public void tableProperties() throws TException {
+    tpc.proxy().setTableProperty(userpass, testtable, "test.property1", "wharrrgarbl");
+    assertEquals(tpc.proxy().getTableProperties(userpass, testtable).get("test.property1"), "wharrrgarbl");
+    tpc.proxy().removeTableProperty(userpass, testtable, "test.property1");
+    assertNull(tpc.proxy().getTableProperties(userpass, testtable).get("test.property1"));
+  }
+
+  private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) {
+    ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
+    update.setValue(value.getBytes());
+    mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
+  }
+  }
+
+  @Test
+  public void tableOperationsRowMethods() throws TException {
+    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+    for (int i = 0; i < 10; i++) {
+      addMutation(mutations, "" + i, "cf", "cq", "");
+    }
+    tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+
+    assertEquals(tpc.proxy().getMaxRow(userpass, testtable, null, null, true, null, true), ByteBuffer.wrap("9".getBytes()));
+
+    tpc.proxy().deleteRows(userpass, testtable, ByteBuffer.wrap("51".getBytes()), ByteBuffer.wrap("99".getBytes()));
+    assertEquals(tpc.proxy().getMaxRow(userpass, testtable, null, null, true, null, true), ByteBuffer.wrap("5".getBytes()));
+  }
+
+}
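
The assertions at the end of tableOperationsRowMethods rely on rows comparing lexicographically as byte strings; per Accumulo's deleteRows contract, the deleted range is exclusive of the start row and inclusive of the end row:

    // Rows are the single-character strings "0".."9". deleteRows(start, end)
    // removes rows r with start < r <= end, compared lexicographically:
    //   "51" < "6" < "7" < "8" < "9" <= "99"
    // so "6".."9" are deleted, "0".."5" remain, and getMaxRow returns "5".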

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
new file mode 100644
index 0000000..3a1d413
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.LongCombiner.Type;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.minicluster.impl.ZooKeeperBindException;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+/**
+ *
+ */
+public class CyclicReplicationIT {
+  private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class);
+
+  @Rule
+  public Timeout getTimeout() {
+    int scalingFactor = 1;
+    try {
+      scalingFactor = Integer.parseInt(System.getProperty("timeout.factor"));
+    } catch (NumberFormatException exception) {
+      log.warn("Could not parse timeout.factor, not scaling timeout");
+    }
+
+    return new Timeout(scalingFactor * 5 * 60 * 1000);
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private File createTestDir(String name) {
+    File baseDir = new File(System.getProperty("user.dir") + "/target/mini-tests");
+    assertTrue(baseDir.mkdirs() || baseDir.isDirectory());
+    File testDir = new File(baseDir, this.getClass().getName() + "_" + testName.getMethodName() + "_" + name);
+    FileUtils.deleteQuietly(testDir);
+    assertTrue(testDir.mkdir());
+    return testDir;
+  }
+
+  private void setCoreSite(MiniAccumuloClusterImpl cluster) throws Exception {
+    File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml");
+    if (csFile.exists())
+      throw new RuntimeException(csFile + " already exists");
+
+    Configuration coreSite = new Configuration(false);
+    coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    OutputStream out = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "core-site.xml")));
+    coreSite.writeXml(out);
+    out.close();
+  }
+
+  /**
+   * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication
+   */
+  private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) {
+    // Set the same SSL information from the primary when present
+    Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig();
+    if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
+      Map<String,String> peerSiteConfig = new HashMap<String,String>();
+      peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
+      String keystorePath = 
primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
+      Assert.assertNotNull("Keystore Path was null", keystorePath);
+      peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), 
keystorePath);
+      String truststorePath = 
primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
+      Assert.assertNotNull("Truststore Path was null", truststorePath);
+      peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), 
truststorePath);
+
+      // Passwords might be stored in CredentialProvider
+      String keystorePassword = 
primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
+      if (null != keystorePassword) {
+        peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), 
keystorePassword);
+      }
+      String truststorePassword = 
primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
+      if (null != truststorePassword) {
+        peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), 
truststorePassword);
+      }
+
+      System.out.println("Setting site configuration for peer " + 
peerSiteConfig);
+      peerCfg.setSiteConfig(peerSiteConfig);
+    }
+
+    // Use the CredentialProvider if the primary also uses one
+    String credProvider = 
primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
+    if (null != credProvider) {
+      Map<String,String> peerSiteConfig = peerCfg.getSiteConfig();
+      
peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(),
 credProvider);
+      peerCfg.setSiteConfig(peerSiteConfig);
+    }
+  }
+
+  @Test
+  public void dataIsNotOverReplicated() throws Exception {
+    File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2");
+    String password = "password";
+
+    MiniAccumuloConfigImpl master1Cfg;
+    MiniAccumuloClusterImpl master1Cluster;
+    while (true) {
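+      // MAC picks a ZooKeeper port that may already be taken, so retry until it binds (see the ZooKeeperBindException catch below)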
+      master1Cfg = new MiniAccumuloConfigImpl(master1Dir, password);
+      master1Cfg.setNumTservers(1);
+      master1Cfg.setInstanceName("master1");
+
+      // Set up SSL if needed
+      ConfigurableMacBase.configureForEnvironment(master1Cfg, this.getClass(), ConfigurableMacBase.getSslDir(master1Dir));
+
+      master1Cfg.setProperty(Property.REPLICATION_NAME, master1Cfg.getInstanceName());
+      master1Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+      master1Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m");
+      master1Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+      master1Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+      master1Cluster = new MiniAccumuloClusterImpl(master1Cfg);
+      setCoreSite(master1Cluster);
+
+      try {
+        master1Cluster.start();
+        break;
+      } catch (ZooKeeperBindException e) {
+        log.warn("Failed to start ZooKeeper on " + master1Cfg.getZooKeeperPort() + ", will retry");
+      }
+    }
+
+    MiniAccumuloConfigImpl master2Cfg;
+    MiniAccumuloClusterImpl master2Cluster;
+    while (true) {
+      master2Cfg = new MiniAccumuloConfigImpl(master2Dir, password);
+      master2Cfg.setNumTservers(1);
+      master2Cfg.setInstanceName("master2");
+
+      // Set up SSL if needed. Need to share the same SSL truststore as master1
+      this.updatePeerConfigFromPrimary(master1Cfg, master2Cfg);
+
+      master2Cfg.setProperty(Property.REPLICATION_NAME, master2Cfg.getInstanceName());
+      master2Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+      master2Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m");
+      master2Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+      master2Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+      master2Cluster = new MiniAccumuloClusterImpl(master2Cfg);
+      setCoreSite(master2Cluster);
+
+      try {
+        master2Cluster.start();
+        break;
+      } catch (ZooKeeperBindException e) {
+        log.warn("Failed to start ZooKeeper on " + master2Cfg.getZooKeeperPort() + ", will retry");
+      }
+    }
+
+    try {
+      Connector connMaster1 = master1Cluster.getConnector("root", new PasswordToken(password)), connMaster2 = master2Cluster.getConnector("root",
+          new PasswordToken(password));
+
+      String master1UserName = "master1", master1Password = "foo";
+      String master2UserName = "master2", master2Password = "bar";
+      String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName();
+
+      connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password));
+      connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password));
+
+      // Configure the credentials we should use to authenticate ourselves to the peer for replication
+      connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master2Cluster.getInstanceName(), master2UserName);
+      connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master2Cluster.getInstanceName(), master2Password);
+
+      connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master1Cluster.getInstanceName(), master1UserName);
+      connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master1Cluster.getInstanceName(), master1Password);
+
+      connMaster1.instanceOperations().setProperty(
+          Property.REPLICATION_PEERS.getKey() + master2Cluster.getInstanceName(),
+          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+              AccumuloReplicaSystem.buildConfiguration(master2Cluster.getInstanceName(), master2Cluster.getZooKeepers())));
+
+      connMaster2.instanceOperations().setProperty(
+          Property.REPLICATION_PEERS.getKey() + master1Cluster.getInstanceName(),
+          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+              AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers())));
+
+      connMaster1.tableOperations().create(master1Table, new NewTableConfiguration().withoutDefaultIterators());
+      String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table);
+      Assert.assertNotNull(master1TableId);
+
+      connMaster2.tableOperations().create(master2Table, new NewTableConfiguration().withoutDefaultIterators());
+      String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table);
+      Assert.assertNotNull(master2TableId);
+
+      // Replicate master1 in the master1 cluster to master2 in the master2 cluster
+      connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), "true");
+      connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION_TARGET.getKey() + master2Cluster.getInstanceName(), master2TableId);
+
+      // Replicate master2 in the master2 cluster to master1 in the master1 cluster
+      connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), "true");
+      connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION_TARGET.getKey() + master1Cluster.getInstanceName(), master1TableId);
+
+      // Give our replication user the ability to write to the respective table
+      connMaster1.securityOperations().grantTablePermission(master1UserName, master1Table, TablePermission.WRITE);
+      connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, TablePermission.WRITE);
+
+      IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class);
+      SummingCombiner.setEncodingType(summingCombiner, Type.STRING);
+      SummingCombiner.setCombineAllColumns(summingCombiner, true);
+
+      // Set a combiner on both instances that will sum multiple values
+      // We can use this to verify that the mutation was not sent multiple times
+      connMaster1.tableOperations().attachIterator(master1Table, summingCombiner);
+      connMaster2.tableOperations().attachIterator(master2Table, summingCombiner);
+
+      // Write a single entry
+      BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig());
+      Mutation m = new Mutation("row");
+      m.put("count", "", "1");
+      bw.addMutation(m);
+      bw.close();
+
+      Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table);
+
+      log.info("Found files that need replication from master1: {}", files);
+
+      // Kill and restart the tserver to close the WAL on master1
+      for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+        master1Cluster.killProcess(ServerType.TABLET_SERVER, proc);
+      }
+
+      master1Cluster.exec(TabletServer.class);
+
+      log.info("Restarted tserver on master1");
+
+      // Try to avoid ACCUMULO-2964
+      Thread.sleep(1000);
+
+      // Sanity check that the element is there on master1
+      Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY);
+      Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+      Assert.assertEquals("1", entry.getValue().toString());
+
+      // Wait for this table to replicate
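+      // (drain blocks until the files in the given set have been fully replicated to the peer)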
+      connMaster1.replicationOperations().drain(master1Table, files);
+
+      Thread.sleep(5000);
+
+      // Check that the element made it to master2 only once
+      s = connMaster2.createScanner(master2Table, Authorizations.EMPTY);
+      entry = Iterables.getOnlyElement(s);
+      Assert.assertEquals("1", entry.getValue().toString());
+
+      // Get the files whose replication from master2 back to master1 we will wait on
+      files = connMaster2.replicationOperations().referencedFiles(master2Table);
+
+      // Kill and restart the tserver to close the WAL on master2
+      for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+        master2Cluster.killProcess(ServerType.TABLET_SERVER, proc);
+      }
+
+      master2Cluster.exec(TabletServer.class);
+
+      // Try to avoid ACCUMULO-2964
+      Thread.sleep(1000);
+
+      // Check again, after the restart, that the element is still on master2 only once
+      s = connMaster2.createScanner(master2Table, Authorizations.EMPTY);
+      entry = Iterables.getOnlyElement(s);
+      Assert.assertEquals("1", entry.getValue().toString());
+
+      connMaster2.replicationOperations().drain(master2Table, files);
+
+      Thread.sleep(5000);
+
+      // Verify that the entry wasn't sent back to master1
+      s = connMaster1.createScanner(master1Table, Authorizations.EMPTY);
+      entry = Iterables.getOnlyElement(s);
+      Assert.assertEquals("1", entry.getValue().toString());
+    } finally {
+      master1Cluster.stop();
+      master2Cluster.stop();
+    }
+  }
+
+}
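
The SummingCombiner attached to both tables above is what turns this test into an over-replication check: the value "1" is written exactly once, so if replication ever delivers the mutation a second time (or cycles it back around), the combiner sums the duplicates and a scan returns "2" or more. A minimal sketch of that detection pattern, using the same imports as CyclicReplicationIT and assuming an existing Connector conn and table name table:

    // Sum every column's values; values are encoded as decimal strings
    IteratorSetting combiner = new IteratorSetting(50, SummingCombiner.class);
    SummingCombiner.setEncodingType(combiner, Type.STRING);
    SummingCombiner.setCombineAllColumns(combiner, true);
    conn.tableOperations().attachIterator(table, combiner);

    // Write "1" once; a replayed delivery would make a later scan read "2"
    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
    Mutation m = new Mutation("row");
    m.put("count", "", "1");
    bw.addMutation(m);
    bw.close();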

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
 
b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
new file mode 100644
index 0000000..ab142d0
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ClientExecReturn;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * ACCUMULO-3302 series of tests which ensure that a WAL is not prematurely closed when a TServer may still continue to use it. Checking that no tablet references a
+ * WAL is insufficient to determine if a WAL will never be used in the future.
+ */
+public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacBase {
+  private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServersIT.class);
+
+  private static final int GC_PERIOD_SECONDS = 1;
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
+    // Wait longer to try to let the replication table come online before a cycle runs
+    cfg.setProperty(Property.GC_CYCLE_START, "10s");
+    cfg.setProperty(Property.REPLICATION_NAME, "master");
+    // Set really long delays for the master to do stuff for replication. We don't need
+    // it to be doing anything, so just let it sleep
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s");
+    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s");
+    cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s");
+    // Pull down the maximum size of the WAL so we can test close()'ing it.
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  /**
+   * Fetch all of the WALs tracked by the WalStateManager; the table name is only used to sanity-check that the table exists
+   */
+  private Set<String> getWalsForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Instance i = conn.getInstance();
+    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
+    WalStateManager wals = new WalStateManager(i, zk);
+
+    Set<String> result = new HashSet<String>();
+    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      log.debug("Reading WALs: {}={}", entry.getKey(), entry.getValue());
+      result.add(entry.getKey().toString());
+    }
+    return result;
+  }
+
+  /**
+   * Fetch all of the rfiles referenced by tablets in the metadata table for this table
+   */
+  private Set<String> getFilesForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    Range r = MetadataSchema.TabletsSection.getRange(tableId);
+    s.setRange(r);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+    Set<String> rfiles = new HashSet<String>();
+    for (Entry<Key,Value> entry : s) {
+      log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+      // The column qualifier is a path like uri://path/to/rfile
+      String cq = entry.getKey().getColumnQualifier().toString();
+      String path = new Path(cq).toString();
+      log.debug("Normalize path to rfile: {}", path);
+      rfiles.add(path);
+    }
+
+    return rfiles;
+  }
+
+  /**
+   * Get the replication status messages for the given table that exist in the metadata table (~repl entries)
+   */
+  private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    Range r = MetadataSchema.ReplicationSection.getRange();
+    s.setRange(r);
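+    // In the ~repl section the row encodes the WAL path and the column qualifier is the table id, so this scan only returns statuses for our table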
+    s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId));
+
+    Map<String,Status> fileToStatus = new HashMap<String,Status>();
+    for (Entry<Key,Value> entry : s) {
+      Text file = new Text();
+      MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
+      Status status = Status.parseFrom(entry.getValue().get());
+      log.info("Got status for {}: {}", file, ProtobufUtil.toString(status));
+      fileToStatus.put(file.toString(), status);
+    }
+
+    return fileToStatus;
+  }
+
+  @Test
+  public void testActiveWalPrecludesClosing() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+
+    // Bring the replication table online first and foremost
+    ReplicationTable.setOnline(conn);
+
+    log.info("Creating {}", table);
+    conn.tableOperations().create(table);
+
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+    log.info("Writing a few mutations to the table");
+
+    BatchWriter bw = conn.createBatchWriter(table, null);
+
+    byte[] empty = new byte[0];
+    for (int i = 0; i < 5; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, empty);
+      bw.addMutation(m);
+    }
+
+    log.info("Flushing mutations to the server");
+    bw.flush();
+
+    log.info("Checking that only two WALs are recorded for this table");
+
+    Set<String> wals = getWalsForTable(table);
+    Assert.assertEquals("Expected to only find two WALs for the table", 2, wals.size());
+
+    log.info("Compacting the table which will remove all WALs from the tablets");
+
+    // Flush our test table to remove the WAL references in it
+    conn.tableOperations().flush(table, null, null, true);
+    // Flush the metadata table too because it will have a reference to the WAL
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+
+    log.info("Fetching replication statuses from metadata table");
+
+    Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
+
+    Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
+
+    String walName = fileToStatus.keySet().iterator().next();
+    wals.retainAll(fileToStatus.keySet());
+    Assert.assertEquals(1, wals.size());
+
+    Status status = fileToStatus.get(walName);
+
+    Assert.assertFalse("Expected Status for file to not be closed", status.getClosed());
+
+    Set<String> filesForTable = getFilesForTable(table);
+    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
+    log.info("Files for table before MajC: {}", filesForTable);
+
+    // Issue a MajC to roll a new file in HDFS
+    conn.tableOperations().compact(table, null, null, false, true);
+
+    Set<String> filesForTableAfterCompaction = getFilesForTable(table);
+
+    log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
+
+    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size());
+    Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable);
+
+    // Use the rfile which was just replaced by the MajC to determine when the GC has run
+    Path fileToBeDeleted = new Path(filesForTable.iterator().next());
+    FileSystem fs = getCluster().getFileSystem();
+
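+    // Poll until the obsolete rfile disappears; its deletion proves that at least one full GC cycle has completed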
+    boolean fileExists = fs.exists(fileToBeDeleted);
+    while (fileExists) {
+      log.info("File which should get deleted still exists: {}", fileToBeDeleted);
+      Thread.sleep(2000);
+      fileExists = fs.exists(fileToBeDeleted);
+    }
+
+    Map<String,Status> fileToStatusAfterCompaction = getMetadataStatusForTable(table);
+    Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterCompaction, 1, fileToStatusAfterCompaction.size());
+
+    Assert.assertEquals("Status before and after compaction should be identical", fileToStatus, fileToStatusAfterCompaction);
+  }
+
+  @Test(timeout = 2 * 60 * 1000)
+  public void testUnreferencedWalInTserverIsClosed() throws Exception {
+    final String[] names = getUniqueNames(2);
+    // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver
+    final String table = names[0], otherTable = names[1];
+    final Connector conn = getConnector();
+
+    // Bring the replication table online first and foremost
+    ReplicationTable.setOnline(conn);
+
+    log.info("Creating {}", table);
+    conn.tableOperations().create(table);
+
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+    log.info("Writing a few mutations to the table");
+
+    BatchWriter bw = conn.createBatchWriter(table, null);
+
+    byte[] empty = new byte[0];
+    for (int i = 0; i < 5; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, empty);
+      bw.addMutation(m);
+    }
+
+    log.info("Flushing mutations to the server");
+    bw.close();
+
+    log.info("Checking that only two WALs are recorded for this table");
+
+    Set<String> wals = getWalsForTable(table);
+    Assert.assertEquals("Expected to only find two WALs for the table", 2, wals.size());
+
+    log.info("Compacting the table which will remove all WALs from the tablets");
+
+    // Flush our test table to remove the WAL references in it
+    conn.tableOperations().flush(table, null, null, true);
+    // Flush the metadata table too because it will have a reference to the WAL
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+
+    log.info("Fetching replication statuses from metadata table");
+
+    Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
+
+    Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
+
+    String walName = fileToStatus.keySet().iterator().next();
+    Assert.assertTrue("Expected log file name from tablet to equal replication entry", wals.contains(walName));
+
+    Status status = fileToStatus.get(walName);
+
+    Assert.assertFalse("Expected Status for file to not be closed", status.getClosed());
+
+    Set<String> filesForTable = getFilesForTable(table);
+    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
+    log.info("Files for table before MajC: {}", filesForTable);
+
+    // Issue a MajC to roll a new file in HDFS
+    conn.tableOperations().compact(table, null, null, false, true);
+
+    Set<String> filesForTableAfterCompaction = getFilesForTable(table);
+
+    log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
+
+    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size());
+    Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable);
+
+    // Use the rfile which was just replaced by the MajC to determine when the GC has run
+    Path fileToBeDeleted = new Path(filesForTable.iterator().next());
+    FileSystem fs = getCluster().getFileSystem();
+
+    boolean fileExists = fs.exists(fileToBeDeleted);
+    while (fileExists) {
+      log.info("File which should get deleted still exists: {}", fileToBeDeleted);
+      Thread.sleep(2000);
+      fileExists = fs.exists(fileToBeDeleted);
+    }
+
+    // At this point in time, we *know* that the GarbageCollector has run, which means that the Status
+    // for our WAL should not be altered.
+
+    Map<String,Status> fileToStatusAfterCompaction = getMetadataStatusForTable(table);
+    Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterCompaction, 1, fileToStatusAfterCompaction.size());
+
+    /*
+     * To verify that the WAL is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead. The easiest way to do
+     * this is to write enough data to exceed the 1M TSERV_WALOG_MAX_SIZE configured above, forcing the logger to roll over
+     */
+
+    conn.tableOperations().create(otherTable);
+    bw = conn.createBatchWriter(otherTable, null);
+    // 500k
+    byte[] bigValue = new byte[1024 * 500];
+    Arrays.fill(bigValue, (byte) 1);
+    // 500k * 50
+    for (int i = 0; i < 50; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, bigValue);
+      bw.addMutation(m);
+      if (i % 10 == 0) {
+        bw.flush();
+      }
+    }
+
+    bw.close();
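+    // 50 mutations of ~500KB each (~25MB total) were just written, far beyond the 1M WAL max size, so the tserver should have rolled to a new WAL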
+
+    conn.tableOperations().flush(otherTable, null, null, true);
+
+    // Get the tservers which the master deems as active
+    final ClientContext context = new ClientContext(conn.getInstance(), new Credentials("root", new PasswordToken(ConfigurableMacBase.ROOT_PASSWORD)),
+        getClientConfig());
+    List<String> tservers = MasterClient.execute(context, new ClientExecReturn<List<String>,MasterClientService.Client>() {
+      @Override
+      public List<String> execute(MasterClientService.Client client) throws Exception {
+        return client.getActiveTservers(Tracer.traceInfo(), context.rpcCreds());
+      }
+      }
+    });
+
+    Assert.assertEquals("Expected only one active tserver", 1, tservers.size());
+
+    HostAndPort tserver = HostAndPort.fromString(tservers.get(0));
+
+    // Get the active WALs from that server
+    log.info("Fetching active WALs from {}", tserver);
+
+    Client client = ThriftUtil.getTServerClient(tserver, context);
+    List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), context.rpcCreds());
+
+    log.info("Active wals: {}", activeWalsForTserver);
+
+    Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size());
+
+    String activeWal = new Path(activeWalsForTserver.get(0)).toString();
+
+    Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL we saw", walName, activeWal);
+
+    log.info("Ensuring that replication status does get closed after WAL is no longer in use by the tserver");
+
+    do {
+      Map<String,Status> replicationStatuses = getMetadataStatusForTable(table);
+
+      log.info("Got replication status messages {}", replicationStatuses);
+      Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size());
+
+      status = replicationStatuses.values().iterator().next();
+      log.info("Current status: {}", ProtobufUtil.toString(status));
+
+      if (status.getClosed()) {
+        return;
+      }
+
+      log.info("Status is not yet closed, waiting for garbage collector to close it");
+
+      Thread.sleep(2000);
+    } while (true);
+  }
+}
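
The do/while loop that closes this test is the crux of ACCUMULO-3302: the tserver has rolled to a new WAL, so the GarbageCollector is expected to learn from the tserver that the old WAL is no longer active and flip its ~repl Status entry to closed. A sketch of the same polling idiom with an explicit deadline instead of relying solely on the @Test timeout (getMetadataStatusForTable is the helper defined earlier; the 60 second deadline is an arbitrary illustrative choice):

    long deadline = System.currentTimeMillis() + 60 * 1000;
    Status status;
    do {
      Assert.assertTrue("Timed out waiting for the GC to close the WAL's Status",
          System.currentTimeMillis() < deadline);
      Thread.sleep(2000);
      status = getMetadataStatusForTable(table).values().iterator().next();
    } while (!status.getClosed());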
