http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TBinaryProxyIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TBinaryProxyIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/TBinaryProxyIT.java new file mode 100644 index 0000000..6359d1e --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TBinaryProxyIT.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import org.apache.thrift.protocol.TBinaryProtocol; +import org.junit.BeforeClass; + +/** + * + */ +public class TBinaryProxyIT extends SimpleProxyBase { + + @BeforeClass + public static void setProtocol() throws Exception { + SimpleProxyBase.factory = new TBinaryProtocol.Factory(); + setUpProxy(); + } + +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java new file mode 100644 index 0000000..a92414a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import org.apache.thrift.protocol.TCompactProtocol; +import org.junit.BeforeClass; + +/** + * + */ +public class TCompactProxyIT extends SimpleProxyBase { + + @BeforeClass + public static void setProtocol() throws Exception { + SimpleProxyBase.factory = new TCompactProtocol.Factory(); + setUpProxy(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TJsonProtocolProxyIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TJsonProtocolProxyIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/TJsonProtocolProxyIT.java new file mode 100644 index 0000000..5fcbf53 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TJsonProtocolProxyIT.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import org.apache.thrift.protocol.TJSONProtocol; +import org.junit.BeforeClass; + +/** + * + */ +public class TJsonProtocolProxyIT extends SimpleProxyBase { + + @BeforeClass + public static void setProtocol() throws Exception { + SimpleProxyBase.factory = new TJSONProtocol.Factory(); + setUpProxy(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TTupleProxyIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TTupleProxyIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/TTupleProxyIT.java new file mode 100644 index 0000000..cdecf2c --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TTupleProxyIT.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import org.apache.thrift.protocol.TTupleProtocol; +import org.junit.BeforeClass; + +/** + * + */ +public class TTupleProxyIT extends SimpleProxyBase { + + @BeforeClass + public static void setProtocol() throws Exception { + SimpleProxyBase.factory = new TTupleProtocol.Factory(); + setUpProxy(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyClient.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyClient.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyClient.java new file mode 100644 index 0000000..ff92795 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyClient.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.sasl.SaslException; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.rpc.UGIAssumingTransport; +import org.apache.accumulo.proxy.Util; +import org.apache.accumulo.proxy.thrift.AccumuloProxy; +import org.apache.accumulo.proxy.thrift.ColumnUpdate; +import org.apache.accumulo.proxy.thrift.Key; +import org.apache.accumulo.proxy.thrift.ScanResult; +import org.apache.accumulo.proxy.thrift.TimeType; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class TestProxyClient { + + protected AccumuloProxy.Client proxy; + protected TTransport transport; + + public TestProxyClient(String host, int port) throws TTransportException { + this(host, port, new TCompactProtocol.Factory()); + } + + public TestProxyClient(String host, int port, TProtocolFactory protoFactory) throws TTransportException { + final TSocket socket = new TSocket(host, port); + socket.setTimeout(600000); + transport = new TFramedTransport(socket); + final TProtocol protocol = protoFactory.getProtocol(transport); + proxy = new AccumuloProxy.Client(protocol); + transport.open(); + } + + public TestProxyClient(String host, int port, TProtocolFactory protoFactory, String proxyPrimary, UserGroupInformation ugi) throws SaslException, + TTransportException { + TSocket socket = new TSocket(host, port); + TSaslClientTransport saslTransport = new TSaslClientTransport("GSSAPI", null, proxyPrimary, host, Collections.singletonMap("javax.security.sasl.qop", + "auth"), null, socket); + + transport = new UGIAssumingTransport(saslTransport, ugi); + + // UGI transport will perform the doAs for us + transport.open(); + + AccumuloProxy.Client.Factory factory = new AccumuloProxy.Client.Factory(); + final TProtocol protocol = protoFactory.getProtocol(transport); + proxy = factory.getClient(protocol); + } + + public synchronized void close() { + if (null != transport) { + transport.close(); + transport = null; + } + } + + public AccumuloProxy.Client proxy() { + return proxy; + } + + public static void main(String[] args) throws Exception { + + TestProxyClient tpc = new TestProxyClient("localhost", 42424); + String principal = "root"; + Map<String,String> props = new TreeMap<String,String>(); + props.put("password", "secret"); + + System.out.println("Logging in"); + ByteBuffer login = tpc.proxy.login(principal, props); + + System.out.println("Creating user: "); + if (!tpc.proxy().listLocalUsers(login).contains("testuser")) { + tpc.proxy().createLocalUser(login, "testuser", ByteBuffer.wrap("testpass".getBytes(UTF_8))); + } + System.out.println("UserList: " + tpc.proxy().listLocalUsers(login)); + + System.out.println("Listing: " + tpc.proxy().listTables(login)); + + System.out.println("Deleting: "); + String testTable = "testtableOMGOMGOMG"; + + System.out.println("Creating: "); + + if (tpc.proxy().tableExists(login, testTable)) + tpc.proxy().deleteTable(login, testTable); + + tpc.proxy().createTable(login, testTable, true, TimeType.MILLIS); + + System.out.println("Listing: " + tpc.proxy().listTables(login)); + + System.out.println("Writing: "); + Date start = new Date(); + Date then = new Date(); + int maxInserts = 1000000; + String format = "%1$05d"; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + for (int i = 0; i < maxInserts; i++) { + String result = String.format(format, i); + ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(("cf" + i).getBytes(UTF_8)), ByteBuffer.wrap(("cq" + i).getBytes(UTF_8))); + update.setValue(Util.randStringBuffer(10)); + mutations.put(ByteBuffer.wrap(result.getBytes(UTF_8)), Collections.singletonList(update)); + + if (i % 1000 == 0) { + tpc.proxy().updateAndFlush(login, testTable, mutations); + mutations.clear(); + } + } + tpc.proxy().updateAndFlush(login, testTable, mutations); + Date end = new Date(); + System.out.println(" End of writing: " + (end.getTime() - start.getTime())); + + tpc.proxy().deleteTable(login, testTable); + tpc.proxy().createTable(login, testTable, true, TimeType.MILLIS); + + // Thread.sleep(1000); + + System.out.println("Writing async: "); + start = new Date(); + then = new Date(); + mutations.clear(); + String writer = tpc.proxy().createWriter(login, testTable, null); + for (int i = 0; i < maxInserts; i++) { + String result = String.format(format, i); + Key pkey = new Key(); + pkey.setRow(result.getBytes(UTF_8)); + ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(("cf" + i).getBytes(UTF_8)), ByteBuffer.wrap(("cq" + i).getBytes(UTF_8))); + update.setValue(Util.randStringBuffer(10)); + mutations.put(ByteBuffer.wrap(result.getBytes(UTF_8)), Collections.singletonList(update)); + tpc.proxy().update(writer, mutations); + mutations.clear(); + } + + end = new Date(); + System.out.println(" End of writing: " + (end.getTime() - start.getTime())); + start = end; + System.out.println("Closing..."); + tpc.proxy().closeWriter(writer); + end = new Date(); + System.out.println(" End of closing: " + (end.getTime() - start.getTime())); + + System.out.println("Reading: "); + + String regex = "cf1.*"; + + IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class); + RegExFilter.setRegexs(is, null, regex, null, null, false); + + String cookie = tpc.proxy().createScanner(login, testTable, null); + + int i = 0; + start = new Date(); + then = new Date(); + boolean hasNext = true; + + int k = 1000; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + + Date now = new Date(); + System.out.println(i + " " + (now.getTime() - then.getTime())); + then = now; + + i += kvList.getResultsSize(); + // for (TKeyValue kv:kvList.getResults()) System.out.println(new Key(kv.getKey())); + hasNext = kvList.isMore(); + } + end = new Date(); + System.out.println("Total entries: " + i + " total time " + (end.getTime() - start.getTime())); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java new file mode 100644 index 0000000..ff94dd4 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Properties; + +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.proxy.Proxy; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.server.TServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.net.HostAndPort; + +public class TestProxyInstanceOperations { + private static final Logger log = LoggerFactory.getLogger(TestProxyInstanceOperations.class); + + protected static TServer proxy; + protected static TestProxyClient tpc; + protected static ByteBuffer userpass; + protected static final int port = 10197; + + @BeforeClass + public static void setup() throws Exception { + Properties prop = new Properties(); + prop.setProperty("useMockInstance", "true"); + prop.put("tokenClass", PasswordToken.class.getName()); + + proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server; + log.info("Waiting for proxy to start"); + while (!proxy.isServing()) { + Thread.sleep(500); + } + log.info("Proxy started"); + tpc = new TestProxyClient("localhost", port); + userpass = tpc.proxy.login("root", Collections.singletonMap("password", "")); + } + + @AfterClass + public static void tearDown() throws InterruptedException { + proxy.stop(); + } + + @Test + public void properties() throws TException { + tpc.proxy().setProperty(userpass, "test.systemprop", "whistletips"); + + assertEquals(tpc.proxy().getSystemConfiguration(userpass).get("test.systemprop"), "whistletips"); + tpc.proxy().removeProperty(userpass, "test.systemprop"); + assertNull(tpc.proxy().getSystemConfiguration(userpass).get("test.systemprop")); + + } + + @Test + public void testClassLoad() throws TException { + assertTrue(tpc.proxy().testClassLoad(userpass, "org.apache.accumulo.core.iterators.user.RegExFilter", "org.apache.accumulo.core.iterators.Filter")); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java new file mode 100644 index 0000000..1a75fea --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.proxy.Proxy; +import org.apache.accumulo.proxy.Util; +import org.apache.accumulo.proxy.thrift.BatchScanOptions; +import org.apache.accumulo.proxy.thrift.ColumnUpdate; +import org.apache.accumulo.proxy.thrift.IteratorSetting; +import org.apache.accumulo.proxy.thrift.Key; +import org.apache.accumulo.proxy.thrift.KeyValue; +import org.apache.accumulo.proxy.thrift.Range; +import org.apache.accumulo.proxy.thrift.ScanColumn; +import org.apache.accumulo.proxy.thrift.ScanOptions; +import org.apache.accumulo.proxy.thrift.ScanResult; +import org.apache.accumulo.proxy.thrift.TimeType; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.server.TServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.net.HostAndPort; + +public class TestProxyReadWrite { + protected static TServer proxy; + protected static TestProxyClient tpc; + protected static ByteBuffer userpass; + protected static final int port = 10194; + protected static final String testtable = "testtable"; + + @BeforeClass + public static void setup() throws Exception { + Properties prop = new Properties(); + prop.setProperty("useMockInstance", "true"); + prop.put("tokenClass", PasswordToken.class.getName()); + + proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server; + tpc = new TestProxyClient("localhost", port); + userpass = tpc.proxy().login("root", Collections.singletonMap("password", "")); + } + + @AfterClass + public static void tearDown() throws InterruptedException { + proxy.stop(); + } + + @Before + public void makeTestTable() throws Exception { + tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS); + } + + @After + public void deleteTestTable() throws Exception { + tpc.proxy().deleteTable(userpass, testtable); + } + + private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) { + ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes())); + update.setValue(value.getBytes()); + mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update)); + } + + private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String vis, String value) { + ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes())); + update.setValue(value.getBytes()); + update.setColVisibility(vis.getBytes()); + mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update)); + } + + /** + * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a range so only the entries between -Inf...5 come back (there should be + * 50,000) + */ + @Test + public void readWriteBatchOneShotWithRange() throws Exception { + int maxInserts = 100000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$05d"; + for (int i = 0; i < maxInserts; i++) { + addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + tpc.proxy().updateAndFlush(userpass, testtable, mutations); + mutations.clear(); + } + } + + Key stop = new Key(); + stop.setRow("5".getBytes()); + BatchScanOptions options = new BatchScanOptions(); + options.ranges = Collections.singletonList(new Range(null, false, stop, false)); + String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + i += kvList.getResultsSize(); + hasNext = kvList.isMore(); + } + assertEquals(i, 50000); + } + + /** + * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily so only the entries with specified column family come back + * (there should be 50,000) + */ + @Test + public void readWriteBatchOneShotWithColumnFamilyOnly() throws Exception { + int maxInserts = 100000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$05d"; + for (int i = 0; i < maxInserts; i++) { + + addMutation(mutations, String.format(format, i), "cf" + (i % 2), "cq" + (i % 2), Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + tpc.proxy().updateAndFlush(userpass, testtable, mutations); + mutations.clear(); + } + } + + BatchScanOptions options = new BatchScanOptions(); + + ScanColumn sc = new ScanColumn(); + sc.colFamily = ByteBuffer.wrap("cf0".getBytes()); + + options.columns = Collections.singletonList(sc); + String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + i += kvList.getResultsSize(); + hasNext = kvList.isMore(); + } + assertEquals(i, 50000); + } + + /** + * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily + columnQualififer so only the entries with specified column + * come back (there should be 50,000) + */ + @Test + public void readWriteBatchOneShotWithFullColumn() throws Exception { + int maxInserts = 100000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$05d"; + for (int i = 0; i < maxInserts; i++) { + + addMutation(mutations, String.format(format, i), "cf" + (i % 2), "cq" + (i % 2), Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + tpc.proxy().updateAndFlush(userpass, testtable, mutations); + mutations.clear(); + } + } + + BatchScanOptions options = new BatchScanOptions(); + + ScanColumn sc = new ScanColumn(); + sc.colFamily = ByteBuffer.wrap("cf0".getBytes()); + sc.colQualifier = ByteBuffer.wrap("cq0".getBytes()); + + options.columns = Collections.singletonList(sc); + String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + i += kvList.getResultsSize(); + hasNext = kvList.isMore(); + } + assertEquals(i, 50000); + } + + /** + * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Filter the results so only the even numbers come back. + */ + @Test + public void readWriteBatchOneShotWithFilterIterator() throws Exception { + int maxInserts = 10000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$05d"; + for (int i = 0; i < maxInserts; i++) { + addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + tpc.proxy().updateAndFlush(userpass, testtable, mutations); + mutations.clear(); + } + + } + + String regex = ".*[02468]"; + + org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class); + RegExFilter.setRegexs(is, regex, null, null, null, false); + + IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is); + ScanOptions opts = new ScanOptions(); + opts.iterators = Collections.singletonList(pis); + String cookie = tpc.proxy().createScanner(userpass, testtable, opts); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + for (KeyValue kv : kvList.getResults()) { + assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i); + + i += 2; + } + hasNext = kvList.isMore(); + } + } + + @Test + public void readWriteOneShotWithRange() throws Exception { + int maxInserts = 100000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$05d"; + for (int i = 0; i < maxInserts; i++) { + addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + tpc.proxy().updateAndFlush(userpass, testtable, mutations); + mutations.clear(); + } + } + + Key stop = new Key(); + stop.setRow("5".getBytes()); + ScanOptions opts = new ScanOptions(); + opts.range = new Range(null, false, stop, false); + String cookie = tpc.proxy().createScanner(userpass, testtable, opts); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + i += kvList.getResultsSize(); + hasNext = kvList.isMore(); + } + assertEquals(i, 50000); + } + + /** + * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Filter the results so only the even numbers come back. + */ + @Test + public void readWriteOneShotWithFilterIterator() throws Exception { + int maxInserts = 10000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$05d"; + for (int i = 0; i < maxInserts; i++) { + addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + + tpc.proxy().updateAndFlush(userpass, testtable, mutations); + mutations.clear(); + + } + + } + + String regex = ".*[02468]"; + + org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class); + RegExFilter.setRegexs(is, regex, null, null, null, false); + + IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is); + ScanOptions opts = new ScanOptions(); + opts.iterators = Collections.singletonList(pis); + String cookie = tpc.proxy().createScanner(userpass, testtable, opts); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + for (KeyValue kv : kvList.getResults()) { + assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i); + + i += 2; + } + hasNext = kvList.isMore(); + } + } + + // @Test + // This test takes kind of a long time. Enable it if you think you may have memory issues. + public void manyWritesAndReads() throws Exception { + int maxInserts = 1000000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$06d"; + String writer = tpc.proxy().createWriter(userpass, testtable, null); + for (int i = 0; i < maxInserts; i++) { + addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + + tpc.proxy().update(writer, mutations); + mutations.clear(); + + } + + } + + tpc.proxy().flush(writer); + tpc.proxy().closeWriter(writer); + + String cookie = tpc.proxy().createScanner(userpass, testtable, null); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + for (KeyValue kv : kvList.getResults()) { + assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i); + i++; + } + hasNext = kvList.isMore(); + if (hasNext) + assertEquals(k, kvList.getResults().size()); + } + assertEquals(maxInserts, i); + } + + @Test + public void asynchReadWrite() throws Exception { + int maxInserts = 10000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$05d"; + String writer = tpc.proxy().createWriter(userpass, testtable, null); + for (int i = 0; i < maxInserts; i++) { + addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + tpc.proxy().update(writer, mutations); + mutations.clear(); + } + } + + tpc.proxy().flush(writer); + tpc.proxy().closeWriter(writer); + + String regex = ".*[02468]"; + + org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class); + RegExFilter.setRegexs(is, regex, null, null, null, false); + + IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is); + ScanOptions opts = new ScanOptions(); + opts.iterators = Collections.singletonList(pis); + String cookie = tpc.proxy().createScanner(userpass, testtable, opts); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + int numRead = 0; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + for (KeyValue kv : kvList.getResults()) { + assertEquals(i, Integer.parseInt(new String(kv.getKey().getRow()))); + numRead++; + i += 2; + } + hasNext = kvList.isMore(); + } + assertEquals(maxInserts / 2, numRead); + } + + @Test + public void testVisibility() throws Exception { + + Set<ByteBuffer> auths = new HashSet<ByteBuffer>(); + auths.add(ByteBuffer.wrap("even".getBytes())); + tpc.proxy().changeUserAuthorizations(userpass, "root", auths); + + int maxInserts = 10000; + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + String format = "%1$05d"; + String writer = tpc.proxy().createWriter(userpass, testtable, null); + for (int i = 0; i < maxInserts; i++) { + if (i % 2 == 0) + addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "even", Util.randString(10)); + else + addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "odd", Util.randString(10)); + + if (i % 1000 == 0 || i == maxInserts - 1) { + tpc.proxy().update(writer, mutations); + mutations.clear(); + } + } + + tpc.proxy().flush(writer); + tpc.proxy().closeWriter(writer); + ScanOptions opts = new ScanOptions(); + opts.authorizations = auths; + String cookie = tpc.proxy().createScanner(userpass, testtable, opts); + + int i = 0; + boolean hasNext = true; + + int k = 1000; + int numRead = 0; + while (hasNext) { + ScanResult kvList = tpc.proxy().nextK(cookie, k); + for (KeyValue kv : kvList.getResults()) { + assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i); + i += 2; + numRead++; + } + hasNext = kvList.isMore(); + + } + assertEquals(maxInserts / 2, numRead); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java new file mode 100644 index 0000000..eda38e5 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; + +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.proxy.Proxy; +import org.apache.accumulo.proxy.thrift.SystemPermission; +import org.apache.accumulo.proxy.thrift.TablePermission; +import org.apache.accumulo.proxy.thrift.TimeType; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.server.TServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.net.HostAndPort; + +public class TestProxySecurityOperations { + protected static TServer proxy; + protected static TestProxyClient tpc; + protected static ByteBuffer userpass; + protected static final int port = 10196; + protected static final String testtable = "testtable"; + protected static final String testuser = "VonJines"; + protected static final ByteBuffer testpw = ByteBuffer.wrap("fiveones".getBytes()); + + @BeforeClass + public static void setup() throws Exception { + Properties prop = new Properties(); + prop.setProperty("useMockInstance", "true"); + prop.put("tokenClass", PasswordToken.class.getName()); + + proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server; + while (!proxy.isServing()) { + Thread.sleep(500); + } + tpc = new TestProxyClient("localhost", port); + userpass = tpc.proxy().login("root", Collections.singletonMap("password", "")); + } + + @AfterClass + public static void tearDown() throws InterruptedException { + proxy.stop(); + } + + @Before + public void makeTestTableAndUser() throws Exception { + tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS); + tpc.proxy().createLocalUser(userpass, testuser, testpw); + } + + @After + public void deleteTestTable() throws Exception { + tpc.proxy().deleteTable(userpass, testtable); + tpc.proxy().dropLocalUser(userpass, testuser); + } + + @Test + public void create() throws TException { + tpc.proxy().createLocalUser(userpass, testuser + "2", testpw); + assertTrue(tpc.proxy().listLocalUsers(userpass).contains(testuser + "2")); + tpc.proxy().dropLocalUser(userpass, testuser + "2"); + assertTrue(!tpc.proxy().listLocalUsers(userpass).contains(testuser + "2")); + } + + @Test + public void authenticate() throws TException { + assertTrue(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(testpw))); + assertFalse(tpc.proxy().authenticateUser(userpass, "EvilUser", bb2pp(testpw))); + + tpc.proxy().changeLocalUserPassword(userpass, testuser, ByteBuffer.wrap("newpass".getBytes())); + assertFalse(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(testpw))); + assertTrue(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(ByteBuffer.wrap("newpass".getBytes())))); + + } + + @Test + public void tablePermissions() throws TException { + tpc.proxy().grantTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE); + assertTrue(tpc.proxy().hasTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE)); + + tpc.proxy().revokeTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE); + assertFalse(tpc.proxy().hasTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE)); + + } + + @Test + public void systemPermissions() throws TException { + tpc.proxy().grantSystemPermission(userpass, testuser, SystemPermission.ALTER_USER); + assertTrue(tpc.proxy().hasSystemPermission(userpass, testuser, SystemPermission.ALTER_USER)); + + tpc.proxy().revokeSystemPermission(userpass, testuser, SystemPermission.ALTER_USER); + assertFalse(tpc.proxy().hasSystemPermission(userpass, testuser, SystemPermission.ALTER_USER)); + + } + + @Test + public void auths() throws TException { + HashSet<ByteBuffer> newauths = new HashSet<ByteBuffer>(); + newauths.add(ByteBuffer.wrap("BBR".getBytes())); + newauths.add(ByteBuffer.wrap("Barney".getBytes())); + tpc.proxy().changeUserAuthorizations(userpass, testuser, newauths); + List<ByteBuffer> actualauths = tpc.proxy().getUserAuthorizations(userpass, testuser); + assertEquals(actualauths.size(), newauths.size()); + + for (ByteBuffer auth : actualauths) { + assertTrue(newauths.contains(auth)); + } + } + + private Map<String,String> bb2pp(ByteBuffer cf) { + Map<String,String> toRet = new TreeMap<String,String>(); + toRet.put("password", ByteBufferUtil.toString(cf)); + return toRet; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java new file mode 100644 index 0000000..e8d7b1e --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.proxy.Proxy; +import org.apache.accumulo.proxy.thrift.ColumnUpdate; +import org.apache.accumulo.proxy.thrift.TimeType; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.server.TServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.net.HostAndPort; + +public class TestProxyTableOperations { + + protected static TServer proxy; + protected static TestProxyClient tpc; + protected static ByteBuffer userpass; + protected static final int port = 10195; + protected static final String testtable = "testtable"; + + @BeforeClass + public static void setup() throws Exception { + Properties prop = new Properties(); + prop.setProperty("useMockInstance", "true"); + prop.put("tokenClass", PasswordToken.class.getName()); + + proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server; + while (!proxy.isServing()) { + Thread.sleep(500); + } + tpc = new TestProxyClient("localhost", port); + userpass = tpc.proxy().login("root", Collections.singletonMap("password", "")); + } + + @AfterClass + public static void tearDown() throws InterruptedException { + proxy.stop(); + } + + @Before + public void makeTestTable() throws Exception { + tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS); + } + + @After + public void deleteTestTable() throws Exception { + tpc.proxy().deleteTable(userpass, testtable); + } + + @Test + public void createExistsDelete() throws TException { + assertFalse(tpc.proxy().tableExists(userpass, "testtable2")); + tpc.proxy().createTable(userpass, "testtable2", true, TimeType.MILLIS); + assertTrue(tpc.proxy().tableExists(userpass, "testtable2")); + tpc.proxy().deleteTable(userpass, "testtable2"); + assertFalse(tpc.proxy().tableExists(userpass, "testtable2")); + } + + @Test + public void listRename() throws TException { + assertFalse(tpc.proxy().tableExists(userpass, "testtable2")); + tpc.proxy().renameTable(userpass, testtable, "testtable2"); + assertTrue(tpc.proxy().tableExists(userpass, "testtable2")); + tpc.proxy().renameTable(userpass, "testtable2", testtable); + assertTrue(tpc.proxy().listTables(userpass).contains("testtable")); + + } + + // This test does not yet function because the backing Mock instance does not yet support merging + @Test + public void merge() throws TException { + Set<ByteBuffer> splits = new HashSet<ByteBuffer>(); + splits.add(ByteBuffer.wrap("a".getBytes())); + splits.add(ByteBuffer.wrap("c".getBytes())); + splits.add(ByteBuffer.wrap("z".getBytes())); + tpc.proxy().addSplits(userpass, testtable, splits); + + tpc.proxy().mergeTablets(userpass, testtable, ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap("d".getBytes())); + + splits.remove(ByteBuffer.wrap("c".getBytes())); + + List<ByteBuffer> tableSplits = tpc.proxy().listSplits(userpass, testtable, 10); + + for (ByteBuffer split : tableSplits) + assertTrue(splits.contains(split)); + assertTrue(tableSplits.size() == splits.size()); + + } + + @Test + public void splits() throws TException { + Set<ByteBuffer> splits = new HashSet<ByteBuffer>(); + splits.add(ByteBuffer.wrap("a".getBytes())); + splits.add(ByteBuffer.wrap("b".getBytes())); + splits.add(ByteBuffer.wrap("z".getBytes())); + tpc.proxy().addSplits(userpass, testtable, splits); + + List<ByteBuffer> tableSplits = tpc.proxy().listSplits(userpass, testtable, 10); + + for (ByteBuffer split : tableSplits) + assertTrue(splits.contains(split)); + assertTrue(tableSplits.size() == splits.size()); + } + + @Test + public void constraints() throws TException { + int cid = tpc.proxy().addConstraint(userpass, testtable, "org.apache.accumulo.TestConstraint"); + Map<String,Integer> constraints = tpc.proxy().listConstraints(userpass, testtable); + assertEquals((int) constraints.get("org.apache.accumulo.TestConstraint"), cid); + tpc.proxy().removeConstraint(userpass, testtable, cid); + constraints = tpc.proxy().listConstraints(userpass, testtable); + assertNull(constraints.get("org.apache.accumulo.TestConstraint")); + } + + @Test + public void localityGroups() throws TException { + Map<String,Set<String>> groups = new HashMap<String,Set<String>>(); + Set<String> group1 = new HashSet<String>(); + group1.add("cf1"); + groups.put("group1", group1); + Set<String> group2 = new HashSet<String>(); + group2.add("cf2"); + group2.add("cf3"); + groups.put("group2", group2); + tpc.proxy().setLocalityGroups(userpass, testtable, groups); + + Map<String,Set<String>> actualGroups = tpc.proxy().getLocalityGroups(userpass, testtable); + + assertEquals(groups.size(), actualGroups.size()); + for (String groupName : groups.keySet()) { + assertTrue(actualGroups.containsKey(groupName)); + assertEquals(groups.get(groupName).size(), actualGroups.get(groupName).size()); + for (String cf : groups.get(groupName)) { + assertTrue(actualGroups.get(groupName).contains(cf)); + } + } + } + + @Test + public void tableProperties() throws TException { + tpc.proxy().setTableProperty(userpass, testtable, "test.property1", "wharrrgarbl"); + assertEquals(tpc.proxy().getTableProperties(userpass, testtable).get("test.property1"), "wharrrgarbl"); + tpc.proxy().removeTableProperty(userpass, testtable, "test.property1"); + assertNull(tpc.proxy().getTableProperties(userpass, testtable).get("test.property1")); + } + + private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) { + ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes())); + update.setValue(value.getBytes()); + mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update)); + } + + @Test + public void tableOperationsRowMethods() throws TException { + Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>(); + for (int i = 0; i < 10; i++) { + addMutation(mutations, "" + i, "cf", "cq", ""); + } + tpc.proxy().updateAndFlush(userpass, testtable, mutations); + + assertEquals(tpc.proxy().getMaxRow(userpass, testtable, null, null, true, null, true), ByteBuffer.wrap("9".getBytes())); + + tpc.proxy().deleteRows(userpass, testtable, ByteBuffer.wrap("51".getBytes()), ByteBuffer.wrap("99".getBytes())); + assertEquals(tpc.proxy().getMaxRow(userpass, testtable, null, null, true, null, true), ByteBuffer.wrap("5".getBytes())); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java new file mode 100644 index 0000000..3a1d413 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import static org.junit.Assert.assertTrue; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.LongCombiner.Type; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.minicluster.impl.ZooKeeperBindException; +import org.apache.accumulo.server.replication.ReplicaSystemFactory; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + +/** + * + */ +public class CyclicReplicationIT { + private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class); + + @Rule + public Timeout getTimeout() { + int scalingFactor = 1; + try { + scalingFactor = Integer.parseInt(System.getProperty("timeout.factor")); + } catch (NumberFormatException exception) { + log.warn("Could not parse timeout.factor, not scaling timeout"); + } + + return new Timeout(scalingFactor * 5 * 60 * 1000); + } + + @Rule + public TestName testName = new TestName(); + + private File createTestDir(String name) { + File baseDir = new File(System.getProperty("user.dir") + "/target/mini-tests"); + assertTrue(baseDir.mkdirs() || baseDir.isDirectory()); + File testDir = new File(baseDir, this.getClass().getName() + "_" + testName.getMethodName() + "_" + name); + FileUtils.deleteQuietly(testDir); + assertTrue(testDir.mkdir()); + return testDir; + } + + private void setCoreSite(MiniAccumuloClusterImpl cluster) throws Exception { + File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml"); + if (csFile.exists()) + throw new RuntimeException(csFile + " already exist"); + + Configuration coreSite = new Configuration(false); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + OutputStream out = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "core-site.xml"))); + coreSite.writeXml(out); + out.close(); + } + + /** + * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication + */ + private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { + // Set the same SSL information from the primary when present + Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); + if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + Map<String,String> peerSiteConfig = new HashMap<String,String>(); + peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); + Assert.assertNotNull("Keystore Path was null", keystorePath); + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); + String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); + Assert.assertNotNull("Truststore Path was null", truststorePath); + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); + + // Passwords might be stored in CredentialProvider + String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); + if (null != keystorePassword) { + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); + } + String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); + if (null != truststorePassword) { + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + } + + System.out.println("Setting site configuration for peer " + peerSiteConfig); + peerCfg.setSiteConfig(peerSiteConfig); + } + + // Use the CredentialProvider if the primary also uses one + String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProvider) { + Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); + peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); + peerCfg.setSiteConfig(peerSiteConfig); + } + } + + @Test + public void dataIsNotOverReplicated() throws Exception { + File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2"); + String password = "password"; + + MiniAccumuloConfigImpl master1Cfg; + MiniAccumuloClusterImpl master1Cluster; + while (true) { + master1Cfg = new MiniAccumuloConfigImpl(master1Dir, password); + master1Cfg.setNumTservers(1); + master1Cfg.setInstanceName("master1"); + + // Set up SSL if needed + ConfigurableMacBase.configureForEnvironment(master1Cfg, this.getClass(), ConfigurableMacBase.getSslDir(master1Dir)); + + master1Cfg.setProperty(Property.REPLICATION_NAME, master1Cfg.getInstanceName()); + master1Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + master1Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); + master1Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + master1Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + master1Cluster = new MiniAccumuloClusterImpl(master1Cfg); + setCoreSite(master1Cluster); + + try { + master1Cluster.start(); + break; + } catch (ZooKeeperBindException e) { + log.warn("Failed to start ZooKeeper on " + master1Cfg.getZooKeeperPort() + ", will retry"); + } + } + + MiniAccumuloConfigImpl master2Cfg; + MiniAccumuloClusterImpl master2Cluster; + while (true) { + master2Cfg = new MiniAccumuloConfigImpl(master2Dir, password); + master2Cfg.setNumTservers(1); + master2Cfg.setInstanceName("master2"); + + // Set up SSL if needed. Need to share the same SSL truststore as master1 + this.updatePeerConfigFromPrimary(master1Cfg, master2Cfg); + + master2Cfg.setProperty(Property.REPLICATION_NAME, master2Cfg.getInstanceName()); + master2Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + master2Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); + master2Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + master2Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + master2Cluster = new MiniAccumuloClusterImpl(master2Cfg); + setCoreSite(master2Cluster); + + try { + master2Cluster.start(); + break; + } catch (ZooKeeperBindException e) { + log.warn("Failed to start ZooKeeper on " + master2Cfg.getZooKeeperPort() + ", will retry"); + } + } + + try { + Connector connMaster1 = master1Cluster.getConnector("root", new PasswordToken(password)), connMaster2 = master2Cluster.getConnector("root", + new PasswordToken(password)); + + String master1UserName = "master1", master1Password = "foo"; + String master2UserName = "master2", master2Password = "bar"; + String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName(); + + connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password)); + connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password)); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master2Cluster.getInstanceName(), master2UserName); + connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master2Cluster.getInstanceName(), master2Password); + + connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master1Cluster.getInstanceName(), master1UserName); + connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master1Cluster.getInstanceName(), master1Password); + + connMaster1.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + master2Cluster.getInstanceName(), + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(master2Cluster.getInstanceName(), master2Cluster.getZooKeepers()))); + + connMaster2.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + master1Cluster.getInstanceName(), + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers()))); + + connMaster1.tableOperations().create(master1Table, new NewTableConfiguration().withoutDefaultIterators()); + String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table); + Assert.assertNotNull(master1TableId); + + connMaster2.tableOperations().create(master2Table, new NewTableConfiguration().withoutDefaultIterators()); + String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table); + Assert.assertNotNull(master2TableId); + + // Replicate master1 in the master1 cluster to master2 in the master2 cluster + connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION_TARGET.getKey() + master2Cluster.getInstanceName(), master2TableId); + + // Replicate master2 in the master2 cluster to master1 in the master2 cluster + connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION_TARGET.getKey() + master1Cluster.getInstanceName(), master1TableId); + + // Give our replication user the ability to write to the respective table + connMaster1.securityOperations().grantTablePermission(master1UserName, master1Table, TablePermission.WRITE); + connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, TablePermission.WRITE); + + IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class); + SummingCombiner.setEncodingType(summingCombiner, Type.STRING); + SummingCombiner.setCombineAllColumns(summingCombiner, true); + + // Set a combiner on both instances that will sum multiple values + // We can use this to verify that the mutation was not sent multiple times + connMaster1.tableOperations().attachIterator(master1Table, summingCombiner); + connMaster2.tableOperations().attachIterator(master2Table, summingCombiner); + + // Write a single entry + BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig()); + Mutation m = new Mutation("row"); + m.put("count", "", "1"); + bw.addMutation(m); + bw.close(); + + Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table); + + log.info("Found {} that need replication from master1", files); + + // Kill and restart the tserver to close the WAL on master1 + for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + master1Cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + master1Cluster.exec(TabletServer.class); + + log.info("Restarted tserver on master1"); + + // Try to avoid ACCUMULO-2964 + Thread.sleep(1000); + + // Sanity check that the element is there on master1 + Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + // Wait for this table to replicate + connMaster1.replicationOperations().drain(master1Table, files); + + Thread.sleep(5000); + + // Check that the element made it to master2 only once + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + // Wait for master2 to finish replicating it back + files = connMaster2.replicationOperations().referencedFiles(master2Table); + + // Kill and restart the tserver to close the WAL on master2 + for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + master2Cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + master2Cluster.exec(TabletServer.class); + + // Try to avoid ACCUMULO-2964 + Thread.sleep(1000); + + // Check that the element made it to master2 only once + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + connMaster2.replicationOperations().drain(master2Table, files); + + Thread.sleep(5000); + + // Verify that the entry wasn't sent back to master1 + s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + } finally { + master1Cluster.stop(); + master2Cluster.stop(); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java new file mode 100644 index 0000000..ab142d0 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.Credentials; +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.log.WalStateManager; +import org.apache.accumulo.server.log.WalStateManager.WalState; +import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.net.HostAndPort; + +/** + * ACCUMULO-3302 series of tests which ensure that a WAL is prematurely closed when a TServer may still continue to use it. Checking that no tablet references a + * WAL is insufficient to determine if a WAL will never be used in the future. + */ +public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacBase { + private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServersIT.class); + + private final int GC_PERIOD_SECONDS = 1; + + @Override + public int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s"); + // Wait longer to try to let the replication table come online before a cycle runs + cfg.setProperty(Property.GC_CYCLE_START, "10s"); + cfg.setProperty(Property.REPLICATION_NAME, "master"); + // Set really long delays for the master to do stuff for replication. We don't need + // it to be doing anything, so just let it sleep + cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s"); + cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s"); + cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s"); + // Pull down the maximum size of the wal so we can test close()'ing it. + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M"); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + /** + * Fetch all of the WALs referenced by tablets in the metadata table for this table + */ + private Set<String> getWalsForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Instance i = conn.getInstance(); + ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), ""); + WalStateManager wals = new WalStateManager(conn.getInstance(), zk); + + Set<String> result = new HashSet<String>(); + for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) { + log.debug("Reading WALs: {}={}", entry.getKey(), entry.getValue()); + result.add(entry.getKey().toString()); + } + return result; + } + + /** + * Fetch all of the rfiles referenced by tablets in the metadata table for this table + */ + private Set<String> getFilesForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.TabletsSection.getRange(tableId); + s.setRange(r); + s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + Set<String> rfiles = new HashSet<String>(); + for (Entry<Key,Value> entry : s) { + log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + // uri://path/to/wal + String cq = entry.getKey().getColumnQualifier().toString(); + String path = new Path(cq).toString(); + log.debug("Normalize path to rfile: {}", path); + rfiles.add(path); + } + + return rfiles; + } + + /** + * Get the replication status messages for the given table that exist in the metadata table (~repl entries) + */ + private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.ReplicationSection.getRange(); + s.setRange(r); + s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId)); + + Map<String,Status> fileToStatus = new HashMap<String,Status>(); + for (Entry<Key,Value> entry : s) { + Text file = new Text(); + MetadataSchema.ReplicationSection.getFile(entry.getKey(), file); + Status status = Status.parseFrom(entry.getValue().get()); + log.info("Got status for {}: {}", file, ProtobufUtil.toString(status)); + fileToStatus.put(file.toString(), status); + } + + return fileToStatus; + } + + @Test + public void testActiveWalPrecludesClosing() throws Exception { + final String table = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + + // Bring the replication table online first and foremost + ReplicationTable.setOnline(conn); + + log.info("Creating {}", table); + conn.tableOperations().create(table); + + conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); + + log.info("Writing a few mutations to the table"); + + BatchWriter bw = conn.createBatchWriter(table, null); + + byte[] empty = new byte[0]; + for (int i = 0; i < 5; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } + + log.info("Flushing mutations to the server"); + bw.flush(); + + log.info("Checking that metadata only has one WAL recorded for this table"); + + Set<String> wals = getWalsForTable(table); + Assert.assertEquals("Expected to only find two WALs for the table", 2, wals.size()); + + log.info("Compacting the table which will remove all WALs from the tablets"); + + // Flush our test table to remove the WAL references in it + conn.tableOperations().flush(table, null, null, true); + // Flush the metadata table too because it will have a reference to the WAL + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + + log.info("Waiting for replication table to come online"); + + log.info("Fetching replication statuses from metadata table"); + + Map<String,Status> fileToStatus = getMetadataStatusForTable(table); + + Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); + + String walName = fileToStatus.keySet().iterator().next(); + wals.retainAll(fileToStatus.keySet()); + Assert.assertEquals(1, wals.size()); + + Status status = fileToStatus.get(walName); + + Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); + + Set<String> filesForTable = getFilesForTable(table); + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); + log.info("Files for table before MajC: {}", filesForTable); + + // Issue a MajC to roll a new file in HDFS + conn.tableOperations().compact(table, null, null, false, true); + + Set<String> filesForTableAfterCompaction = getFilesForTable(table); + + log.info("Files for table after MajC: {}", filesForTableAfterCompaction); + + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); + Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); + + // Use the rfile which was just replaced by the MajC to determine when the GC has ran + Path fileToBeDeleted = new Path(filesForTable.iterator().next()); + FileSystem fs = getCluster().getFileSystem(); + + boolean fileExists = fs.exists(fileToBeDeleted); + while (fileExists) { + log.info("File which should get deleted still exists: {}", fileToBeDeleted); + Thread.sleep(2000); + fileExists = fs.exists(fileToBeDeleted); + } + + Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); + Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); + + Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); + } + + @Test(timeout = 2 * 60 * 1000) + public void testUnreferencedWalInTserverIsClosed() throws Exception { + final String[] names = getUniqueNames(2); + // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver + final String table = names[0], otherTable = names[1]; + final Connector conn = getConnector(); + + // Bring the replication table online first and foremost + ReplicationTable.setOnline(conn); + + log.info("Creating {}", table); + conn.tableOperations().create(table); + + conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); + + log.info("Writing a few mutations to the table"); + + BatchWriter bw = conn.createBatchWriter(table, null); + + byte[] empty = new byte[0]; + for (int i = 0; i < 5; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } + + log.info("Flushing mutations to the server"); + bw.close(); + + log.info("Checking that metadata only has one WAL recorded for this table"); + + Set<String> wals = getWalsForTable(table); + Assert.assertEquals("Expected to only find two WAL for the table", 2, wals.size()); + + log.info("Compacting the table which will remove all WALs from the tablets"); + + // Flush our test table to remove the WAL references in it + conn.tableOperations().flush(table, null, null, true); + // Flush the metadata table too because it will have a reference to the WAL + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + + log.info("Fetching replication statuses from metadata table"); + + Map<String,Status> fileToStatus = getMetadataStatusForTable(table); + + Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); + + String walName = fileToStatus.keySet().iterator().next(); + Assert.assertTrue("Expected log file name from tablet to equal replication entry", wals.contains(walName)); + + Status status = fileToStatus.get(walName); + + Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); + + Set<String> filesForTable = getFilesForTable(table); + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); + log.info("Files for table before MajC: {}", filesForTable); + + // Issue a MajC to roll a new file in HDFS + conn.tableOperations().compact(table, null, null, false, true); + + Set<String> filesForTableAfterCompaction = getFilesForTable(table); + + log.info("Files for table after MajC: {}", filesForTableAfterCompaction); + + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); + Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); + + // Use the rfile which was just replaced by the MajC to determine when the GC has ran + Path fileToBeDeleted = new Path(filesForTable.iterator().next()); + FileSystem fs = getCluster().getFileSystem(); + + boolean fileExists = fs.exists(fileToBeDeleted); + while (fileExists) { + log.info("File which should get deleted still exists: {}", fileToBeDeleted); + Thread.sleep(2000); + fileExists = fs.exists(fileToBeDeleted); + } + + // At this point in time, we *know* that the GarbageCollector has run which means that the Status + // for our WAL should not be altered. + + Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); + Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); + + /* + * To verify that the WALs is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead. The easiest way to do + * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of + */ + + conn.tableOperations().create(otherTable); + bw = conn.createBatchWriter(otherTable, null); + // 500k + byte[] bigValue = new byte[1024 * 500]; + Arrays.fill(bigValue, (byte) 1); + // 500k * 50 + for (int i = 0; i < 50; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, bigValue); + bw.addMutation(m); + if (i % 10 == 0) { + bw.flush(); + } + } + + bw.close(); + + conn.tableOperations().flush(otherTable, null, null, true); + + // Get the tservers which the master deems as active + final ClientContext context = new ClientContext(conn.getInstance(), new Credentials("root", new PasswordToken(ConfigurableMacBase.ROOT_PASSWORD)), + getClientConfig()); + List<String> tservers = MasterClient.execute(context, new ClientExecReturn<List<String>,MasterClientService.Client>() { + @Override + public List<String> execute(MasterClientService.Client client) throws Exception { + return client.getActiveTservers(Tracer.traceInfo(), context.rpcCreds()); + } + }); + + Assert.assertEquals("Expected only one active tservers", 1, tservers.size()); + + HostAndPort tserver = HostAndPort.fromString(tservers.get(0)); + + // Get the active WALs from that server + log.info("Fetching active WALs from {}", tserver); + + Client client = ThriftUtil.getTServerClient(tserver, context); + List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), context.rpcCreds()); + + log.info("Active wals: {}", activeWalsForTserver); + + Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size()); + + String activeWal = new Path(activeWalsForTserver.get(0)).toString(); + + Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL we saw", walName, activeWal); + + log.info("Ensuring that replication status does get closed after WAL is no longer in use by Tserver"); + + do { + Map<String,Status> replicationStatuses = getMetadataStatusForTable(table); + + log.info("Got replication status messages {}", replicationStatuses); + Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size()); + + status = replicationStatuses.values().iterator().next(); + log.info("Current status: {}", ProtobufUtil.toString(status)); + + if (status.getClosed()) { + return; + } + + log.info("Status is not yet closed, waiting for garbage collector to close it"); + + Thread.sleep(2000); + } while (true); + } +}