Repository: accumulo Updated Branches: refs/heads/master 8eeab8a11 -> a45dbfbcf
ACCUMULO-3618 send more state to the tablet state change iterator Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ed1aa060 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ed1aa060 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ed1aa060 Branch: refs/heads/master Commit: ed1aa060d3e01a89c4a1bdeb64b7ab1ae0c6e428 Parents: 9238416 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Tue Feb 24 13:16:57 2015 -0500 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue Feb 24 13:16:57 2015 -0500 ---------------------------------------------------------------------- .../server/master/state/CurrentState.java | 5 +++ .../master/state/MetaDataTableScanner.java | 2 + .../master/state/TabletStateChangeIterator.java | 41 +++++++++++++++++--- .../java/org/apache/accumulo/master/Master.java | 12 +++++- .../apache/accumulo/master/TestMergeState.java | 11 ++++++ .../functional/TabletStateChangeIteratorIT.java | 12 ++++++ 6 files changed, 75 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ed1aa060/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java index b07a931..af02dde 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Set; import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.master.thrift.MasterState; public interface CurrentState { @@ -27,7 +28,11 @@ public interface CurrentState { Set<TServerInstance> onlineTabletServers(); + Set<TServerInstance> shutdownServers(); + Collection<MergeInfo> merges(); Collection<KeyExtent> migrations(); + + MasterState getMasterState(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ed1aa060/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java index 3ff987e..615e5d3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java @@ -86,6 +86,8 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables()); TabletStateChangeIterator.setMerges(tabletChange, state.merges()); TabletStateChangeIterator.setMigrations(tabletChange, state.migrations()); + TabletStateChangeIterator.setMasterState(tabletChange, state.getMasterState()); + TabletStateChangeIterator.setShuttingDown(tabletChange, state.shutdownServers()); } scanner.addScanIterator(tabletChange); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ed1aa060/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java index 77b598c..e58d4fe 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.master.thrift.MasterState; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.Base64; import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; @@ -53,13 +54,16 @@ public class TabletStateChangeIterator extends SkippingIterator { private static final String MERGES_OPTION = "merges"; private static final String DEBUG_OPTION = "debug"; private static final String MIGRATIONS_OPTION = "migrations"; + private static final String MASTER_STATE_OPTION = "masterState"; + private static final String SHUTTING_DOWN_OPTION = "shuttingDown"; private static final Logger log = Logger.getLogger(TabletStateChangeIterator.class); - Set<TServerInstance> current; - Set<String> onlineTables; - Map<Text,MergeInfo> merges; - boolean debug = false; - Set<KeyExtent> migrations; + private Set<TServerInstance> current; + private Set<String> onlineTables; + private Map<Text,MergeInfo> merges; + private boolean debug = false; + private Set<KeyExtent> migrations; + private MasterState masterState = MasterState.NORMAL; @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { @@ -69,6 +73,17 @@ public class TabletStateChangeIterator extends SkippingIterator { merges = parseMerges(options.get(MERGES_OPTION)); debug = options.containsKey(DEBUG_OPTION); migrations = parseMigrations(options.get(MIGRATIONS_OPTION)); + try { + masterState = MasterState.valueOf(options.get(MASTER_STATE_OPTION)); + } catch (Exception ex) { + if (options.get(MASTER_STATE_OPTION) != null) { + log.error("Unable to decode masterState " + options.get(MASTER_STATE_OPTION)); + } + } + Set<TServerInstance> shuttingDown = parseServers(options.get(SHUTTING_DOWN_OPTION)); + if (current != null && shuttingDown != null) { + current.removeAll(shuttingDown); + } } private Set<KeyExtent> parseMigrations(String migrations) { @@ -142,7 +157,7 @@ public class TabletStateChangeIterator extends SkippingIterator { Key k = getSource().getTopKey(); Value v = getSource().getTopValue(); - if (onlineTables == null || current == null) + if (onlineTables == null || current == null || masterState != MasterState.NORMAL) return; TabletLocationState tls; @@ -241,4 +256,18 @@ public class TabletStateChangeIterator extends SkippingIterator { cfg.addOption(MIGRATIONS_OPTION, encoded); } + public static void setMasterState(IteratorSetting cfg, MasterState state) { + cfg.addOption(MASTER_STATE_OPTION, state.toString()); + } + + public static void setShuttingDown(IteratorSetting cfg, Set<TServerInstance> servers) { + if (servers != null) { + List<String> serverList = new ArrayList<String>(); + for (TServerInstance server : servers) { + serverList.add(server.toString()); + } + cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers)); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ed1aa060/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 81339ad..562c584 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -206,7 +206,8 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections.unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>()); - synchronized MasterState getMasterState() { + @Override + public synchronized MasterState getMasterState() { return state; } @@ -774,7 +775,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List case SPLITTING: return TabletGoalState.HOSTED; case WAITING_FOR_CHOPPED: - if (tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) { + if (tls.getState(tserverSet.getCurrentServers()).equals(TabletState.HOSTED)) { if (tls.chopped) return TabletGoalState.UNASSIGNED; } else { @@ -1536,4 +1537,11 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List public Collection<KeyExtent> migrations() { return migrations.keySet(); } + + @Override + public Set<TServerInstance> shutdownServers() { + synchronized (serversToShutdown) { + return new HashSet<TServerInstance>(serversToShutdown); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ed1aa060/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java index c04f4cb..b0240f1 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java +++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.thrift.MasterState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; @@ -85,6 +86,16 @@ public class TestMergeState { public Collection<KeyExtent> migrations() { return Collections.emptyList(); } + + @Override + public MasterState getMasterState() { + return MasterState.NORMAL; + } + + @Override + public Set<TServerInstance> shutdownServers() { + return Collections.emptySet(); + } } private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/ed1aa060/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java index 4c192f7..a0d6008 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.master.thrift.MasterState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.security.Authorizations; @@ -175,6 +176,17 @@ public class TabletStateChangeIteratorIT extends SharedMiniClusterIT { public Collection<KeyExtent> migrations() { return Collections.emptyList(); } + + @Override + public MasterState getMasterState() { + return MasterState.NORMAL; + } + + @Override + public Set<TServerInstance> shutdownServers() { + return Collections.emptySet(); + } + } }