http://git-wip-us.apache.org/repos/asf/accumulo/blob/a4afd1bf/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java index 30200ec,0000000..a799cb3 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@@ -1,510 -1,0 +1,514 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URL; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.rfile.PrintInfo; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.MonitorUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestMultiTableIngest; +import org.apache.accumulo.test.VerifyIngest; ++import org.apache.accumulo.test.categories.StandaloneCapableClusterTests; ++import org.apache.accumulo.test.categories.SunnyDayTests; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.Test; ++import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + ++@Category({StandaloneCapableClusterTests.class, SunnyDayTests.class}) +public class ReadWriteIT extends AccumuloClusterHarness { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + } + + private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class); + + static final int ROWS = 100000; + static final int COLS = 1; + static final String COLF = "colf"; + + @Override + protected int defaultTimeoutSeconds() { + return 6 * 60; + } + + @Test(expected = RuntimeException.class) + public void invalidInstanceName() throws Exception { + final Connector conn = getConnector(); + new ZooKeeperInstance("fake_instance_name", conn.getInstance().getZooKeepers()); + } + + @Test + public void sunnyDay() throws Exception { + // Start accumulo, create a table, insert some data, verify we can read it out. + // Shutdown cleanly. + log.debug("Starting Monitor"); + cluster.getClusterControl().startAllServers(ServerType.MONITOR); + Connector connector = getConnector(); + String tableName = getUniqueNames(1)[0]; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName); + String monitorLocation = null; + while (null == monitorLocation) { + monitorLocation = MonitorUtil.getLocation(getConnector().getInstance()); + if (null == monitorLocation) { + log.debug("Could not fetch monitor HTTP address from zookeeper"); + Thread.sleep(2000); + } + } + String scheme = "http://"; + if (getCluster() instanceof StandaloneAccumuloCluster) { + StandaloneAccumuloCluster standaloneCluster = (StandaloneAccumuloCluster) getCluster(); + File accumuloSite = new File(standaloneCluster.getServerAccumuloConfDir(), "accumulo-site.xml"); + if (accumuloSite.isFile()) { + Configuration conf = new Configuration(false); + conf.addResource(new Path(accumuloSite.toURI())); + String monitorSslKeystore = conf.get(Property.MONITOR_SSL_KEYSTORE.getKey()); + if (null != monitorSslKeystore) { + log.info("Setting scheme to HTTPS since monitor ssl keystore configuration was observed in {}", accumuloSite); + scheme = "https://"; + SSLContext ctx = SSLContext.getInstance("SSL"); + TrustManager[] tm = new TrustManager[] {new TestTrustManager()}; + ctx.init(new KeyManager[0], tm, new SecureRandom()); + SSLContext.setDefault(ctx); + HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory()); + HttpsURLConnection.setDefaultHostnameVerifier(new TestHostnameVerifier()); + } + } else { + log.info("{} is not a normal file, not checking for monitor running with SSL", accumuloSite); + } + } + URL url = new URL(scheme + monitorLocation); + log.debug("Fetching web page " + url); + String result = FunctionalTestUtils.readAll(url.openStream()); + assertTrue(result.length() > 100); + log.debug("Stopping accumulo cluster"); + ClusterControl control = cluster.getClusterControl(); + control.adminStopAll(); + ZooReader zreader = new ZooReader(connector.getInstance().getZooKeepers(), connector.getInstance().getZooKeepersSessionTimeOut()); + ZooCache zcache = new ZooCache(zreader, null); + byte[] masterLockData; + do { + masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(connector.getInstance()) + Constants.ZMASTER_LOCK, null); + if (null != masterLockData) { + log.info("Master lock is still held"); + Thread.sleep(1000); + } + } while (null != masterLockData); + + control.stopAllServers(ServerType.GARBAGE_COLLECTOR); + control.stopAllServers(ServerType.MONITOR); + control.stopAllServers(ServerType.TRACER); + log.debug("success!"); + // Restarting everything + cluster.start(); + } + + public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName) + throws Exception { + ingest(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName); + } + + public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf, + String tableName) throws Exception { + TestIngest.Opts opts = new TestIngest.Opts(); + opts.rows = rows; + opts.cols = cols; + opts.dataSize = width; + opts.startRow = offset; + opts.columnFamily = colf; + opts.createTable = true; + opts.setTableName(tableName); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(principal); + } + + TestIngest.ingest(connector, opts, new BatchWriterOpts()); + } + + public static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName) + throws Exception { + verify(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName); + } + + private static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf, + String tableName) throws Exception { + ScannerOpts scannerOpts = new ScannerOpts(); + VerifyIngest.Opts opts = new VerifyIngest.Opts(); + opts.rows = rows; + opts.cols = cols; + opts.dataSize = width; + opts.startRow = offset; + opts.columnFamily = colf; + opts.setTableName(tableName); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(principal); + } + + VerifyIngest.verifyIngest(connector, opts, scannerOpts); + } + + public static String[] args(String... args) { + return args; + } + + @Test + public void multiTableTest() throws Exception { + // Write to multiple tables + final String instance = cluster.getInstanceName(); + final String keepers = cluster.getZooKeepers(); + final ClusterControl control = cluster.getClusterControl(); + final String prefix = getClass().getSimpleName() + "_" + testName.getMethodName(); + ExecutorService svc = Executors.newFixedThreadPool(2); + Future<Integer> p1 = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + ClientConfiguration clientConf = cluster.getClientConfig(); + // Invocation is different for SASL. We're only logged in via this processes memory (not via some credentials cache on disk) + // Need to pass along the keytab because of that. + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + String principal = getAdminPrincipal(); + AuthenticationToken token = getAdminToken(); + assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken); + KerberosToken kt = (KerberosToken) token; + assertNotNull("Expected keytab in token", kt.getKeytab()); + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab().getAbsolutePath(), + "-u", principal)); + } + + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String( + ((PasswordToken) getAdminToken()).getPassword(), UTF_8), "--tablePrefix", prefix)); + } catch (IOException e) { + log.error("Error running MultiTableIngest", e); + return -1; + } + } + }); + Future<Integer> p2 = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + ClientConfiguration clientConf = cluster.getClientConfig(); + // Invocation is different for SASL. We're only logged in via this processes memory (not via some credentials cache on disk) + // Need to pass along the keytab because of that. + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + String principal = getAdminPrincipal(); + AuthenticationToken token = getAdminToken(); + assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken); + KerberosToken kt = (KerberosToken) token; + assertNotNull("Expected keytab in token", kt.getKeytab()); + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "--readonly", "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab() + .getAbsolutePath(), "-u", principal)); + } + + return control.exec( + TestMultiTableIngest.class, + args("--count", Integer.toString(ROWS), "--readonly", "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String( + ((PasswordToken) getAdminToken()).getPassword(), UTF_8), "--tablePrefix", prefix)); + } catch (IOException e) { + log.error("Error running MultiTableIngest", e); + return -1; + } + } + }); + svc.shutdown(); + while (!svc.isTerminated()) { + svc.awaitTermination(15, TimeUnit.SECONDS); + } + assertEquals(0, p1.get().intValue()); + assertEquals(0, p2.get().intValue()); + } + + @Test + public void largeTest() throws Exception { + // write a few large values + Connector connector = getConnector(); + String table = getUniqueNames(1)[0]; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table); + } + + @Test + public void interleaved() throws Exception { + // read and write concurrently + final Connector connector = getConnector(); + final String tableName = getUniqueNames(1)[0]; + interleaveTest(connector, tableName); + } + + static void interleaveTest(final Connector connector, final String tableName) throws Exception { + final AtomicBoolean fail = new AtomicBoolean(false); + final int CHUNKSIZE = ROWS / 10; + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, 0, tableName); + int i; + for (i = 0; i < ROWS; i += CHUNKSIZE) { + final int start = i; + Thread verify = new Thread() { + @Override + public void run() { + try { + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, start, tableName); + } catch (Exception ex) { + fail.set(true); + } + } + }; + verify.start(); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i + CHUNKSIZE, tableName); + verify.join(); + assertFalse(fail.get()); + } + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i, tableName); + } + + public static Text t(String s) { + return new Text(s); + } + + public static Mutation m(String row, String cf, String cq, String value) { + Mutation m = new Mutation(t(row)); + m.put(t(cf), t(cq), new Value(value.getBytes())); + return m; + } + + @Test + public void localityGroupPerf() throws Exception { + // verify that locality groups can make look-ups faster + final Connector connector = getConnector(); + final String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + connector.tableOperations().setProperty(tableName, "table.group.g1", "colf"); + connector.tableOperations().setProperty(tableName, "table.groups.enabled", "g1"); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + connector.tableOperations().compact(tableName, null, null, true, true); + BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); + bw.addMutation(m("zzzzzzzzzzz", "colf2", "cq", "value")); + bw.close(); + long now = System.currentTimeMillis(); + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + scanner.fetchColumnFamily(new Text("colf")); + Iterators.size(scanner.iterator()); + long diff = System.currentTimeMillis() - now; + now = System.currentTimeMillis(); + scanner = connector.createScanner(tableName, Authorizations.EMPTY); + scanner.fetchColumnFamily(new Text("colf2")); + Iterators.size(scanner.iterator()); + bw.close(); + long diff2 = System.currentTimeMillis() - now; + assertTrue(diff2 < diff); + } + + @Test + public void sunnyLG() throws Exception { + // create a locality group, write to it and ensure it exists in the RFiles that result + final Connector connector = getConnector(); + final String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + Map<String,Set<Text>> groups = new TreeMap<>(); + groups.put("g1", Collections.singleton(t("colf"))); + connector.tableOperations().setLocalityGroups(tableName, groups); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); + connector.tableOperations().flush(tableName, null, null, true); + try (BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1)) { + String tableId = connector.tableOperations().tableIdMap().get(tableName); + bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<")))); + bscanner.fetchColumnFamily(DataFileColumnFamily.NAME); + boolean foundFile = false; + for (Entry<Key,Value> entry : bscanner) { + foundFile = true; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream newOut = new PrintStream(baos); + PrintStream oldOut = System.out; + try { + System.setOut(newOut); + List<String> args = new ArrayList<>(); + args.add(entry.getKey().getColumnQualifier().toString()); + if (ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + args.add("--config"); + StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster; + String hadoopConfDir = sac.getHadoopConfDir(); + args.add(new Path(hadoopConfDir, "core-site.xml").toString()); + args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString()); + } + log.info("Invoking PrintInfo with " + args); + PrintInfo.main(args.toArray(new String[args.size()])); + newOut.flush(); + String stdout = baos.toString(); + assertTrue(stdout.contains("Locality group : g1")); + assertTrue(stdout.contains("families : [colf]")); + } finally { + newOut.close(); + System.setOut(oldOut); + } + } + assertTrue(foundFile); + } + } + + @Test + public void localityGroupChange() throws Exception { + // Make changes to locality groups and ensure nothing is lost + final Connector connector = getConnector(); + String table = getUniqueNames(1)[0]; + TableOperations to = connector.tableOperations(); + to.create(table); + String[] config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf,xyz;lg2:c1,c2"}; + int i = 0; + for (String cfg : config) { + to.setLocalityGroups(table, getGroups(cfg)); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * (i + 1), 1, 50, ROWS * i, table); + to.flush(table, null, null, true); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 0, 1, 50, ROWS * (i + 1), table); + i++; + } + to.delete(table); + to.create(table); + config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf;lg2:colf",}; + i = 1; + for (String cfg : config) { + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table); + ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table); + to.setLocalityGroups(table, getGroups(cfg)); + to.flush(table, null, null, true); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table); + verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table); + i++; + } + } + + private Map<String,Set<Text>> getGroups(String cfg) { + Map<String,Set<Text>> groups = new TreeMap<>(); + if (cfg != null) { + for (String group : cfg.split(";")) { + String[] parts = group.split(":"); + Set<Text> cols = new HashSet<>(); + for (String col : parts[1].split(",")) { + cols.add(t(col)); + } + groups.put(parts[1], cols); + } + } + return groups; + } + + private static class TestTrustManager implements X509TrustManager { + @Override + public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {} + + @Override + public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {} + + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + } + + private static class TestHostnameVerifier implements HostnameVerifier { + @Override + public boolean verify(String hostname, SSLSession session) { + return true; + } + } + +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a4afd1bf/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/TableIT.java index 22fbf18,0000000..d3036f7 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java @@@ -1,110 -1,0 +1,110 @@@ +package org.apache.accumulo.test.functional; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; + +import org.apache.accumulo.cluster.AccumuloCluster; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; - import org.apache.accumulo.test.categories.MiniClusterOnlyTest; ++import org.apache.accumulo.test.categories.MiniClusterOnlyTests; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.hamcrest.CoreMatchers; +import org.junit.Assume; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Iterators; + - @Category(MiniClusterOnlyTest.class) ++@Category(MiniClusterOnlyTests.class) +public class TableIT extends AccumuloClusterHarness { + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test + public void test() throws Exception { + Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI)); + + AccumuloCluster cluster = getCluster(); + MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) cluster; + String rootPath = mac.getConfig().getDir().getAbsolutePath(); + + Connector c = getConnector(); + TableOperations to = c.tableOperations(); + String tableName = getUniqueNames(1)[0]; + to.create(tableName); + + TestIngest.Opts opts = new TestIngest.Opts(); + VerifyIngest.Opts vopts = new VerifyIngest.Opts(); + ClientConfiguration clientConfig = getCluster().getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + vopts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(getAdminPrincipal()); + vopts.setPrincipal(getAdminPrincipal()); + } + + opts.setTableName(tableName); + TestIngest.ingest(c, opts, new BatchWriterOpts()); + to.flush(tableName, null, null, true); + vopts.setTableName(tableName); + VerifyIngest.verifyIngest(c, vopts, new ScannerOpts()); + String id = to.tableIdMap().get(tableName); + Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.setRange(new KeyExtent(id, null, null).toMetadataRange()); + s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + assertTrue(Iterators.size(s.iterator()) > 0); + + FileSystem fs = getCluster().getFileSystem(); + assertTrue(fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id)).length > 0); + to.delete(tableName); + assertEquals(0, Iterators.size(s.iterator())); + try { + assertEquals(0, fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id)).length); + } catch (FileNotFoundException ex) { + // that's fine, too + } + assertNull(to.tableIdMap().get(tableName)); + to.create(tableName); + TestIngest.ingest(c, opts, new BatchWriterOpts()); + VerifyIngest.verifyIngest(c, vopts, new ScannerOpts()); + to.delete(tableName); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a4afd1bf/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java index 157574b,0000000..e985bf8 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/proxy/TCompactProxyIT.java @@@ -1,34 -1,0 +1,35 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.proxy; + +import org.apache.accumulo.harness.SharedMiniClusterBase; ++import org.apache.accumulo.test.categories.MiniClusterOnlyTests; ++import org.apache.accumulo.test.categories.SunnyDayTests; +import org.apache.thrift.protocol.TCompactProtocol; +import org.junit.BeforeClass; ++import org.junit.experimental.categories.Category; + - /** - * - */ ++@Category({MiniClusterOnlyTests.class, SunnyDayTests.class}) +public class TCompactProxyIT extends SimpleProxyBase { + + @BeforeClass + public static void setProtocol() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + SimpleProxyBase.factory = new TCompactProtocol.Factory(); + setUpProxy(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a4afd1bf/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java index 8603cd6,0000000..8e8cc2c mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java @@@ -1,332 -1,0 +1,333 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import static org.junit.Assert.assertTrue; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; ++import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.LongCombiner.Type; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.minicluster.impl.ZooKeeperBindException; +import org.apache.accumulo.server.replication.ReplicaSystemFactory; ++import org.apache.accumulo.test.categories.MiniClusterOnlyTests; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; ++import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + - /** - * - */ ++@Category(MiniClusterOnlyTests.class) +public class CyclicReplicationIT { + private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class); + + @Rule + public Timeout getTimeout() { + int scalingFactor = 1; + try { + scalingFactor = Integer.parseInt(System.getProperty("timeout.factor")); + } catch (NumberFormatException exception) { + log.warn("Could not parse timeout.factor, not scaling timeout"); + } + - return Timeout.millis(scalingFactor * 10 * 60 * 1000); ++ return new Timeout(scalingFactor * 10, TimeUnit.MINUTES); + } + + @Rule + public TestName testName = new TestName(); + + private File createTestDir(String name) { + File baseDir = new File(System.getProperty("user.dir") + "/target/mini-tests"); + assertTrue(baseDir.mkdirs() || baseDir.isDirectory()); + File testDir = new File(baseDir, this.getClass().getName() + "_" + testName.getMethodName() + "_" + name); + FileUtils.deleteQuietly(testDir); + assertTrue(testDir.mkdir()); + return testDir; + } + + private void setCoreSite(MiniAccumuloClusterImpl cluster) throws Exception { + File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml"); + if (csFile.exists()) + throw new RuntimeException(csFile + " already exist"); + + Configuration coreSite = new Configuration(false); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + OutputStream out = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "core-site.xml"))); + coreSite.writeXml(out); + out.close(); + } + + /** + * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication + */ + private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { + // Set the same SSL information from the primary when present + Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); + if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + Map<String,String> peerSiteConfig = new HashMap<>(); + peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); + Assert.assertNotNull("Keystore Path was null", keystorePath); + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); + String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); + Assert.assertNotNull("Truststore Path was null", truststorePath); + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); + + // Passwords might be stored in CredentialProvider + String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); + if (null != keystorePassword) { + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); + } + String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); + if (null != truststorePassword) { + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + } + + System.out.println("Setting site configuration for peer " + peerSiteConfig); + peerCfg.setSiteConfig(peerSiteConfig); + } + + // Use the CredentialProvider if the primary also uses one + String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProvider) { + Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); + peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); + peerCfg.setSiteConfig(peerSiteConfig); + } + } + + @Test + public void dataIsNotOverReplicated() throws Exception { + File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2"); + String password = "password"; + + MiniAccumuloConfigImpl master1Cfg; + MiniAccumuloClusterImpl master1Cluster; + while (true) { + master1Cfg = new MiniAccumuloConfigImpl(master1Dir, password); + master1Cfg.setNumTservers(1); + master1Cfg.setInstanceName("master1"); + + // Set up SSL if needed + ConfigurableMacBase.configureForEnvironment(master1Cfg, this.getClass(), ConfigurableMacBase.getSslDir(master1Dir)); + + master1Cfg.setProperty(Property.REPLICATION_NAME, master1Cfg.getInstanceName()); + master1Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + master1Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); + master1Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + master1Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + master1Cluster = new MiniAccumuloClusterImpl(master1Cfg); + setCoreSite(master1Cluster); + + try { + master1Cluster.start(); + break; + } catch (ZooKeeperBindException e) { + log.warn("Failed to start ZooKeeper on " + master1Cfg.getZooKeeperPort() + ", will retry"); + } + } + + MiniAccumuloConfigImpl master2Cfg; + MiniAccumuloClusterImpl master2Cluster; + while (true) { + master2Cfg = new MiniAccumuloConfigImpl(master2Dir, password); + master2Cfg.setNumTservers(1); + master2Cfg.setInstanceName("master2"); + + // Set up SSL if needed. Need to share the same SSL truststore as master1 + this.updatePeerConfigFromPrimary(master1Cfg, master2Cfg); + + master2Cfg.setProperty(Property.REPLICATION_NAME, master2Cfg.getInstanceName()); + master2Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + master2Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); + master2Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + master2Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + master2Cluster = new MiniAccumuloClusterImpl(master2Cfg); + setCoreSite(master2Cluster); + + try { + master2Cluster.start(); + break; + } catch (ZooKeeperBindException e) { + log.warn("Failed to start ZooKeeper on " + master2Cfg.getZooKeeperPort() + ", will retry"); + } + } + + try { + Connector connMaster1 = master1Cluster.getConnector("root", new PasswordToken(password)), connMaster2 = master2Cluster.getConnector("root", + new PasswordToken(password)); + + String master1UserName = "master1", master1Password = "foo"; + String master2UserName = "master2", master2Password = "bar"; + String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName(); + + connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password)); + connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password)); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master2Cluster.getInstanceName(), master2UserName); + connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master2Cluster.getInstanceName(), master2Password); + + connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master1Cluster.getInstanceName(), master1UserName); + connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master1Cluster.getInstanceName(), master1Password); + + connMaster1.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + master2Cluster.getInstanceName(), + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(master2Cluster.getInstanceName(), master2Cluster.getZooKeepers()))); + + connMaster2.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + master1Cluster.getInstanceName(), + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers()))); + + connMaster1.tableOperations().create(master1Table, new NewTableConfiguration().withoutDefaultIterators()); + String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table); + Assert.assertNotNull(master1TableId); + + connMaster2.tableOperations().create(master2Table, new NewTableConfiguration().withoutDefaultIterators()); + String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table); + Assert.assertNotNull(master2TableId); + + // Replicate master1 in the master1 cluster to master2 in the master2 cluster + connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION_TARGET.getKey() + master2Cluster.getInstanceName(), master2TableId); + + // Replicate master2 in the master2 cluster to master1 in the master2 cluster + connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION_TARGET.getKey() + master1Cluster.getInstanceName(), master1TableId); + + // Give our replication user the ability to write to the respective table + connMaster1.securityOperations().grantTablePermission(master1UserName, master1Table, TablePermission.WRITE); + connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, TablePermission.WRITE); + + IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class); + SummingCombiner.setEncodingType(summingCombiner, Type.STRING); + SummingCombiner.setCombineAllColumns(summingCombiner, true); + + // Set a combiner on both instances that will sum multiple values + // We can use this to verify that the mutation was not sent multiple times + connMaster1.tableOperations().attachIterator(master1Table, summingCombiner); + connMaster2.tableOperations().attachIterator(master2Table, summingCombiner); + + // Write a single entry + BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig()); + Mutation m = new Mutation("row"); + m.put("count", "", "1"); + bw.addMutation(m); + bw.close(); + + Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table); + + log.info("Found {} that need replication from master1", files); + + // Kill and restart the tserver to close the WAL on master1 + for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + master1Cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + master1Cluster.exec(TabletServer.class); + + log.info("Restarted tserver on master1"); + + // Try to avoid ACCUMULO-2964 + Thread.sleep(1000); + + // Sanity check that the element is there on master1 + Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + // Wait for this table to replicate + connMaster1.replicationOperations().drain(master1Table, files); + + Thread.sleep(5000); + + // Check that the element made it to master2 only once + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + // Wait for master2 to finish replicating it back + files = connMaster2.replicationOperations().referencedFiles(master2Table); + + // Kill and restart the tserver to close the WAL on master2 + for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + master2Cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + master2Cluster.exec(TabletServer.class); + + // Try to avoid ACCUMULO-2964 + Thread.sleep(1000); + + // Check that the element made it to master2 only once + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + connMaster2.replicationOperations().drain(master2Table, files); + + Thread.sleep(5000); + + // Verify that the entry wasn't sent back to master1 + s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + } finally { + master1Cluster.stop(); + master2Cluster.stop(); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a4afd1bf/test/src/main/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java index 32df894,0000000..b439afc mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java @@@ -1,246 -1,0 +1,246 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.security.PrivilegedExceptionAction; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.harness.AccumuloITBase; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.TestingKdc; +import org.apache.accumulo.master.replication.SequentialWorkAssigner; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.replication.ReplicaSystemFactory; - import org.apache.accumulo.test.categories.MiniClusterOnlyTest; ++import org.apache.accumulo.test.categories.MiniClusterOnlyTests; +import org.apache.accumulo.test.functional.KerberosIT; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +/** + * Ensure that replication occurs using keytabs instead of password (not to mention SASL) + */ - @Category(MiniClusterOnlyTest.class) ++@Category(MiniClusterOnlyTests.class) +public class KerberosReplicationIT extends AccumuloITBase { + private static final Logger log = LoggerFactory.getLogger(KerberosIT.class); + + private static TestingKdc kdc; + private static String krbEnabledForITs = null; + private static ClusterUser rootUser; + + @BeforeClass + public static void startKdc() throws Exception { + kdc = new TestingKdc(); + kdc.start(); + krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION); + if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true"); + } + rootUser = kdc.getRootUser(); + } + + @AfterClass + public static void stopKdc() throws Exception { + if (null != kdc) { + kdc.stop(); + } + if (null != krbEnabledForITs) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs); + } + } + + private MiniAccumuloClusterImpl primary, peer; + private String PRIMARY_NAME = "primary", PEER_NAME = "peer"; + + @Override + protected int defaultTimeoutSeconds() { + return 60 * 3; + } + + private MiniClusterConfigurationCallback getConfigCallback(final String name) { + return new MiniClusterConfigurationCallback() { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + cfg.setProperty(Property.REPLICATION_NAME, name); + cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName()); + cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + coreSite.set("fs.defaultFS", "file:///"); + } + }; + } + + @Before + public void setup() throws Exception { + MiniClusterHarness harness = new MiniClusterHarness(); + + // Create a primary and a peer instance, both with the same "root" user + primary = harness.create(getClass().getName(), testName.getMethodName(), new PasswordToken("unused"), getConfigCallback(PRIMARY_NAME), kdc); + primary.start(); + + peer = harness.create(getClass().getName(), testName.getMethodName() + "_peer", new PasswordToken("unused"), getConfigCallback(PEER_NAME), kdc); + peer.start(); + + // Enable kerberos auth + Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + @After + public void teardown() throws Exception { + if (null != peer) { + peer.stop(); + } + if (null != primary) { + primary.stop(); + } + UserGroupInformation.setConfiguration(new Configuration(false)); + } + + @Test + public void dataReplicatedToCorrectTable() throws Exception { + // Login as the root user + final UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().toURI().toString()); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + log.info("testing {}", ugi); + final KerberosToken token = new KerberosToken(); + final Connector primaryConn = primary.getConnector(rootUser.getPrincipal(), token); + final Connector peerConn = peer.getConnector(rootUser.getPrincipal(), token); + + ClusterUser replicationUser = kdc.getClientPrincipal(0); + + // Create user for replication to the peer + peerConn.securityOperations().createLocalUser(replicationUser.getPrincipal(), null); + + primaryConn.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + PEER_NAME, replicationUser.getPrincipal()); + primaryConn.instanceOperations().setProperty(Property.REPLICATION_PEER_KEYTAB.getKey() + PEER_NAME, replicationUser.getKeytab().getAbsolutePath()); + + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers + primaryConn.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + PEER_NAME, + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(peerConn.getInstance().getInstanceName(), peerConn.getInstance().getZooKeepers()))); + + String primaryTable1 = "primary", peerTable1 = "peer"; + + // Create tables + primaryConn.tableOperations().create(primaryTable1); + String masterTableId1 = primaryConn.tableOperations().tableIdMap().get(primaryTable1); + Assert.assertNotNull(masterTableId1); + + peerConn.tableOperations().create(peerTable1); + String peerTableId1 = peerConn.tableOperations().tableIdMap().get(peerTable1); + Assert.assertNotNull(peerTableId1); + + // Grant write permission + peerConn.securityOperations().grantTablePermission(replicationUser.getPrincipal(), peerTable1, TablePermission.WRITE); + + // Replicate this table to the peerClusterName in a table with the peerTableId table id + primaryConn.tableOperations().setProperty(primaryTable1, Property.TABLE_REPLICATION.getKey(), "true"); + primaryConn.tableOperations().setProperty(primaryTable1, Property.TABLE_REPLICATION_TARGET.getKey() + PEER_NAME, peerTableId1); + + // Write some data to table1 + BatchWriter bw = primaryConn.createBatchWriter(primaryTable1, new BatchWriterConfig()); + long masterTable1Records = 0l; + for (int rows = 0; rows < 2500; rows++) { + Mutation m = new Mutation(primaryTable1 + rows); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + masterTable1Records++; + } + bw.addMutation(m); + } + + bw.close(); + + log.info("Wrote all data to primary cluster"); + + Set<String> filesFor1 = primaryConn.replicationOperations().referencedFiles(primaryTable1); + + // Restart the tserver to force a close on the WAL + for (ProcessReference proc : primary.getProcesses().get(ServerType.TABLET_SERVER)) { + primary.killProcess(ServerType.TABLET_SERVER, proc); + } + primary.exec(TabletServer.class); + + log.info("Restarted the tserver"); + + // Read the data -- the tserver is back up and running and tablets are assigned + Iterators.size(primaryConn.createScanner(primaryTable1, Authorizations.EMPTY).iterator()); + + // Wait for both tables to be replicated + log.info("Waiting for {} for {}", filesFor1, primaryTable1); + primaryConn.replicationOperations().drain(primaryTable1, filesFor1); + + long countTable = 0l; + for (Entry<Key,Value> entry : peerConn.createScanner(peerTable1, Authorizations.EMPTY)) { + countTable++; + Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() + .startsWith(primaryTable1)); + } + + log.info("Found {} records in {}", countTable, peerTable1); + Assert.assertEquals(masterTable1Records, countTable); + + return null; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/a4afd1bf/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java index 9fc8927,0000000..e1578b3 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java @@@ -1,202 -1,0 +1,205 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.start; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.accumulo.core.file.rfile.PrintInfo; +import org.apache.accumulo.core.util.Classpath; +import org.apache.accumulo.core.util.CreateToken; +import org.apache.accumulo.core.util.Help; +import org.apache.accumulo.core.util.Jar; +import org.apache.accumulo.core.util.Version; +import org.apache.accumulo.gc.GCExecutable; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.master.MasterExecutable; +import org.apache.accumulo.minicluster.MiniAccumuloRunner; +import org.apache.accumulo.minicluster.impl.MiniClusterExecutable; +import org.apache.accumulo.monitor.Monitor; +import org.apache.accumulo.monitor.MonitorExecutable; +import org.apache.accumulo.proxy.Proxy; +import org.apache.accumulo.server.conf.ConfigSanityCheck; +import org.apache.accumulo.server.init.Initialize; +import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.server.util.Info; +import org.apache.accumulo.server.util.LoginProperties; +import org.apache.accumulo.server.util.ZooKeeperMain; +import org.apache.accumulo.shell.Shell; +import org.apache.accumulo.start.Main; +import org.apache.accumulo.start.spi.KeywordExecutable; ++import org.apache.accumulo.test.categories.SunnyDayTests; +import org.apache.accumulo.tracer.TraceServer; +import org.apache.accumulo.tracer.TracerExecutable; +import org.apache.accumulo.tserver.TServerExecutable; +import org.apache.accumulo.tserver.TabletServer; +import org.junit.Test; ++import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + ++@Category(SunnyDayTests.class) +public class KeywordStartIT { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Test + public void testKeywordsMatch() throws IOException { + for (Entry<String,KeywordExecutable> entry : Main.getExecutables(getClass().getClassLoader()).entrySet()) { + assertEquals(entry.getKey(), entry.getValue().keyword()); + } + } + + @Test + public void testCheckDuplicates() { + NoOp one = new NoOp("one"); + NoOp anotherOne = new NoOp("another"); + NoOp two = new NoOp("two"); + NoOp three = new NoOp("three"); + List<NoOp> services = Arrays.asList(one, three, two, two, three, three, anotherOne); + assertEquals(7, services.size()); + Map<String,KeywordExecutable> results = Main.checkDuplicates(services); + assertTrue(results.containsKey(one.keyword())); + assertTrue(results.containsKey(anotherOne.keyword())); + assertFalse(results.containsKey(two.keyword())); + assertFalse(results.containsKey(three.keyword())); + assertEquals(2, results.size()); + } + + // Note: this test may fail in Eclipse, if the services files haven't been generated by the AutoService annotation processor + @Test + public void testExpectedClasses() throws IOException { + assumeTrue(new File(System.getProperty("user.dir") + "/src").exists()); + TreeMap<String,Class<? extends KeywordExecutable>> expectSet = new TreeMap<>(); + expectSet.put("admin", Admin.class); + expectSet.put("check-server-config", ConfigSanityCheck.class); + expectSet.put("classpath", Classpath.class); + expectSet.put("create-token", CreateToken.class); + expectSet.put("gc", GCExecutable.class); + expectSet.put("help", Help.class); + expectSet.put("info", Info.class); + expectSet.put("init", Initialize.class); + expectSet.put("jar", Jar.class); + expectSet.put("login-info", LoginProperties.class); + expectSet.put("master", MasterExecutable.class); + expectSet.put("minicluster", MiniClusterExecutable.class); + expectSet.put("monitor", MonitorExecutable.class); + expectSet.put("proxy", Proxy.class); + expectSet.put("rfile-info", PrintInfo.class); + expectSet.put("shell", Shell.class); + expectSet.put("tracer", TracerExecutable.class); + expectSet.put("tserver", TServerExecutable.class); + expectSet.put("version", Version.class); + expectSet.put("zookeeper", ZooKeeperMain.class); + + Iterator<Entry<String,Class<? extends KeywordExecutable>>> expectIter = expectSet.entrySet().iterator(); + TreeMap<String,KeywordExecutable> actualSet = new TreeMap<>(Main.getExecutables(getClass().getClassLoader())); + Iterator<Entry<String,KeywordExecutable>> actualIter = actualSet.entrySet().iterator(); + Entry<String,Class<? extends KeywordExecutable>> expected; + Entry<String,KeywordExecutable> actual; + while (expectIter.hasNext() && actualIter.hasNext()) { + expected = expectIter.next(); + actual = actualIter.next(); + assertEquals(expected.getKey(), actual.getKey()); + assertEquals(expected.getValue(), actual.getValue().getClass()); + } + boolean moreExpected = expectIter.hasNext(); + if (moreExpected) { + while (expectIter.hasNext()) { + log.warn("Missing class for keyword '" + expectIter.next() + "'"); + } + } + assertFalse("Missing expected classes", moreExpected); + boolean moreActual = actualIter.hasNext(); + if (moreActual) { + while (actualIter.hasNext()) { + log.warn("Extra class found with keyword '" + actualIter.next() + "'"); + } + } + assertFalse("Found additional unexpected classes", moreActual); + } + + @Test + public void checkHasMain() { + assertFalse("Sanity check for test failed. Somehow the test class has a main method", hasMain(this.getClass())); + + HashSet<Class<?>> expectSet = new HashSet<>(); + expectSet.add(Admin.class); + expectSet.add(CreateToken.class); + expectSet.add(Info.class); + expectSet.add(Initialize.class); + expectSet.add(LoginProperties.class); + expectSet.add(Master.class); + expectSet.add(MiniAccumuloRunner.class); + expectSet.add(Monitor.class); + expectSet.add(PrintInfo.class); + expectSet.add(Proxy.class); + expectSet.add(Shell.class); + expectSet.add(SimpleGarbageCollector.class); + expectSet.add(TabletServer.class); + expectSet.add(TraceServer.class); + expectSet.add(ZooKeeperMain.class); + + for (Class<?> c : expectSet) { + assertTrue("Class " + c.getName() + " is missing a main method!", hasMain(c)); + } + + } + + private static boolean hasMain(Class<?> classToCheck) { + Method main; + try { + main = classToCheck.getMethod("main", new String[0].getClass()); + } catch (NoSuchMethodException e) { + return false; + } + return main != null && Modifier.isPublic(main.getModifiers()) && Modifier.isStatic(main.getModifiers()); + } + + private static class NoOp implements KeywordExecutable { + + private final String kw; + + public NoOp(String kw) { + this.kw = kw; + } + + @Override + public String keyword() { + return kw; + } + + @Override + public void execute(String[] args) throws Exception {} + + } +}