ACCUMULO-2694 update migration cleanup for 1.6 branch. - manually redo changes to Master - convert python functional test to an IT -- Add programmatic access to MasterMonitorInfo to MiniAccumuloClusterImpl -- Make SystemCredentials work with MiniAccumuloClusterImpl
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/533983f1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/533983f1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/533983f1 Branch: refs/heads/master Commit: 533983f10136ce581a8b877379b19eb2302fa578 Parents: dedc9cd Author: Sean Busbey <bus...@cloudera.com> Authored: Thu Jun 26 11:11:39 2014 -0500 Committer: Sean Busbey <bus...@cloudera.com> Committed: Fri Aug 1 19:32:25 2014 -0500 ---------------------------------------------------------------------- .../core/client/AccumuloSecurityException.java | 8 + .../impl/MiniAccumuloClusterImpl.java | 32 ++++ .../impl/MiniAccumuloClusterImplTest.java | 37 +++++ minicluster/src/test/resources/log4j.properties | 22 ++- .../server/security/SystemCredentials.java | 21 ++- .../java/org/apache/accumulo/master/Master.java | 34 ++++- test/pom.xml | 5 + .../BalanceInPresenceOfOfflineTableIT.java | 152 +++++++++++++++++++ 8 files changed, 300 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/533983f1/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java b/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java index 06b148d..35ea188 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java +++ b/core/src/main/java/org/apache/accumulo/core/client/AccumuloSecurityException.java @@ -75,6 +75,14 @@ public class AccumuloSecurityException extends Exception { } /** + * Construct a user-facing exception from a serialized version. 
+ * @param thrift a serialized version + */ + public AccumuloSecurityException(final ThriftSecurityException thrift) { + this(thrift.getUser(), thrift.getCode(), thrift); + } + + /** * @param user * the relevant user for the security violation * @param errorcode http://git-wip-us.apache.org/repos/asf/accumulo/blob/533983f1/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java index 9a8efa6..deb04d9 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java @@ -56,9 +56,13 @@ import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.StringUtil; @@ -68,10 +72,12 @@ import org.apache.accumulo.master.Master; import org.apache.accumulo.master.state.SetGoalState; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.server.init.Initialize; +import 
org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.util.PortUtils; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.accumulo.start.Main; import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil; +import org.apache.accumulo.trace.instrument.Tracer; import org.apache.accumulo.tserver.TabletServer; import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.io.FileUtils; @@ -82,6 +88,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.log4j.Logger; +import org.apache.thrift.TException; import org.apache.zookeeper.server.ZooKeeperServerMain; import com.google.common.base.Predicate; @@ -742,4 +749,29 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { return future.get(timeout, unit); } + + /** + * Get programmatic interface to information available in a normal monitor. + * XXX the returned structure won't contain information about the metadata table until there is data in it. + * e.g. if you want to see the metadata table you should create a table. 
+ * @since 1.6.1 + */ + public MasterMonitorInfo getMasterMonitorInfo() throws AccumuloException, AccumuloSecurityException { + MasterClientService.Iface client = null; + MasterMonitorInfo stats = null; + try { + Instance instance = new ZooKeeperInstance(getClientConfig()); + client = MasterClient.getConnectionWithRetry(instance); + stats = client.getMasterStats(Tracer.traceInfo(), SystemCredentials.get(instance).toThrift(instance)); + } catch (ThriftSecurityException exception) { + throw new AccumuloSecurityException(exception); + } catch (TException exception) { + throw new AccumuloException(exception); + } finally { + if (client != null) { + MasterClient.close(client); + } + } + return stats; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/533983f1/minicluster/src/test/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImplTest.java ---------------------------------------------------------------------- diff --git a/minicluster/src/test/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImplTest.java b/minicluster/src/test/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImplTest.java index 99f8d7d..32b20b0 100644 --- a/minicluster/src/test/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImplTest.java +++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImplTest.java @@ -17,9 +17,17 @@ package org.apache.accumulo.minicluster.impl; import java.io.File; +import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; +import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.master.thrift.MasterState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.minicluster.ServerType; import 
org.apache.commons.io.FileUtils; import org.apache.log4j.Level; @@ -31,10 +39,17 @@ import org.junit.Test; public class MiniAccumuloClusterImplTest { + private static final Logger log = Logger.getLogger(MiniAccumuloClusterImplTest.class); + public static File testDir; private static MiniAccumuloClusterImpl accumulo; + private static final int NUM_TSERVERS = 2; + + private static String TEST_TABLE = "test"; + private static String testTableID; + @BeforeClass public static void setupMiniCluster() throws Exception { Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR); @@ -46,8 +61,14 @@ public class MiniAccumuloClusterImplTest { testDir.mkdir(); MiniAccumuloConfigImpl config = new MiniAccumuloConfigImpl(testDir, "superSecret").setJDWPEnabled(true); + // expressly set number of tservers since we assert it later, in case the default changes + config.setNumTservers(NUM_TSERVERS); accumulo = new MiniAccumuloClusterImpl(config); accumulo.start(); + // create a table to ensure there are some entries in the !0 table + TableOperations tableops = accumulo.getConnector("root","superSecret").tableOperations(); + tableops.create(TEST_TABLE); + testTableID = tableops.tableIdMap().get(TEST_TABLE); } @Test(timeout = 10000) @@ -67,6 +88,22 @@ public class MiniAccumuloClusterImplTest { } } + @Test + public void saneMonitorInfo() throws Exception { + log.info("ensure monitor info includes some base information."); + MasterMonitorInfo stats = accumulo.getMasterMonitorInfo(); + List<MasterState> validStates = Arrays.asList(MasterState.values()); + List<MasterGoalState> validGoals = Arrays.asList(MasterGoalState.values()); + Assert.assertTrue("master state should be valid.", validStates.contains(stats.state)); + Assert.assertTrue("master goal state should be in " + validGoals + ". 
is " + stats.goalState, validGoals.contains(stats.goalState)); + Assert.assertNotNull("should have a table map.", stats.tableMap); + Assert.assertTrue("root table should exist in " + stats.tableMap.keySet(), stats.tableMap.keySet().contains(RootTable.ID)); + Assert.assertTrue("meta table should exist in " + stats.tableMap.keySet(), stats.tableMap.keySet().contains(MetadataTable.ID)); + Assert.assertTrue("our test table should exist in " + stats.tableMap.keySet(), stats.tableMap.keySet().contains(testTableID)); + Assert.assertNotNull("there should be tservers.", stats.tServerInfo); + Assert.assertEquals(NUM_TSERVERS, stats.tServerInfo.size()); + } + @AfterClass public static void tearDownMiniCluster() throws Exception { accumulo.stop(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/533983f1/minicluster/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/minicluster/src/test/resources/log4j.properties b/minicluster/src/test/resources/log4j.properties index b5efe8d..75dfd78 100644 --- a/minicluster/src/test/resources/log4j.properties +++ b/minicluster/src/test/resources/log4j.properties @@ -1 +1,21 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
log4j.rootLogger=INFO, CA log4j.appender.CA=org.apache.log4j.ConsoleAppender log4j.appender.CA.layout=org.apache.log4j.PatternLayout log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%-8c{2}] %-5p: %m%n log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR \ No newline at end of file +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +log4j.rootLogger=INFO, CA +log4j.appender.CA=org.apache.log4j.ConsoleAppender +log4j.appender.CA.layout=org.apache.log4j.PatternLayout +log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%-8c{2}] %-5p: %m%n + +log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR http://git-wip-us.apache.org/repos/asf/accumulo/blob/533983f1/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java index 767ed25..19e7ff7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java @@ -57,14 +57,29 @@ public final class SystemCredentials extends Credentials { } public static SystemCredentials get() { + check_permission(); + if (SYSTEM_CREDS == null) { + SYSTEM_CREDS = new SystemCredentials(HdfsZooInstance.getInstance()); + } + return SYSTEM_CREDS; + } + + private static void check_permission() { SecurityManager sm = System.getSecurityManager(); if (sm != null) { sm.checkPermission(SYSTEM_CREDENTIALS_PERMISSION); } - if (SYSTEM_CREDS == null) { - SYSTEM_CREDS = new SystemCredentials(HdfsZooInstance.getInstance()); + } + + public static SystemCredentials get(Instance instance) { + check_permission(); + /* Special case to avoid duplicating SYSTEM_CREDS */ + if (null != SYSTEM_CREDS) { + if (SYSTEM_CREDS.AS_THRIFT.getInstanceId().equals(instance.getInstanceID())) { + return SYSTEM_CREDS; + } } - return SYSTEM_CREDS; + return new SystemCredentials(instance); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/533983f1/server/master/src/main/java/org/apache/accumulo/master/Master.java 
---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 30b1f2e..2f04922 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -715,7 +715,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt while (stillMaster()) { if (!migrations.isEmpty()) { try { - cleanupMutations(); + cleanupOfflineMigrations(); + cleanupNonexistentMigrations(getConnector()); } catch (Exception ex) { log.error("Error cleaning up migrations", ex); } @@ -724,12 +725,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } - // If a migrating tablet splits, and the tablet dies before sending the - // master a message, the migration will refer to a non-existing tablet, - // so it can never complete. Periodically scan the metadata table and - // remove any migrating tablets that no longer exist. - private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - Connector connector = getConnector(); + /** + * If a migrating tablet splits, and the tablet dies before sending the + * master a message, the migration will refer to a non-existing tablet, + * so it can never complete. Periodically scan the metadata table and + * remove any migrating tablets that no longer exist. 
+ */ + private void cleanupNonexistentMigrations(final Connector connector) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); Set<KeyExtent> found = new HashSet<KeyExtent>(); @@ -741,6 +743,21 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } migrations.keySet().retainAll(found); } + + /** + * If migrating a tablet for a table that is offline, the migration + * can never succeed because no tablet server will load the tablet. + * check for offline tables and remove their migrations. + */ + private void cleanupOfflineMigrations() { + TableManager manager = TableManager.getInstance(); + for (String tableId : Tables.getIdToNameMap(instance).keySet()) { + TableState state = manager.getTableState(tableId); + if (TableState.OFFLINE == state) { + clearMigrations(tableId); + } + } + } } private class StatusThread extends Daemon { @@ -1206,6 +1223,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt @Override public void stateChanged(String tableId, TableState state) { nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state); + if (TableState.OFFLINE == state) { + clearMigrations(tableId); + } } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/533983f1/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index f3cc927..17c8527 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -137,6 +137,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/533983f1/test/src/test/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java new file mode 100644 index 0000000..37aa8a1 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; +import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.commons.lang.math.NumberUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Start a new table, create many splits, and offline before they can rebalance. 
Then try to have a different table balance + */ +public class BalanceInPresenceOfOfflineTableIT extends ConfigurableMacIT { + + private static Logger log = LoggerFactory.getLogger(BalanceInPresenceOfOfflineTableIT.class); + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> siteConfig = new HashMap<String, String>(); + siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K"); + siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0"); + cfg.setSiteConfig(siteConfig ); + // ensure we have two tservers + if (cfg.getNumTservers() < 2) { + cfg.setNumTservers(2); + } + } + + @Override + protected int defaultTimeoutSeconds() { + return 10 * 60; + } + + private static final int NUM_SPLITS = 200; + private static final String UNUSED_TABLE = "unused"; + private static final String TEST_TABLE = "test_ingest"; + + private Connector connector; + + @Before + public void setupTables() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException { + // set up splits + final SortedSet<Text> splits = new TreeSet<Text>(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new Text(String.format("%08x", i * 1000))); + } + // load into a table we won't use + connector = getConnector(); + connector.tableOperations().create(UNUSED_TABLE); + connector.tableOperations().addSplits(UNUSED_TABLE, splits); + // mark the table offline before it can rebalance. 
+ connector.tableOperations().offline(UNUSED_TABLE); + + // actual test table + connector.tableOperations().create(TEST_TABLE); + connector.tableOperations().setProperty(TEST_TABLE, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"); + } + + @Test + public void test() throws Exception { + log.info("Test that balancing is not stopped by an offline table with outstanding migrations."); + + log.debug("starting test ingestion"); + + TestIngest.Opts opts = new TestIngest.Opts(); + VerifyIngest.Opts vopts = new VerifyIngest.Opts(); + vopts.rows = opts.rows = 200000; + TestIngest.ingest(connector, opts, BWOPTS); + connector.tableOperations().flush(TEST_TABLE, null, null, true); + VerifyIngest.verifyIngest(connector, vopts, SOPTS); + + log.debug("waiting for balancing, up to ~5 minutes to allow for migration cleanup."); + final long startTime = System.currentTimeMillis(); + long currentWait = 10*1000; + boolean balancingWorked = false; + + while (!balancingWorked && (System.currentTimeMillis() - startTime) < ((5*60 + 15)*1000)) { + Thread.sleep(currentWait); + currentWait *= 2; + + log.debug("fetch the list of tablets assigned to each tserver."); + MasterMonitorInfo stats = getCluster().getMasterMonitorInfo(); + + if (stats.getTServerInfoSize() < 2) { + log.debug("we need >= 2 servers. sleeping for " + currentWait + "ms"); + continue; + } + if (stats.getUnassignedTablets() != 0) { + log.debug("We shouldn't have unassigned tablets. sleeping for " + currentWait + "ms"); + continue; + } + + int numTablets = 0; + long[] tabletsPerServer = new long[stats.getTServerInfoSize()]; + Arrays.fill(tabletsPerServer, 0l); + for (int i = 0; i < stats.getTServerInfoSize(); i++) { + for (Map.Entry<String, TableInfo> entry : stats.getTServerInfo().get(i).getTableMap().entrySet()) { + tabletsPerServer[i] += entry.getValue().getTablets(); + } + } + + if (tabletsPerServer[0] <= 10) { + log.debug("We should have > 10 tablets. 
sleeping for " + currentWait + "ms"); + continue; + } + if ((NumberUtils.min(tabletsPerServer) / ((double)NumberUtils.max(tabletsPerServer))) < 0.5) { + log.debug("ratio of min to max tablets per server should be roughly even. sleeping for " + currentWait + "ms"); + continue; + } + balancingWorked = true; + } + + Assert.assertTrue("did not properly balance", balancingWorked); + } + +}